# 80_UTILS_CANE_SUPPLY.R # ============================================================================ # CANE SUPPLY CLIENT-SPECIFIC UTILITIES (SCRIPT 80 - CLIENT TYPE: cane_supply) # # Contains ANGATA and other cane supply-specific KPI and reporting functions. # # Currently, CANE_SUPPLY clients use the common utilities from 80_utils_common.R: # - Weekly statistics (calculate_field_statistics, calculate_kpi_trends) # - Field analysis reporting (generate_field_analysis_summary) # - Excel export (export_field_analysis_excel) # # This file is structured to accommodate future ANGATA-specific functionality such as: # - Custom yield models # - Harvest readiness criteria # - Supply chain integration hooks # - ANGATA-specific alerting and messaging # # Orchestrator: (Placeholder - uses common functions) # Dependencies: 00_common_utils.R, 80_utils_common.R # Used by: 80_calculate_kpis.R (when client_type == "cane_supply") # ============================================================================ library(terra) library(sf) library(dplyr) library(tidyr) library(readxl) library(writexl) # ============================================================================ # ALERT THRESHOLDS & CONFIGURATION CONSTANTS # ============================================================================ # CI change thresholds for alert categorization and status determination # These values are project-standard and should be consistent across all workflows CI_CHANGE_RAPID_GROWTH_THRESHOLD <- 0.5 # Weekly CI change for positive growth alert CI_CHANGE_POSITIVE_GROWTH_THRESHOLD <- 0.2 # Weekly CI change for acceptable growth CI_CHANGE_STABLE_THRESHOLD <- -0.2 # Weekly CI change for stable status (between -0.2 and +0.2) CI_CHANGE_STRESS_TREND_THRESHOLD <- -0.3 # 4-week trend threshold for stress detection CI_CHANGE_RAPID_DECLINE_THRESHOLD <- -0.5 # Weekly CI change for rapid decline alert # Deprecated aliases (for backward compatibility if needed): CI_CHANGE_DECLINE_THRESHOLD <- CI_CHANGE_RAPID_DECLINE_THRESHOLD # Weekly CI change threshold for decline alerts CI_CHANGE_INCREASE_THRESHOLD <- CI_CHANGE_RAPID_GROWTH_THRESHOLD # Weekly CI change threshold for increase alerts # ============================================================================ # ANGATA-SPECIFIC HELPER FUNCTIONS (Placeholder Section) # ============================================================================ #' Calculate acreage for each field from geometry #' @param field_boundaries_sf sf object with field geometries #' @return data.frame with field and acreage columns calculate_field_acreages <- function(field_boundaries_sf) { tryCatch({ # Project to equal-area CRS (EPSG:6933) for accurate area calculations field_boundaries_proj <- sf::st_transform(field_boundaries_sf, "EPSG:6933") lookup_df <- field_boundaries_proj %>% sf::st_drop_geometry() %>% as.data.frame() %>% mutate( geometry_valid = sapply(seq_len(nrow(field_boundaries_proj)), function(idx) { tryCatch({ sf::st_is_valid(field_boundaries_proj[idx, ]) }, error = function(e) FALSE) }), area_ha = 0 ) # Calculate area for valid geometries valid_indices <- which(lookup_df$geometry_valid) areas_ha <- vapply(valid_indices, function(idx) { tryCatch({ area_m2 <- as.numeric(sf::st_area(field_boundaries_proj[idx, ])) area_m2 / 10000 }, error = function(e) NA_real_) }, numeric(1)) lookup_df$area_ha[valid_indices] <- areas_ha # Convert hectares to acres lookup_df %>% mutate(acreage = area_ha / 0.404686) %>% # Aggregate by field to handle multi-row fields (e.g., sub_fields) group_by(field) %>% summarise(acreage = sum(acreage, na.rm = TRUE), .groups = "drop") %>% select(field, acreage) }, error = function(e) { message(paste("Warning: Could not calculate acreages from geometries -", e$message)) data.frame(field = character(0), acreage = numeric(0)) }) } #' Calculate age in weeks from planting date #' #' @param planting_date Date of planting #' @param reference_date Date to calculate age relative to (typically end_date) #' @return Numeric age in weeks (rounded to nearest week) calculate_age_week <- function(planting_date, reference_date) { if (is.na(planting_date)) { return(NA_real_) } round(as.numeric(difftime(reference_date, planting_date, units = "weeks")), 0) } #' Categorize regression slope into field uniformity interpretation #' #' **Input**: Numeric slope from `calculate_regression_slope()` applied to CV values #' **Output**: 3-category labels for field uniformity trend #' #' **Categories**: #' - "More uniform" (slope < -0.01): Field becoming more homogeneous #' - "Stable uniformity" (-0.01 ≤ slope ≤ 0.01): Field uniformity stable #' - "Less uniform" (slope > 0.01): Field becoming more patchy/heterogeneous #' #' @param slope Numeric slope from `calculate_regression_slope(cv_values)` #' @return Character category or NA #' categorize_cv_trend_long_term <- function(slope) { if (is.na(slope)) { return(NA_character_) } else if (slope < -0.01) { return("More uniform") } else if (slope > 0.01) { return("Less uniform") } else { return("Stable uniformity") } } #' Determine status alert for CANE_SUPPLY client (harvest/milling workflow) #' #' Alerts focus on: harvest readiness, crop health monitoring, exception detection #' Uses WEEKLY trends (Four_week_trend) not daily noise for consistency #' Designed for harvest/milling clients who manage expectation, not daily operations #' #' Priority order: #' 1. harvest_ready → Schedule harvest operations #' 2. harvested_bare → Record completion / detect unexpected harvest #' 3. stress_detected → Monitor crop health (consistent decline) #' 4. germination_delayed → Early warning for young fields #' 5. growth_on_track → Positive signal (no action needed) #' 6. NA → Normal growth (no alert) #' #' @param imminent_prob Numeric harvest probability (0-1) #' @param age_week Numeric age in weeks since planting/harvest #' @param mean_ci Numeric mean Chlorophyll Index #' @param four_week_trend Numeric 4-week trend in CI (slope of growth) #' @param weekly_ci_change Numeric week-over-week CI change #' @param cv Numeric coefficient of variation (field uniformity) #' @return Character status alert code or NA calculate_status_alert <- function(imminent_prob, age_week, mean_ci, four_week_trend, weekly_ci_change, cv) { # Priority 1: HARVEST READY - highest business priority # Field is mature (≥12 months) AND harvest model predicts imminent harvest if (!is.na(imminent_prob) && imminent_prob > 0.5 && !is.na(age_week) && age_week >= 52) { return("harvest_ready") } # Priority 2: HARVESTED/BARE - indicator of completion (or unexpected harvest) # Mean CI dropped below vegetative threshold if (!is.na(mean_ci) && mean_ci < 1.5) { return("harvested_bare") } # Priority 3: STRESS DETECTED - consistent health decline (weekly trend) # Uses Four_week_trend (smooth trend) NOT daily fluctuation to avoid noise # Crop declining but not yet bare → opportunity to investigate # References: CI_CHANGE_STRESS_TREND_THRESHOLD for 4-week trend detection if (!is.na(four_week_trend) && four_week_trend < CI_CHANGE_STRESS_TREND_THRESHOLD && !is.na(mean_ci) && mean_ci > 1.5) { return("stress_detected") } # Priority 4: GERMINATION DELAYED - early warning for young fields # Age 4-8 weeks is typical germination window; low CI = slow start if (!is.na(age_week) && age_week >= 4 && age_week <= 8 && !is.na(mean_ci) && mean_ci < 1.5) { return("germination_delayed") } # Priority 5: GROWTH ON TRACK - positive operational status # Field is healthy, uniform, and growing steadily (no action needed) # Conditions: good uniformity (CV < 0.15) AND stable/positive weekly trend # References: CI_CHANGE_STABLE_THRESHOLD (±0.2 = stable, no change) if (!is.na(cv) && cv < 0.15 && !is.na(four_week_trend) && four_week_trend >= CI_CHANGE_STABLE_THRESHOLD && !is.na(weekly_ci_change) && weekly_ci_change >= CI_CHANGE_STABLE_THRESHOLD) { return("growth_on_track") } # Default: NORMAL GROWTH (no specific alert) # Field is growing but may have minor variability; continues normal monitoring NA_character_ } #' Build complete per-field KPI dataframe with all 22 columns #' @param current_stats data.frame with current week statistics from load_or_calculate_weekly_stats #' @param planting_dates data.frame with field_id and planting_date columns #' @param imminent_prob_data data.frame with Field_id and Imminent_prob_actual columns (or NULL) #' @param gap_scores_df data.frame with Field_id and gap_score columns (or NULL) #' @param field_boundaries_sf sf object with field geometries #' @param end_date Date object for current report date #' @return data.frame with all 22 KPI columns calculate_all_field_kpis <- function(current_stats, planting_dates, imminent_prob_data, gap_scores_df, field_boundaries_sf, end_date) { message("\nBuilding final field analysis output...") # Pre-calculate acreages acreage_lookup <- calculate_field_acreages(field_boundaries_sf) field_analysis_df <- current_stats %>% mutate( # Column 2: Farm_Section (user fills manually) Farm_Section = NA_character_, # Column 3: Field_name (from GeoJSON) Field_name = Field_id, # Column 4: Acreage (from geometry) Acreage = { acreages_vec <- acreage_lookup$acreage[match(Field_id, acreage_lookup$field)] if_else(is.na(acreages_vec), 0, acreages_vec) }, # Column 8: Last_harvest_or_planting_date (from harvest.xlsx) Last_harvest_or_planting_date = { planting_dates$planting_date[match(Field_id, planting_dates$field_id)] }, # Column 9: Age_week (calculated) Age_week = { sapply(seq_len(nrow(current_stats)), function(idx) { calculate_age_week(Last_harvest_or_planting_date[idx], end_date) }) }, # Column 10: Phase (based on Age_week) Phase = sapply(Age_week, get_phase_by_age), # Column 12: Germination_progress (binned Pct_pixels_CI_gte_2) Germination_progress = sapply(Pct_pixels_CI_gte_2, bin_percentage), # Column 13: Imminent_prob (from script 31 or NA) Imminent_prob = { if (!is.null(imminent_prob_data)) { as.numeric(imminent_prob_data$Imminent_prob_actual[match(Field_id, imminent_prob_data$Field_id)]) } else { rep(NA_real_, nrow(current_stats)) } }, # Column 14: Status_Alert (multi-priority logic for harvest/milling workflow) Status_Alert = { sapply(seq_len(nrow(current_stats)), function(idx) { calculate_status_alert( imminent_prob = Imminent_prob[idx], age_week = Age_week[idx], mean_ci = Mean_CI[idx], four_week_trend = Four_week_trend[idx], weekly_ci_change = Weekly_ci_change[idx], cv = CV[idx] ) }) }, # Column 19b: CV_Trend_Long_Term_Category (categorical slope) CV_Trend_Long_Term_Category = sapply(current_stats$CV_Trend_Long_Term, categorize_cv_trend_long_term), # Column 21: Cloud_pct_clear (binned into intervals) Cloud_pct_clear = sapply(Cloud_pct_clear, bin_percentage), # Column 22: Gap_score (2σ method) Gap_score = { if (!is.null(gap_scores_df) && nrow(gap_scores_df) > 0) { gap_scores_df$gap_score[match(current_stats$Field_id, gap_scores_df$Field_id)] } else { rep(NA_real_, nrow(current_stats)) } } ) %>% select( all_of(c("Field_id", "Farm_Section", "Field_name", "Acreage", "Status_Alert", "Last_harvest_or_planting_date", "Age_week", "Phase", "Germination_progress", "Mean_CI", "Weekly_ci_change", "Four_week_trend", "CI_range", "CI_Percentiles", "CV", "CV_Trend_Short_Term", "CV_Trend_Long_Term", "CV_Trend_Long_Term_Category", "Imminent_prob", "Cloud_pct_clear", "Cloud_category", "Gap_score")) ) message(paste("✓ Built final output with", nrow(field_analysis_df), "fields and 22 columns")) return(field_analysis_df) } #' Aggregate per-field data into farm-level KPI summary #' #' @param field_analysis_df data.frame with per-field KPI data #' @param current_week Numeric current week number #' @param current_year Numeric current year #' @param end_date Date object for current report date #' @return List with phase_distribution, status_distribution, cloud_distribution, overall_stats calculate_farm_level_kpis <- function(field_analysis_df, current_week, current_year, end_date) { cat("\n=== CALCULATING FARM-LEVEL KPI SUMMARY ===\n") # Filter to only fields with actual data field_data <- field_analysis_df %>% filter(!is.na(Mean_CI) & !is.na(Acreage)) %>% filter(Acreage > 0) if (nrow(field_data) == 0) { message("No valid field data for farm-level aggregation") return(NULL) } farm_summary <- list() # 1. PHASE DISTRIBUTION phase_dist <- field_data %>% group_by(Phase) %>% summarise( num_fields = n(), acreage = sum(Acreage, na.rm = TRUE), .groups = 'drop' ) %>% rename(Category = Phase) farm_summary$phase_distribution <- phase_dist # 2. STATUS ALERT DISTRIBUTION status_dist <- field_data %>% group_by(Status_Alert) %>% summarise( num_fields = n(), acreage = sum(Acreage, na.rm = TRUE), .groups = 'drop' ) %>% rename(Category = Status_Alert) farm_summary$status_distribution <- status_dist # 3. CLOUD COVERAGE DISTRIBUTION cloud_dist <- field_data %>% group_by(Cloud_category) %>% summarise( num_fields = n(), acreage = sum(Acreage, na.rm = TRUE), .groups = 'drop' ) %>% rename(Category = Cloud_category) farm_summary$cloud_distribution <- cloud_dist # 4. OVERALL STATISTICS farm_summary$overall_stats <- data.frame( total_fields = nrow(field_data), total_acreage = sum(field_data$Acreage, na.rm = TRUE), mean_ci = round(mean(field_data$Mean_CI, na.rm = TRUE), 2), median_ci = round(median(field_data$Mean_CI, na.rm = TRUE), 2), mean_cv = round(mean(field_data$CV, na.rm = TRUE), 4), week = current_week, year = current_year, date = as.character(end_date) ) # Print summaries cat("\n--- PHASE DISTRIBUTION ---\n") print(phase_dist) cat("\n--- STATUS TRIGGER DISTRIBUTION ---\n") print(status_dist) cat("\n--- CLOUD COVERAGE DISTRIBUTION ---\n") print(cloud_dist) cat("\n--- OVERALL FARM STATISTICS ---\n") print(farm_summary$overall_stats) return(farm_summary) } # ============================================================================ # ORCHESTRATOR FOR CANE_SUPPLY WORKFLOWS # ============================================================================ #' Main orchestrator for CANE_SUPPLY per-field KPI workflow #' #' This function coordinates all KPI calculations for the per-field analysis workflow. #' It loads historical data, calculates current/previous week statistics, computes #' all 22 KPI columns, and aggregates farm-level summaries. #' #' @param setup List with directory paths (kpi_reports_dir, data_dir, etc.) #' @param client_config List with workflow configuration (script_91_compatible, outputs) #' @param end_date Date object for current report date #' @param project_dir Character project identifier #' @param weekly_mosaic Character path to weekly mosaic directory #' @param daily_vals_dir Character path to daily values directory #' @param field_boundaries_sf sf object with field geometries #' @param data_dir Character path to data directory #' @return List with field_analysis_df, farm_kpi_results, export_paths calculate_field_analysis_cane_supply <- function(setup, client_config, end_date, project_dir, weekly_mosaic, daily_vals_dir, field_boundaries_sf, data_dir) { message("\n", strrep("=", 70)) message("CANE_SUPPLY WORKFLOW: PER-FIELD ANALYSIS (Script 91 compatible)") message(strrep("=", 70)) reports_dir <- file.path(setup$reports_dir, "kpis") # ========== PHASE 1: WEEKLY ANALYSIS SETUP ========== message("\n", strrep("-", 70)) message("PHASE 1: PER-FIELD WEEKLY ANALYSIS ") message(strrep("-", 70)) weeks <- calculate_week_numbers(end_date) current_week <- weeks$current_week current_year <- weeks$current_year previous_week <- weeks$previous_week previous_year <- weeks$previous_year message(paste("Week:", current_week, "/ Year (ISO 8601):", current_year)) # Find per-field weekly mosaics message("Finding per-field weekly mosaics...") if (!dir.exists(weekly_mosaic)) { stop(paste("ERROR: weekly_mosaic directory not found:", weekly_mosaic, "\nScript 40 (mosaic creation) must be run first.")) } field_dirs <- list.dirs(weekly_mosaic, full.names = FALSE, recursive = FALSE) field_dirs <- field_dirs[field_dirs != ""] if (length(field_dirs) == 0) { stop(paste("ERROR: No field subdirectories found in:", weekly_mosaic, "\nScript 40 must create weekly_mosaic/{FIELD}/ structure.")) } # Verify we have mosaics for this week single_file_pattern <- sprintf("week_%02d_%d\\.tif", current_week, current_year) per_field_files <- c() for (field in field_dirs) { field_mosaic_dir <- file.path(weekly_mosaic, field) files <- list.files(field_mosaic_dir, pattern = single_file_pattern, full.names = TRUE) if (length(files) > 0) { per_field_files <- c(per_field_files, files) } } if (length(per_field_files) == 0) { stop(paste("ERROR: No mosaics found for week", current_week, "year", current_year, "\nExpected pattern:", single_file_pattern, "\nChecked:", weekly_mosaic)) } message(paste(" ✓ Found", length(per_field_files), "per-field weekly mosaics")) # ========== PHASE 2: LOAD HISTORICAL DATA ========== message("\nLoading historical field data for trend calculations...") num_weeks_to_load <- max(WEEKS_FOR_FOUR_WEEK_TREND, WEEKS_FOR_CV_TREND_LONG) message(paste(" Attempting to load up to", num_weeks_to_load, "weeks of historical data...")) allow_auto_gen <- !exists("_INSIDE_AUTO_GENERATE", envir = .GlobalEnv) historical_data <- load_historical_field_data( project_dir, current_week, current_year, reports_dir, num_weeks = num_weeks_to_load, auto_generate = allow_auto_gen, field_boundaries_sf = field_boundaries_sf, daily_vals_dir = daily_vals_dir ) # ========== PHASE 3: LOAD PLANTING DATES ========== message("\nLoading harvest data from harvest.xlsx for planting dates...") # Use load_harvest_data() from 80_utils_common.R for consistency with 80_calculate_kpis.R harvesting_data <- load_harvest_data(data_dir) planting_dates <- extract_planting_dates(harvesting_data, field_boundaries_sf) if (is.null(planting_dates) || nrow(planting_dates) == 0) { message("WARNING: No planting dates available. Using NA for all fields.") planting_dates <- data.frame( field_id = field_boundaries_sf$field, planting_date = rep(as.Date(NA), nrow(field_boundaries_sf)), stringsAsFactors = FALSE ) } # ========== PHASE 4: CALCULATE WEEKLY STATISTICS ========== message("\nUsing modular RDS-based approach for weekly statistics...") # Current week message("\n1. Loading/calculating CURRENT week statistics (week", current_week, ")...") current_stats <- load_or_calculate_weekly_stats( week_num = current_week, year = current_year, project_dir = project_dir, field_boundaries_sf = field_boundaries_sf, mosaic_dir = weekly_mosaic, reports_dir = reports_dir, report_date = end_date ) message(paste(" ✓ Loaded/calculated stats for", nrow(current_stats), "fields in current week")) # Previous week message("\n2. Loading/calculating PREVIOUS week statistics (week", previous_week, ")...") prev_report_date <- end_date - 7 prev_stats <- load_or_calculate_weekly_stats( week_num = previous_week, year = previous_year, project_dir = project_dir, field_boundaries_sf = field_boundaries_sf, mosaic_dir = weekly_mosaic, reports_dir = reports_dir, report_date = prev_report_date ) message(paste(" ✓ Loaded/calculated stats for", nrow(prev_stats), "fields in previous week")) # ========== PHASE 5: CALCULATE TRENDS ========== message("\n3. Calculating trend columns...") current_stats <- calculate_kpi_trends( current_stats, prev_stats, project_dir = project_dir, reports_dir = reports_dir, current_week = current_week, year = current_year ) message(paste(" ✓ Added Weekly_ci_change, CV_Trend_Short_Term, Four_week_trend, CV_Trend_Long_Term, nmr_of_weeks_analysed")) # ========== PHASE 6: LOAD HARVEST PROBABILITIES ========== message("\n4. Loading harvest probabilities from script 31...") # Use get_harvest_output_path() to safely build path (stored in kpi_reports_dir) harvest_prob_file <- get_harvest_output_path(project_dir, current_week, current_year) message(paste(" Looking for:", harvest_prob_file)) imminent_prob_data <- tryCatch({ if (file.exists(harvest_prob_file)) { prob_df <- readr::read_csv(harvest_prob_file, show_col_types = FALSE, col_types = readr::cols(.default = readr::col_character())) message(paste(" ✓ Loaded harvest probabilities for", nrow(prob_df), "fields")) prob_df %>% select(field, imminent_prob, detected_prob) %>% rename(Field_id = field, Imminent_prob_actual = imminent_prob, Detected_prob = detected_prob) } else { message(paste(" INFO: Harvest probabilities not available (script 31 not run)")) NULL } }, error = function(e) { message(paste(" WARNING: Could not load harvest probabilities:", e$message)) NULL }) # ========== PHASE 7: CALCULATE GAP SCORES ========== gap_scores_df <- calculate_gap_scores(per_field_files, field_boundaries_sf) # ========== PHASE 8: BUILD FINAL PER-FIELD DATAFRAME ========== field_analysis_df <- calculate_all_field_kpis( current_stats = current_stats, planting_dates = planting_dates, imminent_prob_data = imminent_prob_data, gap_scores_df = gap_scores_df, field_boundaries_sf = field_boundaries_sf, end_date = end_date ) # ========== PHASE 9: EXPORT PER-FIELD RESULTS ========== export_paths <- export_field_analysis_excel( field_analysis_df, NULL, project_dir, current_week, current_year, reports_dir ) # cat("\n--- Per-field Results (first 10) ---\n") # available_cols <- c("Field_id", "Acreage", "Age_week", "Mean_CI", "Four_week_trend", "Status_Alert", "Cloud_category") # available_cols <- available_cols[available_cols %in% names(field_analysis_df)] # if (length(available_cols) > 0) { # print(head(field_analysis_df[, available_cols], 10)) # } # ========== PHASE 10: CALCULATE FARM-LEVEL KPIS ========== # farm_kpi_results <- calculate_farm_level_kpis( # field_analysis_df, # current_week, # current_year, # end_date # ) # For now, farm-level KPIs are not implemented in CANE_SUPPLY workflow farm_kpi_results <- NULL # ========== RETURN RESULTS ========== return(list( field_analysis_df = field_analysis_df, farm_kpi_results = farm_kpi_results, export_paths = export_paths )) } # ============================================================================ # FUTURE EXTENSION POINTS # ============================================================================ # Placeholder for ANGATA-specific utilities that may be added in future: # - Custom yield models based on ANGATA historical data # - Field condition thresholds specific to ANGATA growing practices # - Integration with ANGATA harvest scheduling system # - WhatsApp messaging templates for ANGATA supply chain stakeholders # - Cost/benefit analysis for ANGATA operational decisions # These functions can be added here as ANGATA requirements evolve.