# 80_UTILS_CANE_SUPPLY.R
# ============================================================================
# CANE SUPPLY CLIENT-SPECIFIC UTILITIES (SCRIPT 80 - CLIENT TYPE: cane_supply)
#
# Contains ANGATA and other cane supply-specific KPI and reporting functions.
#
# Currently, CANE_SUPPLY clients use the common utilities from 80_utils_common.R:
#   - Weekly statistics (calculate_field_statistics, calculate_kpi_trends)
#   - Field analysis reporting (generate_field_analysis_summary)
#   - Excel export (export_field_analysis_excel)
#
# This file is structured to accommodate future ANGATA-specific functionality such as:
#   - Custom yield models
#   - Harvest readiness criteria
#   - Supply chain integration hooks
#   - ANGATA-specific alerting and messaging
#
# Orchestrator:  (Placeholder - uses common functions)
# Dependencies:  00_common_utils.R, 80_utils_common.R
# Used by:       80_calculate_kpis.R (when client_type == "cane_supply")
# ============================================================================

library(terra)
library(sf)
library(dplyr)
library(tidyr)
library(readxl)
library(writexl)

# ============================================================================
# ALERT THRESHOLDS & CONFIGURATION CONSTANTS
# ============================================================================

# CI change thresholds for alert categorization and status determination.
# These values are project-standard and should be consistent across all workflows.
CI_CHANGE_RAPID_GROWTH_THRESHOLD    <-  0.5  # Weekly CI change for positive growth alert
CI_CHANGE_POSITIVE_GROWTH_THRESHOLD <-  0.2  # Weekly CI change for acceptable growth
CI_CHANGE_STABLE_THRESHOLD          <- -0.2  # Weekly CI change for stable status (between -0.2 and +0.2)
CI_CHANGE_STRESS_TREND_THRESHOLD    <- -0.3  # 4-week trend threshold for stress detection
CI_CHANGE_RAPID_DECLINE_THRESHOLD   <- -0.5  # Weekly CI change for rapid decline alert

# Deprecated aliases (for backward compatibility if needed):
CI_CHANGE_DECLINE_THRESHOLD <-
  CI_CHANGE_RAPID_DECLINE_THRESHOLD  # Weekly CI change threshold for decline alerts
CI_CHANGE_INCREASE_THRESHOLD <-
  CI_CHANGE_RAPID_GROWTH_THRESHOLD   # Weekly CI change threshold for increase alerts

# ============================================================================
# ANGATA-SPECIFIC HELPER FUNCTIONS (Placeholder Section)
# ============================================================================

#' Calculate acreage for each field from geometry
#'
#' @param field_boundaries_sf sf object with field geometries
#' @param unit Character. Unit preference: "hectare" or "acre" (default from AREA_UNIT_PREFERENCE)
#' @return data.frame with field and area columns (column name reflects unit: Area_ha or Area_ac)
calculate_field_acreages <- function(field_boundaries_sf, unit = AREA_UNIT_PREFERENCE) {
  tryCatch({
    # Get field identifier (handles pivot.geojson structure).
    # Use intersect() to find the first matching column, avoiding pull() ambiguity.
    field_names <- {
      available_cols <- intersect(
        colnames(field_boundaries_sf),
        c("field", "field_id", "Field_id", "name", "Name")
      )
      if (length(available_cols) > 0) {
        # Extract the first matching column
        field_boundaries_sf %>%
          sf::st_drop_geometry() %>%
          pull(available_cols[1])
      } else {
        # Fall back to sequential indices if no field name column exists
        seq_len(nrow(field_boundaries_sf))
      }
    }

    # Use unified area calculation function
    areas <- calculate_area_from_geometry(field_boundaries_sf, unit = unit)

    # Create output data frame with unit-aware column name
    unit_label <- get_area_unit_label(unit)
    col_name <- paste0("Area_", unit_label)
    result_df <- data.frame(
      field = field_names,
      area = areas,
      stringsAsFactors = FALSE
    )
    colnames(result_df) <- c("field", col_name)

    # Aggregate by field to handle multi-row fields (e.g., sub_fields).
    # Use a bare lambda to preserve the original column name (not list(), which creates suffixes).
    result_df %>%
      group_by(field) %>%
      summarise(across(all_of(col_name), ~ sum(., na.rm = TRUE)), .groups = "drop")
  }, error =
  function(e) {
    message(paste("Warning: Could not calculate areas from geometries -", e$message))
    unit_label <- get_area_unit_label(unit)
    col_name <- paste0("Area_", unit_label)
    result_df <- data.frame(field = character(0))
    result_df[[col_name]] <- numeric(0)
    return(result_df)
  })
}

#' Calculate age in weeks from planting date
#'
#' @param planting_date Date of planting
#' @param reference_date Date to calculate age relative to (typically end_date)
#' @return Numeric age in weeks (rounded to nearest week)
calculate_age_week <- function(planting_date, reference_date) {
  if (is.na(planting_date)) {
    return(NA_real_)
  }
  round(as.numeric(difftime(reference_date, planting_date, units = "weeks")), 0)
}

#' Categorize regression slope into field uniformity interpretation
#'
#' **Input**: Numeric slope from `calculate_regression_slope()` applied to CV values
#' **Output**: 3-category labels for field uniformity trend
#'
#' **Categories**:
#'   - "More uniform" (slope < -0.01): Field becoming more homogeneous
#'   - "Stable uniformity" (-0.01 ≤ slope ≤ 0.01): Field uniformity stable
#'   - "Less uniform" (slope > 0.01): Field becoming more patchy/heterogeneous
#'
#' @param slope Numeric slope from `calculate_regression_slope(cv_values)`
#' @return Character category or NA
categorize_cv_trend_long_term <- function(slope) {
  if (is.na(slope)) {
    return(NA_character_)
  } else if (slope < -0.01) {
    return("More uniform")
  } else if (slope > 0.01) {
    return("Less uniform")
  } else {
    return("Stable uniformity")
  }
}

#' Determine status alert for CANE_SUPPLY client (harvest/milling workflow)
#'
#' Alerts focus on: harvest readiness, crop health monitoring, exception detection.
#' Uses WEEKLY trends (Four_week_trend), not daily noise, for consistency.
#' Designed for harvest/milling clients who manage expectations, not daily operations.
#'
#' Priority order:
#'   1. harvest_ready       → Schedule harvest operations
#'   2. harvested_bare      → Record completion / detect unexpected harvest
#'   3. stress_detected     → Monitor crop health (consistent decline)
#'   4. germination_delayed → Early warning for young fields
#'   5. growth_on_track     → Positive signal (no action needed)
#'   6. NA                  → Normal growth (no alert)
#'
#' @param imminent_prob Numeric harvest probability (0-1)
#' @param age_week Numeric age in weeks since planting/harvest
#' @param mean_ci Numeric mean Chlorophyll Index
#' @param four_week_trend Numeric 4-week trend in CI (slope of growth)
#' @param weekly_ci_change Numeric week-over-week CI change
#' @param cv Numeric coefficient of variation (field uniformity)
#' @return Character status alert code or NA
calculate_status_alert <- function(imminent_prob, age_week, mean_ci,
                                   four_week_trend, weekly_ci_change, cv) {
  # Priority 1: HARVEST READY - highest business priority.
  # Field is mature (>= 12 months) AND harvest model predicts imminent harvest.
  if (!is.na(imminent_prob) && imminent_prob > 0.5 &&
      !is.na(age_week) && age_week >= 52) {
    return("harvest_ready")
  }

  # Priority 2: HARVESTED/BARE - indicator of completion (or unexpected harvest).
  # Mean CI dropped below vegetative threshold.
  if (!is.na(mean_ci) && mean_ci < 1.5) {
    return("harvested_bare")
  }

  # Priority 3: STRESS DETECTED - consistent health decline (weekly trend).
  # Uses Four_week_trend (smooth trend), NOT daily fluctuation, to avoid noise.
  # Crop declining but not yet bare → opportunity to investigate.
  # References CI_CHANGE_STRESS_TREND_THRESHOLD for 4-week trend detection.
  if (!is.na(four_week_trend) && four_week_trend < CI_CHANGE_STRESS_TREND_THRESHOLD &&
      !is.na(mean_ci) && mean_ci > 1.5) {
    return("stress_detected")
  }

  # Priority 4: GERMINATION DELAYED - early warning for young fields.
  # Age 4-8 weeks is the typical germination window; low CI = slow start.
  if (!is.na(age_week) && age_week >= 4 && age_week <= 8 &&
      !is.na(mean_ci) && mean_ci < 1.5) {
    return("germination_delayed")
  }

  # Priority 5: GROWTH ON TRACK - positive operational status.
  # Field is healthy, uniform, and growing steadily (no action needed).
  # Conditions: good uniformity (CV < 0.15) AND stable/positive weekly trend.
  # References CI_CHANGE_STABLE_THRESHOLD (±0.2 = stable, no change).
  if (!is.na(cv) && cv < 0.15 &&
      !is.na(four_week_trend) && four_week_trend >= CI_CHANGE_STABLE_THRESHOLD &&
      !is.na(weekly_ci_change) && weekly_ci_change >= CI_CHANGE_STABLE_THRESHOLD) {
    return("growth_on_track")
  }

  # Default: NORMAL GROWTH (no specific alert).
  # Field is growing but may have minor variability; continue normal monitoring.
  NA_character_
}

#' Build complete per-field KPI dataframe - ALWAYS ALL FIELDS from GeoJSON
#'
#' CRITICAL: Always outputs all fields from field_boundaries_sf, regardless of data availability.
#' Fields without satellite data this week have all KPI columns set to NA.
#' This is dynamic - output row count equals nrow(field_boundaries_sf), not hardcoded.
#'
#' @param current_stats data.frame with current week statistics (may have fewer rows if data is incomplete)
#' @param planting_dates data.frame with field_id and planting_date columns
#' @param imminent_prob_data data.frame with Field_id and Imminent_prob_actual columns (or NULL)
#' @param gap_scores_df data.frame with Field_id and gap_score columns (guaranteed to have all fields)
#' @param field_boundaries_sf sf object with field geometries (authoritative source of all field IDs)
#' @param end_date Date object for current report date
#' @return data.frame with 22 columns and nrow(field_boundaries_sf) rows (all fields, NAs for missing data)
calculate_all_field_kpis <- function(current_stats, planting_dates, imminent_prob_data,
                                     gap_scores_df, field_boundaries_sf, end_date) {
  message("\nBuilding final field analysis output...")

  # CRITICAL: Start with ALL fields from field_boundaries_sf (dynamic, based on GeoJSON).
  # This ensures output always includes all fields, even if they lack satellite data.
  all_fields <- field_boundaries_sf %>%
    sf::st_drop_geometry() %>%
    mutate(Field_id = field) %>%
    select(Field_id) %>%
    distinct()

  # Pre-calculate areas using unified function
  acreage_lookup <- calculate_field_acreages(field_boundaries_sf, unit = AREA_UNIT_PREFERENCE)

  # Determine area column name from result
  unit_label <- get_area_unit_label(AREA_UNIT_PREFERENCE)
  area_col_name <- paste0("Area_", unit_label)

  # Left-join current_stats to all_fields to preserve all field IDs.
  # Fields without data in current_stats will have NA values.
  field_analysis_df <- all_fields %>%
    left_join(current_stats, by = "Field_id") %>%
    mutate(
      # Column 2: Farm_Section (user fills manually)
      Farm_Section = NA_character_,

      # Column 3: Field_name (from GeoJSON)
      Field_name = Field_id,

      # Column 4: Acreage (from geometry, unit-aware)
      Acreage = {
        acreages_vec <- acreage_lookup[[area_col_name]][match(Field_id, acreage_lookup$field)]
        if_else(is.na(acreages_vec), 0, acreages_vec)
      },

      # Column 8: Last_harvest_or_planting_date (from harvest.xlsx)
      Last_harvest_or_planting_date = {
        planting_dates$planting_date[match(Field_id, planting_dates$field_id)]
      },

      # Column 9: Age_week (calculated)
      Age_week = {
        sapply(seq_len(n()), function(idx) {
          calculate_age_week(Last_harvest_or_planting_date[idx], end_date)
        })
      },

      # Column 10: Phase (based on Age_week)
      Phase = sapply(Age_week, get_phase_by_age),

      # Column 12: Germination_progress (binned Pct_pixels_CI_gte_2)
      Germination_progress = sapply(Pct_pixels_CI_gte_2, bin_percentage),

      # Column 13: Imminent_prob (from script 31 or NA)
      Imminent_prob = {
        if (!is.null(imminent_prob_data)) {
          as.numeric(imminent_prob_data$Imminent_prob_actual[match(Field_id, imminent_prob_data$Field_id)])
        } else {
          rep(NA_real_, n())
        }
      },

      # Column 14: Status_Alert (multi-priority logic for harvest/milling workflow)
      Status_Alert = {
        sapply(seq_len(n()), function(idx) {
          calculate_status_alert(
            imminent_prob = Imminent_prob[idx],
            age_week = Age_week[idx],
            mean_ci = Mean_CI[idx],
            four_week_trend = Four_week_trend[idx],
            weekly_ci_change = Weekly_ci_change[idx],
            cv = CV[idx]
          )
        })
      },

      # Column 19b: CV_Trend_Long_Term_Category (categorical slope)
      CV_Trend_Long_Term_Category = sapply(CV_Trend_Long_Term, categorize_cv_trend_long_term),

      # Column 21: Cloud_pct_clear (binned into intervals)
      Cloud_pct_clear = sapply(Cloud_pct_clear, bin_percentage),

      # Column 22: Gap_score (1σ method); gap_scores_df now has all fields
      Gap_score = {
        if (!is.null(gap_scores_df) && nrow(gap_scores_df) > 0) {
          gap_scores_df$gap_score[match(Field_id, gap_scores_df$Field_id)]
        } else {
          rep(NA_real_, n())
        }
      }
    ) %>%
    select(
      all_of(c("Field_id", "Farm_Section", "Field_name", "Acreage", "Status_Alert",
               "Last_harvest_or_planting_date", "Age_week", "Phase", "Germination_progress",
               "Mean_CI", "Weekly_ci_change", "Four_week_trend", "CI_range", "CI_Percentiles",
               "CV", "CV_Trend_Short_Term", "CV_Trend_Long_Term", "CV_Trend_Long_Term_Category",
               "Imminent_prob", "Cloud_pct_clear", "Cloud_category", "Gap_score"))
    )

  message(paste("✓ Built final output with", nrow(field_analysis_df),
                "fields (from GeoJSON) and 22 columns"))
  return(field_analysis_df)
}

#' Aggregate per-field data into farm-level KPI summary
#'
#' @param field_analysis_df data.frame with per-field KPI data
#' @param current_week Numeric current week number
#' @param current_year Numeric current year
#' @param end_date Date object for current report date
#' @return List with phase_distribution, status_distribution, cloud_distribution, overall_stats
calculate_farm_level_kpis <- function(field_analysis_df, current_week, current_year, end_date) {
  cat("\n=== CALCULATING FARM-LEVEL KPI SUMMARY ===\n")

  # Filter to only fields with actual data
  field_data <- field_analysis_df %>%
    filter(!is.na(Mean_CI) & !is.na(Acreage)) %>%
    filter(Acreage > 0)

  if (nrow(field_data) == 0) {
    message("No valid field data for farm-level aggregation")
    return(NULL)
  }

  farm_summary <- list()

  # 1. PHASE DISTRIBUTION
  phase_dist <- field_data %>%
    group_by(Phase) %>%
    summarise(
      num_fields = n(),
      acreage = sum(Acreage, na.rm = TRUE),
      .groups = 'drop'
    ) %>%
    rename(Category = Phase)
  farm_summary$phase_distribution <- phase_dist

  # 2. STATUS ALERT DISTRIBUTION
  status_dist <- field_data %>%
    group_by(Status_Alert) %>%
    summarise(
      num_fields = n(),
      acreage = sum(Acreage, na.rm = TRUE),
      .groups = 'drop'
    ) %>%
    rename(Category = Status_Alert)
  farm_summary$status_distribution <- status_dist

  # 3. CLOUD COVERAGE DISTRIBUTION
  cloud_dist <- field_data %>%
    group_by(Cloud_category) %>%
    summarise(
      num_fields = n(),
      acreage = sum(Acreage, na.rm = TRUE),
      .groups = 'drop'
    ) %>%
    rename(Category = Cloud_category)
  farm_summary$cloud_distribution <- cloud_dist

  # 4. OVERALL STATISTICS
  farm_summary$overall_stats <- data.frame(
    total_fields = nrow(field_data),
    total_acreage = sum(field_data$Acreage, na.rm = TRUE),
    mean_ci = round(mean(field_data$Mean_CI, na.rm = TRUE), 2),
    median_ci = round(median(field_data$Mean_CI, na.rm = TRUE), 2),
    mean_cv = round(mean(field_data$CV, na.rm = TRUE), 4),
    week = current_week,
    year = current_year,
    date = as.character(end_date)
  )

  # Print summaries
  cat("\n--- PHASE DISTRIBUTION ---\n")
  print(phase_dist)
  cat("\n--- STATUS TRIGGER DISTRIBUTION ---\n")
  print(status_dist)
  cat("\n--- CLOUD COVERAGE DISTRIBUTION ---\n")
  print(cloud_dist)
  cat("\n--- OVERALL FARM STATISTICS ---\n")
  print(farm_summary$overall_stats)

  return(farm_summary)
}

# ============================================================================
# ORCHESTRATOR FOR CANE_SUPPLY WORKFLOWS
# ============================================================================

#' Main orchestrator for CANE_SUPPLY per-field KPI workflow
#'
#' This function coordinates all KPI calculations for the per-field analysis workflow.
#' It loads historical data, calculates current/previous week statistics, computes
#' all 22 KPI columns, and aggregates farm-level summaries.
#' @param setup List with directory paths (kpi_reports_dir, data_dir, etc.)
#' @param client_config List with workflow configuration (script_91_compatible, outputs)
#' @param end_date Date object for current report date
#' @param project_dir Character project identifier
#' @param weekly_mosaic Character path to weekly mosaic directory
#' @param daily_vals_dir Character path to daily values directory
#' @param field_boundaries_sf sf object with field geometries
#' @param data_dir Character path to data directory
#' @return List with field_analysis_df, farm_kpi_results, export_paths
calculate_field_analysis_cane_supply <- function(setup, client_config, end_date, project_dir,
                                                 weekly_mosaic, daily_vals_dir,
                                                 field_boundaries_sf, data_dir) {
  message("\n", strrep("=", 70))
  message("CANE_SUPPLY WORKFLOW: PER-FIELD ANALYSIS (Script 91 compatible)")
  message(strrep("=", 70))

  reports_dir <- file.path(setup$reports_dir, "kpis")

  # ========== PHASE 1: WEEKLY ANALYSIS SETUP ==========
  message("\n", strrep("-", 70))
  message("PHASE 1: PER-FIELD WEEKLY ANALYSIS")
  message(strrep("-", 70))

  weeks <- calculate_week_numbers(end_date)
  current_week <- weeks$current_week
  current_year <- weeks$current_year
  previous_week <- weeks$previous_week
  previous_year <- weeks$previous_year
  message(paste("Week:", current_week, "/ Year (ISO 8601):", current_year))

  # Find per-field weekly mosaics
  message("Finding per-field weekly mosaics...")
  if (!dir.exists(weekly_mosaic)) {
    stop(paste("ERROR: weekly_mosaic directory not found:", weekly_mosaic,
               "\nScript 40 (mosaic creation) must be run first."))
  }

  field_dirs <- list.dirs(weekly_mosaic, full.names = FALSE, recursive = FALSE)
  field_dirs <- field_dirs[field_dirs != ""]
  if (length(field_dirs) == 0) {
    stop(paste("ERROR: No field subdirectories found in:", weekly_mosaic,
               "\nScript 40 must create weekly_mosaic/{FIELD}/ structure."))
  }

  # Verify we have mosaics for this week
  single_file_pattern <- sprintf("week_%02d_%d\\.tif", current_week, current_year)
  per_field_files <- c()
  for (field in field_dirs) {
    field_mosaic_dir <- file.path(weekly_mosaic, field)
    files <- list.files(field_mosaic_dir, pattern = single_file_pattern, full.names = TRUE)
    if (length(files) > 0) {
      per_field_files <- c(per_field_files, files)
    }
  }

  if (length(per_field_files) == 0) {
    stop(paste("ERROR: No mosaics found for week", current_week, "year", current_year,
               "\nExpected pattern:", single_file_pattern,
               "\nChecked:", weekly_mosaic))
  }
  message(paste("  ✓ Found", length(per_field_files), "per-field weekly mosaics"))

  # ========== PHASE 2: LOAD HISTORICAL DATA ==========
  message("\nLoading historical field data for trend calculations...")
  num_weeks_to_load <- max(WEEKS_FOR_FOUR_WEEK_TREND, WEEKS_FOR_CV_TREND_LONG)
  message(paste("  Attempting to load up to", num_weeks_to_load, "weeks of historical data..."))
  allow_auto_gen <- !exists("_INSIDE_AUTO_GENERATE", envir = .GlobalEnv)
  historical_data <- load_historical_field_data(
    project_dir, current_week, current_year, reports_dir,
    num_weeks = num_weeks_to_load,
    auto_generate = allow_auto_gen,
    field_boundaries_sf = field_boundaries_sf,
    daily_vals_dir = daily_vals_dir
  )

  # ========== PHASE 3: LOAD PLANTING DATES ==========
  message("\nLoading harvest data from harvest.xlsx for planting dates...")
  # Use load_harvest_data() from 80_utils_common.R for consistency with 80_calculate_kpis.R
  harvesting_data <- load_harvest_data(data_dir)
  planting_dates <- extract_planting_dates(harvesting_data, field_boundaries_sf)
  if (is.null(planting_dates) || nrow(planting_dates) == 0) {
    message("WARNING: No planting dates available. Using NA for all fields.")
    planting_dates <- data.frame(
      field_id = field_boundaries_sf$field,
      planting_date = rep(as.Date(NA), nrow(field_boundaries_sf)),
      stringsAsFactors = FALSE
    )
  }

  # ========== PHASE 4: CALCULATE WEEKLY STATISTICS ==========
  message("\nUsing modular RDS-based approach for weekly statistics...")

  # Current week
  message("\n1. Loading/calculating CURRENT week statistics (week ", current_week, ")...")
  current_stats <- load_or_calculate_weekly_stats(
    week_num = current_week,
    year = current_year,
    project_dir = project_dir,
    field_boundaries_sf = field_boundaries_sf,
    mosaic_dir = weekly_mosaic,
    reports_dir = reports_dir,
    report_date = end_date
  )
  message(paste("  ✓ Loaded/calculated stats for", nrow(current_stats), "fields in current week"))

  # Previous week
  message("\n2. Loading/calculating PREVIOUS week statistics (week ", previous_week, ")...")
  prev_report_date <- end_date - 7
  prev_stats <- load_or_calculate_weekly_stats(
    week_num = previous_week,
    year = previous_year,
    project_dir = project_dir,
    field_boundaries_sf = field_boundaries_sf,
    mosaic_dir = weekly_mosaic,
    reports_dir = reports_dir,
    report_date = prev_report_date
  )
  message(paste("  ✓ Loaded/calculated stats for", nrow(prev_stats), "fields in previous week"))

  # ========== PHASE 5: CALCULATE TRENDS ==========
  message("\n3. Calculating trend columns...")
  current_stats <- calculate_kpi_trends(
    current_stats, prev_stats,
    project_dir = project_dir,
    reports_dir = reports_dir,
    current_week = current_week,
    year = current_year
  )
  message("  ✓ Added Weekly_ci_change, CV_Trend_Short_Term, Four_week_trend, CV_Trend_Long_Term, nmr_of_weeks_analysed")

  # ========== PHASE 6: LOAD HARVEST PROBABILITIES ==========
  message("\n4. Loading harvest probabilities from script 31...")
  # Use get_harvest_output_path() to safely build path (stored in kpi_reports_dir)
  harvest_prob_file <- get_harvest_output_path(project_dir, current_week, current_year)
  message(paste("  Looking for:", harvest_prob_file))
  imminent_prob_data <- tryCatch({
    if (file.exists(harvest_prob_file)) {
      prob_df <- readr::read_csv(harvest_prob_file, show_col_types = FALSE,
                                 col_types = readr::cols(.default = readr::col_character()))
      message(paste("  ✓ Loaded harvest probabilities for", nrow(prob_df), "fields"))
      prob_df %>%
        select(field, imminent_prob, detected_prob) %>%
        rename(Field_id = field,
               Imminent_prob_actual = imminent_prob,
               Detected_prob = detected_prob)
    } else {
      message("  INFO: Harvest probabilities not available (script 31 not run)")
      NULL
    }
  }, error = function(e) {
    message(paste("  WARNING: Could not load harvest probabilities:", e$message))
    NULL
  })

  # ========== PHASE 7: CALCULATE GAP SCORES ==========
  gap_scores_df <- calculate_gap_scores(per_field_files, field_boundaries_sf)

  # ========== PHASE 8: BUILD FINAL PER-FIELD DATAFRAME ==========
  field_analysis_df <- calculate_all_field_kpis(
    current_stats = current_stats,
    planting_dates = planting_dates,
    imminent_prob_data = imminent_prob_data,
    gap_scores_df = gap_scores_df,
    field_boundaries_sf = field_boundaries_sf,
    end_date = end_date
  )

  # ========== PHASE 9: EXPORT PER-FIELD RESULTS ==========
  export_paths <- export_field_analysis_excel(
    field_analysis_df, NULL, project_dir, current_week, current_year, reports_dir
  )

  # For now, farm-level KPIs are not implemented in the CANE_SUPPLY workflow
  farm_kpi_results <- NULL

  # ========== RETURN RESULTS ==========
  return(list(
    field_analysis_df = field_analysis_df,
    farm_kpi_results = farm_kpi_results,
    export_paths = export_paths
  ))
}

# ============================================================================
# FUTURE EXTENSION POINTS
# ============================================================================
# Placeholder for ANGATA-specific utilities that may be added in future:
#   - Custom yield models based on ANGATA historical data
#   - Field condition thresholds specific to ANGATA growing practices
#   - Integration with ANGATA harvest scheduling system
#   - WhatsApp messaging templates for ANGATA supply chain stakeholders
#   - Cost/benefit analysis for ANGATA operational decisions
#
# These functions can be added here as ANGATA requirements evolve.
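A minimal interactive sketch of the pure helpers defined above; it assumes this file (and its dependencies, 00_common_utils.R and 80_utils_common.R, which supply the threshold constants) have already been sourced. Expected values follow directly from the priority logic and thresholds documented in each function.

```r
# ============================================================================
# EXAMPLE USAGE (interactive sanity checks; assumes this file and its
# dependencies have been sourced)
# ============================================================================

# Age in whole weeks between a planting date and a report date
# (2025-01-06 to 2025-03-03 is 56 days = 8 weeks)
calculate_age_week(as.Date("2025-01-06"), as.Date("2025-03-03"))
#> 8

# Long-term CV slope categorization: slope below -0.01 means more uniform
categorize_cv_trend_long_term(-0.02)
#> "More uniform"

# Status alert priority logic: a mature field (age >= 52 weeks) with a
# harvest probability above 0.5 hits Priority 1 before any other check
calculate_status_alert(
  imminent_prob = 0.8, age_week = 60, mean_ci = 3.0,
  four_week_trend = 0.1, weekly_ci_change = 0.1, cv = 0.10
)
#> "harvest_ready"
```

These checks only exercise the side-effect-free helpers; the orchestrator itself requires the full directory layout produced by the earlier pipeline scripts.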