# 80_CALCULATE_KPIS.R (CONSOLIDATED KPI CALCULATION) # ============================================================================ # UNIFIED KPI CALCULATION SCRIPT # # This script combines: # 1. Per-field weekly analysis (from 09c: field-level trends, phases, statuses) # 2. Farm-level KPI metrics (from old 09: 6 high-level indicators) # # FEATURES: # - Per-field analysis with SC-64 enhancements (4-week trends, CI percentiles, etc.) # - Farm-level KPI calculation (6 metrics for executive overview) # - Parallel processing (tile-aware, 1000+ fields supported) # - Comprehensive Excel + RDS + CSV exports (21 columns per spec) # - Test mode for development # # COMMAND-LINE USAGE: # Option 1: Rscript 80_calculate_kpis.R 2026-01-14 angata # Arguments: [end_date] [project_dir] # # Option 2: Rscript 80_calculate_kpis.R 2026-01-14 angata 7 # Arguments: [end_date] [project_dir] [offset_days] # # & "C:\Program Files\R\R-4.4.3\bin\x64\Rscript.exe" r_app/80_calculate_kpis.R 2026-01-12 angata 7 # # Usage in run_full_pipeline.R: # source("r_app/80_calculate_kpis.R") # main() # ============================================================================ # EXCEL OUTPUT SPECIFICATION (21 COLUMNS) # ============================================================================ # This script exports 21 columns per the field analysis specification: # # COMPLETED/IN PROGRESS: # 1. Field_id ✓ - Unique field identifier # 2. Farm_Section - Management zone (to be filled by user) # 3. Field_name ✓ - Client-facing field name (from GeoJSON) # 4. Acreage ✓ - Field size in acres # 5. Mean_CI ✓ - Average Chlorophyll Index # 6. Weekly_ci_change ✓ - Week-over-week CI change # 7. Four_week_trend - [FUTURE] Trend over 4 weeks (requires historical mosaics) # 8. Last_harvest_or_planting_date - [DUMMY for now] Will be from harvest Excel + LSTM (script 31) # 9. Age_week ✓ - Weeks since planting # 10. Phase (age based) ✓ - Growth phase (Germination, Tillering, Grand Growth, Maturation) # 11. nmr_weeks_in_this_phase - [TODO] Weeks spent in current phase (track phase transitions) # 12. Germination_progress - [TODO] % pixels with CI >= threshold (default 2, for age < 4 months) # 13. Imminent_prob - [DUMMY for now] Harvest probability (will be from script 31 output) # 14. Status_trigger ✓ - Alerts (harvest_ready, stress, etc.) # 15. CI_range (min-max) - [TODO] Min and max CI values in field # 16. CI_Percentiles ✓ - 10th-90th percentile of CI (p10-p90 format) # 17. CV ✓ - Coefficient of variation (field uniformity) # 18. CV_Trend_Short_Term - [TODO] 2-week CV trend (current week CV - last week CV) # 19. CV_Trend_Long_Term - [FUTURE] 8-week CV slope (requires linear regression, historical mosaics) # 20. Cloud_pct_clear ✓ - % field visible (pixel coverage) # 21. Cloud_category ✓ - Cloud classification (Clear view / Partial coverage / No image available) # # IMPLEMENTATION PLAN (ordered by difficulty): # ============================================================================ # PHASE 1 - EASY (Current data only): # [✓] Remove Mean_CI_prev column # [✓] Add Field_name column (from field_boundaries_sf$field) # [✓] Add Farm_Section column (empty, user will fill) # [✓] Add Last_harvest_or_planting_date (use UNIFORM_PLANTING_DATE as dummy) # [✓] Add CI_range (min/max from pixel extraction) # [✓] Add Cloud_pct_clear (% from pixel coverage) # [✓] Column order: Reorder to match spec (1-21) # # # PHASE 2 - MEDIUM (Requires computation): # [ ] Add nmr_weeks_in_this_phase (track phase transitions with previous week CSV) # [ ] Add Germination_progress (% pixels CI >= GERMINATION_CI_THRESHOLD, configurable) # [ ] Add Imminent_prob column (dummy NA, will merge from script 31 harvest_imminent_weekly.csv) # [ ] Add CV_Trend_Short_Term (requires loading last week's CV values) # # PHASE 3 - COMPLEX (Requires historical data): # [ ] Add Four_week_trend (CI value difference week vs 4 weeks ago, requires loading prev mosaics) # [ ] Add CV_Trend_Long_Term (8-week slope: linear regression on 8 weeks of CV, suggests lm()) # [ ] Load previous week's CSV to cross-check phase transitions and trends # # NOTES: # - Script 31 (harvest_imminent_weekly.py) outputs: field, imminent_prob, detected_prob, week, year # - Will need to LEFT JOIN on (field, week, year) to populate Imminent_prob # - Phase transition logic: Compare this week's Phase vs last week's Phase from CSV # - For 8-week CV slope: Linear regression slope = (CV_week8 - CV_week1) / 7 weeks (approximately) # or use lm(CV ~ week) on 8-week sequence for proper slope calculation # - Germination_progress only calculated if Age_week < 17 (before end of Tillering phase) # - Cloud_pct_clear calculated as: (pixel_count / expected_pixels) * 100 # ============================================================================ # *** CONFIGURATION SECTION - MANUALLY DEFINED THRESHOLDS *** # ============================================================================ # TEST MODE (for development with limited historical data) TEST_MODE <- TRUE TEST_MODE_NUM_WEEKS <- 2 # GERMINATION PROGRESS THRESHOLD # Percentage of pixels that must reach this CI value to count as "germinated" GERMINATION_CI_THRESHOLD <- 2.0 # Pixels with CI >= 2 count as germinated # FOR TESTING: Set these fields as "recently planted" to demonstrate germination progress YOUNG_FIELDS_FOR_TESTING <- c("1", "2", "3", "4", "5", "6", "7", "8", "9", "10") # First 10 field IDs YOUNG_FIELD_PLANTING_DATE <- as.Date("2026-01-01") # Recently planted for demo # FOUR-WEEK TREND THRESHOLDS FOUR_WEEK_TREND_STRONG_GROWTH_MIN <- 0.5 FOUR_WEEK_TREND_GROWTH_MIN <- 0.1 FOUR_WEEK_TREND_GROWTH_MAX <- 0.5 FOUR_WEEK_TREND_NO_GROWTH_RANGE <- 0.1 FOUR_WEEK_TREND_DECLINE_MAX <- -0.1 FOUR_WEEK_TREND_DECLINE_MIN <- -0.5 FOUR_WEEK_TREND_STRONG_DECLINE_MAX <- -0.5 # CV TREND THRESHOLDS CV_TREND_THRESHOLD_SIGNIFICANT <- 0.05 # CLOUD COVER ROUNDING INTERVALS CLOUD_INTERVALS <- c(0, 50, 60, 70, 80, 90, 100) # PERCENTILE CALCULATIONS CI_PERCENTILE_LOW <- 0.10 CI_PERCENTILE_HIGH <- 0.90 # HISTORICAL DATA LOOKBACK WEEKS_FOR_FOUR_WEEK_TREND <- 4 WEEKS_FOR_CV_TREND_SHORT <- 2 WEEKS_FOR_CV_TREND_LONG <- 8 # ============================================================================ # 1. Load required libraries # ============================================================================ suppressPackageStartupMessages({ library(here) library(sf) library(terra) library(dplyr) library(tidyr) library(lubridate) library(readr) library(readxl) library(writexl) library(purrr) library(furrr) library(future) library(caret) library(CAST) library(randomForest) tryCatch({ library(torch) }, error = function(e) { message("Note: torch package not available - harvest model inference will be skipped") }) }) # ============================================================================ # PHASE AND STATUS TRIGGER DEFINITIONS # ============================================================================ PHASE_DEFINITIONS <- data.frame( phase = c("Germination", "Tillering", "Grand Growth", "Maturation"), age_start = c(0, 4, 17, 39), age_end = c(6, 16, 39, 200), stringsAsFactors = FALSE ) STATUS_TRIGGERS <- data.frame( trigger = c( "germination_started", "germination_complete", "stress_detected_whole_field", "strong_recovery", "growth_on_track", "maturation_progressing", "harvest_ready" ), age_min = c(0, 0, NA, NA, 4, 39, 45), age_max = c(6, 6, NA, NA, 39, 200, 200), description = c( "10% of field CI > 2", "70% of field CI >= 2", "CI decline > -1.5 + low CV", "CI increase > +1.5", "CI increasing consistently", "High CI, stable/declining", "Age 45+ weeks (ready to harvest)" ), stringsAsFactors = FALSE ) # ============================================================================ # TILE-AWARE HELPER FUNCTIONS # ============================================================================ get_tile_ids_for_field <- function(field_geom, tile_grid, field_id = NULL) { if (inherits(field_geom, "sf")) { field_bbox <- sf::st_bbox(field_geom) field_xmin <- field_bbox["xmin"] field_xmax <- field_bbox["xmax"] field_ymin <- field_bbox["ymin"] field_ymax <- field_bbox["ymax"] } else if (inherits(field_geom, "SpatVector")) { field_bbox <- terra::ext(field_geom) field_xmin <- field_bbox$xmin field_xmax <- field_bbox$xmax field_ymin <- field_bbox$ymin field_ymax <- field_bbox$ymax } else { stop("field_geom must be sf or terra::vect object") } # DEBUG: Print bbox info for first field if (!is.null(field_id) && field_id == "1391") { message(paste("[DEBUG get_tile_ids] Field bbox - xmin:", field_xmin, "xmax:", field_xmax, "ymin:", field_ymin, "ymax:", field_ymax)) message(paste("[DEBUG get_tile_ids] tile_grid sample: id=", tile_grid$id[1], "xmin=", tile_grid$xmin[1], "xmax=", tile_grid$xmax[1], "ymin=", tile_grid$ymin[1], "ymax=", tile_grid$ymax[1])) message(paste("[DEBUG get_tile_ids] tile_grid CRS:", sf::st_crs(tile_grid))) message(paste("[DEBUG get_tile_ids] field CRS:", sf::st_crs(field_geom))) } intersecting_tiles <- tile_grid$id[ !(tile_grid$xmax < field_xmin | tile_grid$xmin > field_xmax | tile_grid$ymax < field_ymin | tile_grid$ymin > field_ymax) ] return(as.numeric(intersecting_tiles)) } load_tiles_for_field <- function(field_geom, tile_ids, week_num, year, mosaic_dir) { if (length(tile_ids) == 0) { return(NULL) } tiles_list <- list() for (tile_id in sort(tile_ids)) { tile_filename <- sprintf("week_%02d_%d_%02d.tif", week_num, year, tile_id) tile_path <- file.path(mosaic_dir, tile_filename) if (file.exists(tile_path)) { tryCatch({ tile_rast <- terra::rast(tile_path) ci_band <- terra::subset(tile_rast, 5) tiles_list[[length(tiles_list) + 1]] <- ci_band }, error = function(e) { message(paste(" Warning: Could not load tile", tile_id, ":", e$message)) }) } } if (length(tiles_list) == 0) { return(NULL) } if (length(tiles_list) == 1) { return(tiles_list[[1]]) } else { tryCatch({ rsrc <- terra::sprc(tiles_list) merged <- terra::mosaic(rsrc, fun = "max") return(merged) }, error = function(e) { message(paste(" Warning: Could not merge tiles:", e$message)) return(tiles_list[[1]]) }) } } build_tile_grid <- function(mosaic_dir, week_num, year) { # Handle grid-size subdirectories (e.g., weekly_tile_max/5x5/) # First check if mosaic_dir contains grid-size subdirectories detected_grid_size <- NA if (dir.exists(mosaic_dir)) { subfolders <- list.dirs(mosaic_dir, full.names = FALSE, recursive = FALSE) grid_patterns <- grep("^\\d+x\\d+$", subfolders, value = TRUE) if (length(grid_patterns) > 0) { # Use the first grid-size subdirectory found detected_grid_size <- grid_patterns[1] mosaic_dir <- file.path(mosaic_dir, detected_grid_size) message(paste(" Using grid-size subdirectory:", detected_grid_size)) } } tile_pattern <- sprintf("week_%02d_%d_([0-9]{2})\\.tif", week_num, year) tile_files <- list.files(mosaic_dir, pattern = tile_pattern, full.names = TRUE) if (length(tile_files) == 0) { stop(paste("No tile files found for week", week_num, year, "in", mosaic_dir)) } tile_grid <- data.frame( id = integer(), xmin = numeric(), xmax = numeric(), ymin = numeric(), ymax = numeric(), stringsAsFactors = FALSE ) for (tile_file in tile_files) { tryCatch({ matches <- regmatches(basename(tile_file), regexpr("_([0-9]{2})\\.tif$", basename(tile_file))) if (length(matches) > 0) { tile_id <- as.integer(sub("_|\\.tif", "", matches[1])) tile_rast <- terra::rast(tile_file) tile_ext <- terra::ext(tile_rast) tile_grid <- rbind(tile_grid, data.frame( id = tile_id, xmin = tile_ext$xmin, xmax = tile_ext$xmax, ymin = tile_ext$ymin, ymax = tile_ext$ymax, stringsAsFactors = FALSE )) } }, error = function(e) { message(paste(" Warning: Could not process tile", basename(tile_file), ":", e$message)) }) } if (nrow(tile_grid) == 0) { stop("Could not extract extents from any tile files") } # RETURN BOTH the grid AND the corrected mosaic directory path return(list( tile_grid = tile_grid, mosaic_dir = mosaic_dir, grid_size = detected_grid_size )) } # ============================================================================ # SC-64 ENHANCEMENT FUNCTIONS # ============================================================================ categorize_four_week_trend <- function(ci_values_list) { if (is.null(ci_values_list) || length(ci_values_list) < 2) { return(NA_character_) } ci_values_list <- ci_values_list[!is.na(ci_values_list)] if (length(ci_values_list) < 2) { return(NA_character_) } weekly_changes <- diff(ci_values_list) avg_weekly_change <- mean(weekly_changes, na.rm = TRUE) if (avg_weekly_change >= FOUR_WEEK_TREND_STRONG_GROWTH_MIN) { return("strong growth") } else if (avg_weekly_change >= FOUR_WEEK_TREND_GROWTH_MIN && avg_weekly_change < FOUR_WEEK_TREND_GROWTH_MAX) { return("growth") } else if (abs(avg_weekly_change) <= FOUR_WEEK_TREND_NO_GROWTH_RANGE) { return("no growth") } else if (avg_weekly_change <= FOUR_WEEK_TREND_DECLINE_MIN && avg_weekly_change > FOUR_WEEK_TREND_STRONG_DECLINE_MAX) { return("decline") } else if (avg_weekly_change < FOUR_WEEK_TREND_STRONG_DECLINE_MAX) { return("strong decline") } else { return("no growth") } } round_cloud_to_intervals <- function(cloud_pct_clear) { if (is.na(cloud_pct_clear)) { return(NA_character_) } if (cloud_pct_clear < 50) return("<50%") if (cloud_pct_clear < 60) return("50-60%") if (cloud_pct_clear < 70) return("60-70%") if (cloud_pct_clear < 80) return("70-80%") if (cloud_pct_clear < 90) return("80-90%") return(">90%") } get_ci_percentiles <- function(ci_values) { if (is.null(ci_values) || length(ci_values) == 0) { return(NA_character_) } ci_values <- ci_values[!is.na(ci_values)] if (length(ci_values) == 0) { return(NA_character_) } p10 <- quantile(ci_values, CI_PERCENTILE_LOW, na.rm = TRUE) p90 <- quantile(ci_values, CI_PERCENTILE_HIGH, na.rm = TRUE) return(sprintf("%.1f-%.1f", p10, p90)) } calculate_cv_trend <- function(cv_current, cv_previous) { if (is.na(cv_current) || is.na(cv_previous)) { return(NA_real_) } return(round(cv_current - cv_previous, 4)) } # ============================================================================ # HELPER FUNCTIONS # ============================================================================ get_phase_by_age <- function(age_weeks) { if (is.na(age_weeks)) return(NA_character_) for (i in seq_len(nrow(PHASE_DEFINITIONS))) { if (age_weeks >= PHASE_DEFINITIONS$age_start[i] && age_weeks <= PHASE_DEFINITIONS$age_end[i]) { return(PHASE_DEFINITIONS$phase[i]) } } return("Unknown") } get_status_trigger <- function(ci_values, ci_change, age_weeks) { if (is.na(age_weeks) || length(ci_values) == 0) return(NA_character_) ci_values <- ci_values[!is.na(ci_values)] if (length(ci_values) == 0) return(NA_character_) pct_above_2 <- sum(ci_values > 2) / length(ci_values) * 100 pct_at_or_above_2 <- sum(ci_values >= 2) / length(ci_values) * 100 ci_cv <- if (mean(ci_values, na.rm = TRUE) > 0) sd(ci_values) / mean(ci_values, na.rm = TRUE) else 0 mean_ci <- mean(ci_values, na.rm = TRUE) if (age_weeks >= 0 && age_weeks <= 6) { if (pct_at_or_above_2 >= 70) { return("germination_complete") } else if (pct_above_2 > 10) { return("germination_started") } } if (age_weeks >= 45) { return("harvest_ready") } if (age_weeks > 6 && !is.na(ci_change) && ci_change < -1.5 && ci_cv < 0.25) { return("stress_detected_whole_field") } if (age_weeks > 6 && !is.na(ci_change) && ci_change > 1.5) { return("strong_recovery") } if (age_weeks >= 4 && age_weeks < 39 && !is.na(ci_change) && ci_change > 0.2) { return("growth_on_track") } if (age_weeks >= 39 && age_weeks < 45 && mean_ci > 3.5) { return("maturation_progressing") } return(NA_character_) } load_historical_field_data <- function(project_dir, current_week, reports_dir, num_weeks = 4) { historical_data <- list() loaded_weeks <- c() for (lookback in 0:(num_weeks - 1)) { target_week <- current_week - lookback if (target_week < 1) target_week <- target_week + 52 csv_filename <- paste0(project_dir, "_field_analysis_week", sprintf("%02d", target_week), ".csv") csv_path <- file.path(reports_dir, "kpis", "field_analysis", csv_filename) if (file.exists(csv_path)) { tryCatch({ data <- read_csv(csv_path, show_col_types = FALSE) historical_data[[lookback + 1]] <- list( week = target_week, data = data ) loaded_weeks <- c(loaded_weeks, target_week) }, error = function(e) { message(paste(" Warning: Could not load week", target_week, ":", e$message)) }) } } if (length(historical_data) == 0) { message(paste("Warning: No historical field data found for trend calculations")) return(NULL) } message(paste("Loaded", length(historical_data), "weeks of historical data:", paste(loaded_weeks, collapse = ", "))) return(historical_data) } USE_UNIFORM_AGE <- TRUE UNIFORM_PLANTING_DATE <- as.Date("2025-01-01") extract_planting_dates <- function(harvesting_data, field_boundaries_sf = NULL) { if (USE_UNIFORM_AGE) { message(paste("Using uniform planting date for all fields:", UNIFORM_PLANTING_DATE)) # Return a data frame with all field IDs mapped to uniform planting date if (!is.null(field_boundaries_sf)) { return(data.frame( field_id = field_boundaries_sf$field, date = rep(UNIFORM_PLANTING_DATE, nrow(field_boundaries_sf)), stringsAsFactors = FALSE )) } else { # Fallback if field_boundaries_sf not provided return(NULL) } } if (is.null(harvesting_data) || nrow(harvesting_data) == 0) { message("Warning: No harvesting data available.") return(NULL) } tryCatch({ planting_dates <- harvesting_data %>% arrange(field, desc(season_start)) %>% distinct(field, .keep_all = TRUE) %>% select(field, season_start) %>% rename(field_id = field, planting_date = season_start) %>% filter(!is.na(planting_date)) %>% as.data.frame() message(paste("Extracted planting dates for", nrow(planting_dates), "fields")) return(planting_dates) }, error = function(e) { message(paste("Error extracting planting dates:", e$message)) return(NULL) }) } # ============================================================================ # MODULAR STATISTICS CALCULATION (Reusable for any week) # ============================================================================ calculate_field_statistics <- function(field_boundaries_sf, week_num, year, mosaic_dir, report_date = Sys.Date()) { message(paste("Calculating statistics for all fields - Week", week_num, year)) # Debug: Check if constants are available message(paste(" DEBUG: YOUNG_FIELDS_FOR_TESTING =", paste(YOUNG_FIELDS_FOR_TESTING, collapse=", "))) message(paste(" DEBUG: YOUNG_FIELD_PLANTING_DATE =", YOUNG_FIELD_PLANTING_DATE)) # Build tile file list tile_pattern <- sprintf("week_%02d_%d_([0-9]{2})\\.tif", week_num, year) tile_files <- list.files(mosaic_dir, pattern = tile_pattern, full.names = TRUE) if (length(tile_files) == 0) { stop(paste("No tile files found for week", week_num, year, "in", mosaic_dir)) } message(paste(" Found", length(tile_files), "tiles for week", week_num)) results_list <- list() fields_processed <- 0 # SCRIPT 20 APPROACH: Loop through tiles, extract all fields from each tile for (tile_idx in seq_along(tile_files)) { tile_file <- tile_files[tile_idx] tryCatch({ # Load tile current_rast <- terra::rast(tile_file) ci_band <- current_rast[["CI"]] if (is.null(ci_band) || !inherits(ci_band, "SpatRaster")) { message(paste(" [SKIP] Tile", basename(tile_file), "- CI band not found")) return(NULL) } # Extract all fields from this tile in ONE call # terra::extract returns a dataframe with columns: ID, CI # where each row is one pixel, and ID indicates which polygon it came from extracted <- terra::extract(ci_band, field_boundaries_sf, na.rm = FALSE) # Group by field ID and calculate statistics for each field # extracted$ID contains the field polygon index (1 to nrow(field_boundaries_sf)) unique_field_ids <- unique(extracted$ID[!is.na(extracted$ID)]) for (field_poly_idx in unique_field_ids) { # Get all CI values for this field from this tile field_id <- field_boundaries_sf$field[field_poly_idx] ci_vals <- extracted$CI[extracted$ID == field_poly_idx] ci_vals <- ci_vals[!is.na(ci_vals)] # Skip if no data for this field in this tile if (length(ci_vals) == 0) { next } # Calculate statistics mean_ci <- mean(ci_vals, na.rm = TRUE) ci_std <- sd(ci_vals, na.rm = TRUE) cv <- if (mean_ci > 0) ci_std / mean_ci else NA_real_ range_min <- min(ci_vals, na.rm = TRUE) range_max <- max(ci_vals, na.rm = TRUE) range_str <- sprintf("%.1f-%.1f", range_min, range_max) ci_percentiles_str <- get_ci_percentiles(ci_vals) # Cloud coverage: count total pixels vs non-NA pixels for this field field_rows <- extracted[extracted$ID == field_poly_idx, ] num_total <- nrow(field_rows) num_data <- sum(!is.na(field_rows$CI)) pct_clear <- if (num_total > 0) round((num_data / num_total) * 100, 1) else 0 cloud_cat <- if (num_data == 0) "No image available" else if (pct_clear >= 99.5) "Clear view" else "Partial coverage" # Age and Phase age_weeks <- NA_real_ if (USE_UNIFORM_AGE) { # Check if this field is in the "young fields" list (for testing germination progress) is_young_field <- field_id %in% YOUNG_FIELDS_FOR_TESTING if (is_young_field) { age_weeks <- as.numeric(difftime(report_date, YOUNG_FIELD_PLANTING_DATE, units = "weeks")) # Debug for first 2 matches if (field_id %in% c("1", "2")) { message(paste(" DEBUG: Field", field_id, "is young field, age =", round(age_weeks, 2), "weeks")) } } else { age_weeks <- as.numeric(difftime(report_date, UNIFORM_PLANTING_DATE, units = "weeks")) } } phase <- get_phase_by_age(age_weeks) # Germination progress (only for young plants, age < 17 weeks) germination_progress <- NA_character_ if (!is.na(age_weeks) && age_weeks >= 0 && age_weeks < 17) { pct_ci_ge_threshold <- sum(ci_vals >= GERMINATION_CI_THRESHOLD) / length(ci_vals) * 100 germination_progress <- sprintf("%.1f%%", pct_ci_ge_threshold) } # Store result (check if field already exists from another tile) existing_idx <- which(sapply(results_list, function(x) x$Field_id) == field_id) if (length(existing_idx) > 0) { # Field already in results from previous tile - keep first occurrence or average # For now, keep the first one (earlier tiles) next } # Store new field result results_list[[length(results_list) + 1]] <- data.frame( Field_id = field_id, Mean_CI = round(mean_ci, 2), CV = round(cv, 4), CI_range = range_str, CI_Percentiles = ci_percentiles_str, Cloud_pct_clear = pct_clear, Cloud_category = cloud_cat, Age_week = round(age_weeks, 1), Phase = phase, Germination_progress = germination_progress, stringsAsFactors = FALSE ) fields_processed <- fields_processed + 1 } message(paste(" Tile", tile_idx, "of", length(tile_files), "processed")) }, error = function(e) { message(paste(" [ERROR] Tile", basename(tile_file), ":", e$message)) }) } if (length(results_list) == 0) { stop(paste("No fields processed successfully for week", week_num)) } stats_df <- dplyr::bind_rows(results_list) message(paste(" ✓ Successfully calculated statistics for", nrow(stats_df), "fields")) return(stats_df) } # ============================================================================ # CALCULATE KPI TRENDS (Requires previous week RDS) # ============================================================================ calculate_kpi_trends <- function(current_stats, prev_stats = NULL) { message("Calculating KPI trends from current and previous week data") # Initialize new columns with defaults current_stats$Weekly_ci_change <- NA_real_ current_stats$CV_Trend_Short_Term <- NA_real_ current_stats$nmr_weeks_in_this_phase <- 1L # If no previous week data, return with defaults if (is.null(prev_stats) || nrow(prev_stats) == 0) { message(" No previous week data available - using defaults") return(current_stats) } message(paste(" prev_stats has", nrow(prev_stats), "rows and", ncol(prev_stats), "columns")) message(paste(" prev_stats columns:", paste(names(prev_stats), collapse = ", "))) # Build lookup indices for previous week (by Field_id) prev_lookup <- setNames(seq_len(nrow(prev_stats)), prev_stats$Field_id) # Try to load previous week's field_analysis to get nmr_weeks_in_this_phase history prev_field_analysis <- NULL prev_analysis_csv <- file.path( reports_dir, "kpis", "field_analysis", sprintf("%s_field_analysis_week%02d.csv", paste(strsplit(current_stats$Field_id[1], "")[[1]][1], collapse=""), # Get project from field as.numeric(format(Sys.Date() - 7, "%V"))) # Approximate previous week ) # Better way: construct the previous week number properly current_week_num <- as.numeric(format(Sys.Date(), "%V")) prev_week_num <- current_week_num - 1 if (prev_week_num < 1) prev_week_num <- 52 # This is a bit tricky - we need the project_dir from the main scope # For now, assume we can infer it or pass it through # Let's use a simpler approach: look for any field_analysis_week* file that's recent tryCatch({ analysis_dir <- file.path(reports_dir, "kpis", "field_analysis") if (dir.exists(analysis_dir)) { # Find the most recent field_analysis CSV (should be previous week) analysis_files <- list.files(analysis_dir, pattern = "_field_analysis_week.*\\.csv$", full.names = TRUE) if (length(analysis_files) > 0) { # Sort by modification time and get the most recent recent_file <- analysis_files[which.max(file.info(analysis_files)$mtime)] prev_field_analysis <- readr::read_csv(recent_file, show_col_types = FALSE, col_select = c(Field_id, nmr_weeks_in_this_phase, Phase)) } } }, error = function(e) { message(paste(" Note: Could not load previous field_analysis for nmr_weeks tracking:", e$message)) }) if (!is.null(prev_field_analysis) && nrow(prev_field_analysis) > 0) { message(paste(" Using previous field_analysis to track nmr_weeks_in_this_phase")) } # For each field in current week, lookup previous values cv_trends_calculated <- 0 for (i in seq_len(nrow(current_stats))) { field_id <- current_stats$Field_id[i] prev_idx <- prev_lookup[field_id] if (!is.na(prev_idx) && prev_idx > 0 && prev_idx <= nrow(prev_stats)) { # Field exists in previous week - extract row carefully prev_row <- prev_stats[prev_idx, , drop = FALSE] # Keep as dataframe if (nrow(prev_row) == 0) { # Field not found - skip next } # Access values from single-row dataframe prev_mean_ci <- prev_row$Mean_CI[1] prev_cv <- prev_row$CV[1] prev_phase <- prev_row$Phase[1] # Weekly CI change (current Mean_CI - previous Mean_CI) if (!is.na(prev_mean_ci) && !is.na(current_stats$Mean_CI[i])) { current_stats$Weekly_ci_change[i] <- round(current_stats$Mean_CI[i] - prev_mean_ci, 2) } # CV short-term trend (current CV - previous CV) # DEBUG: Check first few fields if (i <= 3) { message(paste(" Field", field_id, "- CV_prev:", prev_cv, "CV_curr:", current_stats$CV[i])) } if (!is.na(prev_cv) && !is.na(current_stats$CV[i])) { trend_val <- round(current_stats$CV[i] - prev_cv, 4) current_stats$CV_Trend_Short_Term[i] <- trend_val cv_trends_calculated <- cv_trends_calculated + 1 if (i <= 3) { message(paste(" -> CV_Trend =", trend_val)) } } # Weeks in current phase (track phase transitions) # Use previous field_analysis if available for proper counter progression if (!is.null(prev_field_analysis) && nrow(prev_field_analysis) > 0) { # Look up this field in previous analysis prev_analysis_row <- prev_field_analysis %>% dplyr::filter(Field_id == field_id) if (nrow(prev_analysis_row) > 0) { prev_phase_analysis <- prev_analysis_row$Phase[1] prev_nmr_weeks_analysis <- prev_analysis_row$nmr_weeks_in_this_phase[1] if (!is.na(current_stats$Phase[i]) && !is.na(prev_phase_analysis)) { if (current_stats$Phase[i] == prev_phase_analysis) { # Same phase - increment the counter current_stats$nmr_weeks_in_this_phase[i] <- if (!is.na(prev_nmr_weeks_analysis)) prev_nmr_weeks_analysis + 1L else 2L } else { # Phase changed - reset to 1 current_stats$nmr_weeks_in_this_phase[i] <- 1L } } } else if (!is.na(current_stats$Phase[i]) && !is.na(prev_phase)) { # Field not in previous analysis, fall back to prev_stats phase comparison if (current_stats$Phase[i] == prev_phase) { current_stats$nmr_weeks_in_this_phase[i] <- 2L } else { current_stats$nmr_weeks_in_this_phase[i] <- 1L } } } else { # No previous field_analysis available - use phase from prev_stats if (!is.na(current_stats$Phase[i]) && !is.na(prev_phase)) { if (current_stats$Phase[i] == prev_phase) { # Same phase - increment counter (start with 2) current_stats$nmr_weeks_in_this_phase[i] <- 2L } else { # Phase changed - reset to 1 current_stats$nmr_weeks_in_this_phase[i] <- 1L } } } } } message(paste(" ✓ Calculated", cv_trends_calculated, "CV_Trend_Short_Term values out of", nrow(current_stats), "fields")) message(paste(" CV_Trend_Short_Term non-NA values:", sum(!is.na(current_stats$CV_Trend_Short_Term)))) return(current_stats) } # ============================================================================ # LOAD OR CALCULATE WEEKLY STATISTICS (RDS Caching) # ============================================================================ load_or_calculate_weekly_stats <- function(week_num, year, project_dir, field_boundaries_sf, mosaic_dir, reports_dir, report_date = Sys.Date()) { # Build RDS file path rds_filename <- sprintf("%s_field_stats_week%02d.rds", project_dir, week_num) rds_path <- file.path(reports_dir, "kpis", "field_stats", rds_filename) # Try to load existing RDS (fast cache) if (file.exists(rds_path)) { message(paste("Loading cached statistics from:", basename(rds_path))) return(readRDS(rds_path)) } # RDS not found - calculate from tiles message(paste("Cached RDS not found, calculating statistics from tiles for week", week_num)) stats_df <- calculate_field_statistics(field_boundaries_sf, week_num, year, mosaic_dir, report_date) # Create output directory if needed output_dir <- file.path(reports_dir, "kpis", "field_stats") if (!dir.exists(output_dir)) { dir.create(output_dir, recursive = TRUE, showWarnings = FALSE) } # Export RDS (for fast lookup next week) saveRDS(stats_df, rds_path) message(paste("Saved weekly statistics RDS:", basename(rds_path))) # Export CSV (for user review) csv_filename <- sprintf("%s_field_stats_week%02d.csv", project_dir, week_num) csv_path <- file.path(output_dir, csv_filename) readr::write_csv(stats_df, csv_path) message(paste("Saved weekly statistics CSV:", basename(csv_path))) return(stats_df) } # ============================================================================ # PARALLEL FIELD ANALYSIS FUNCTION # ============================================================================ analyze_single_field <- function(field_idx, field_boundaries_sf, tile_grid, week_num, year, mosaic_dir, historical_data = NULL, planting_dates = NULL, report_date = Sys.Date(), harvest_imminence_data = NULL, harvesting_data = NULL) { tryCatch({ field_id <- field_boundaries_sf$field[field_idx] farm_section <- if ("sub_area" %in% names(field_boundaries_sf)) { field_boundaries_sf$sub_area[field_idx] } else { NA_character_ } field_name <- field_id # DEBUG: Print for first few fields if (field_idx <= 3) { message(paste("[DEBUG] Field", field_idx, ":", field_id)) } field_sf <- field_boundaries_sf[field_idx, ] if (sf::st_is_empty(field_sf) || any(is.na(sf::st_geometry(field_sf)))) { return(data.frame( Field_id = field_id, error = "Empty or invalid geometry" )) } field_area_ha <- as.numeric(sf::st_area(field_sf)) / 10000 field_area_acres <- field_area_ha / 0.404686 tile_ids <- get_tile_ids_for_field(field_sf, tile_grid, field_id = field_id) # DEBUG: Print tile IDs for first field if (field_idx == 1) { message(paste("[DEBUG] First field tile_ids:", paste(tile_ids, collapse=","))) message(paste("[DEBUG] tile_grid nrows:", nrow(tile_grid), "ncols:", ncol(tile_grid))) message(paste("[DEBUG] mosaic_dir:", mosaic_dir)) } current_ci <- load_tiles_for_field(field_sf, tile_ids, week_num, year, mosaic_dir) if (is.null(current_ci)) { return(data.frame( Field_id = field_id, error = "No tile data available" )) } # SINGLE EXTRACTION: Get all pixel values for this field, then calculate all stats from it field_bbox <- sf::st_bbox(field_sf) ci_cropped <- terra::crop(current_ci, terra::ext(field_bbox), snap = "out") # Extract all pixels in one call (no fun= parameter means we get raw pixel values) all_extracted <- terra::extract(ci_cropped, field_sf)[, 2] current_ci_vals <- all_extracted[!is.na(all_extracted)] if (length(current_ci_vals) == 0) { return(data.frame( Field_id = field_id, error = "No CI values extracted from tiles" )) } # Calculate all statistics from the single extraction mean_ci_current <- mean(current_ci_vals, na.rm = TRUE) ci_std <- sd(current_ci_vals, na.rm = TRUE) cv_current <- if (mean_ci_current > 0) ci_std / mean_ci_current else NA_real_ range_min <- min(current_ci_vals, na.rm = TRUE) range_max <- max(current_ci_vals, na.rm = TRUE) range_str <- sprintf("%.1f-%.1f", range_min, range_max) ci_percentiles_str <- get_ci_percentiles(current_ci_vals) # Cloud coverage from extraction metadata num_total <- length(all_extracted) num_data <- length(current_ci_vals) pct_clear <- if (num_total > 0) round((num_data / num_total) * 100, 1) else 0 cloud_cat <- if (num_data == 0) "No image available" else if (pct_clear >= 99.5) "Clear view" else "Partial coverage" cloud_pct <- 100 - pct_clear cloud_interval <- round_cloud_to_intervals(pct_clear) # Weekly change (extract previous week same way - single extraction) weekly_ci_change <- NA previous_ci_vals <- NULL tryCatch({ previous_ci <- load_tiles_for_field(field_sf, tile_ids, week_num - 1, year, mosaic_dir) if (!is.null(previous_ci)) { prev_bbox <- sf::st_bbox(field_sf) prev_ci_cropped <- terra::crop(previous_ci, terra::ext(prev_bbox), snap = "out") prev_extracted_all <- terra::extract(prev_ci_cropped, field_sf)[, 2] previous_ci_vals <- prev_extracted_all[!is.na(prev_extracted_all)] if (length(previous_ci_vals) > 0) { mean_ci_previous <- mean(previous_ci_vals, na.rm = TRUE) weekly_ci_change <- mean_ci_current - mean_ci_previous } } }, error = function(e) { # Silent fail }) if (is.na(weekly_ci_change)) { weekly_ci_change_str <- sprintf("%.1f ± %.2f", mean_ci_current, ci_std) } else { weekly_ci_change_str <- sprintf("%.1f ± %.2f (Δ%.1f)", mean_ci_current, ci_std, weekly_ci_change) } age_weeks <- NA if (!is.null(planting_dates) && nrow(planting_dates) > 0) { field_planting <- planting_dates %>% filter(field_id == !!field_id) %>% pull(planting_date) if (length(field_planting) > 0) { age_weeks <- as.numeric(difftime(report_date, field_planting[1], units = "weeks")) } } if (USE_UNIFORM_AGE) { age_weeks <- as.numeric(difftime(report_date, UNIFORM_PLANTING_DATE, units = "weeks")) } pct_ci_above_2 <- sum(current_ci_vals > 2) / length(current_ci_vals) * 100 pct_ci_ge_2 <- sum(current_ci_vals >= 2) / length(current_ci_vals) * 100 germination_progress_str <- NA_character_ if (!is.na(age_weeks) && age_weeks >= 0 && age_weeks <= 6) { germination_progress_str <- sprintf("%.0f%%", pct_ci_ge_2) } phase <- "Unknown" imminent_prob_val <- NA if (!is.null(harvest_imminence_data) && nrow(harvest_imminence_data) > 0) { imminence_row <- harvest_imminence_data %>% filter(field_id == !!field_id) if (nrow(imminence_row) > 0) { imminent_prob_val <- imminence_row$probability[1] if (imminent_prob_val > 0.5) { phase <- "Harvest Imminent (Model)" } } } if (phase == "Unknown") { phase <- get_phase_by_age(age_weeks) } status_trigger <- get_status_trigger(current_ci_vals, weekly_ci_change, age_weeks) nmr_weeks_in_phase <- 1 four_week_trend <- NA_character_ ci_values_for_trend <- c(mean_ci_current) if (!is.null(historical_data) && length(historical_data) > 0) { for (hist in historical_data) { hist_week <- hist$week hist_data <- hist$data field_row <- hist_data %>% filter(Field_id == !!field_id) if (nrow(field_row) > 0 && !is.na(field_row$Mean_CI[1])) { ci_values_for_trend <- c(field_row$Mean_CI[1], ci_values_for_trend) } } if (length(ci_values_for_trend) >= 2) { four_week_trend <- categorize_four_week_trend(ci_values_for_trend) } } cv_trend_short <- NA_real_ cv_trend_long <- NA_real_ if (!is.null(historical_data) && length(historical_data) > 0) { if (length(historical_data) >= 2) { cv_2w <- historical_data[[2]]$data %>% filter(Field_id == !!field_id) %>% pull(CV) if (length(cv_2w) > 0 && !is.na(cv_2w[1])) { cv_trend_short <- calculate_cv_trend(cv_current, cv_2w[1]) } } if (length(historical_data) >= 8) { cv_8w <- historical_data[[8]]$data %>% filter(Field_id == !!field_id) %>% pull(CV) if (length(cv_8w) > 0 && !is.na(cv_8w[1])) { cv_trend_long <- calculate_cv_trend(cv_current, cv_8w[1]) } } } last_harvest_date <- NA_character_ if (!is.null(harvesting_data) && nrow(harvesting_data) > 0) { last_harvest_row <- harvesting_data %>% filter(field == !!field_id) %>% arrange(desc(season_start)) %>% slice(1) if (nrow(last_harvest_row) > 0 && !is.na(last_harvest_row$season_start[1])) { last_harvest_date <- as.character(last_harvest_row$season_start[1]) } } result <- data.frame( Field_id = field_id, Farm_Section = farm_section, Field_name = field_name, Hectare = round(field_area_ha, 2), Acreage = round(field_area_acres, 2), Mean_CI = round(mean_ci_current, 2), Weekly_ci_change = if (is.na(weekly_ci_change)) NA_real_ else round(weekly_ci_change, 2), Weekly_ci_change_str = weekly_ci_change_str, Four_week_trend = four_week_trend, Last_harvest_or_planting_date = last_harvest_date, Age_week = if (is.na(age_weeks)) NA_integer_ else as.integer(round(age_weeks)), `Phase (age based)` = phase, nmr_weeks_in_this_phase = nmr_weeks_in_phase, Germination_progress = germination_progress_str, Imminent_prob = imminent_prob_val, Status_trigger = status_trigger, CI_range = range_str, CI_Percentiles = ci_percentiles_str, CV = round(cv_current, 4), CV_Trend_Short_Term = cv_trend_short, CV_Trend_Long_Term = cv_trend_long, Cloud_pct_clear = pct_clear, Cloud_pct_clear_interval = cloud_interval, Cloud_pct = cloud_pct, Cloud_category = cloud_cat, stringsAsFactors = FALSE ) return(result) }, error = function(e) { message(paste("Error analyzing field", field_idx, ":", e$message)) return(data.frame( Field_id = NA_character_, error = e$message )) }) } # ============================================================================ # SUMMARY GENERATION # ============================================================================ generate_field_analysis_summary <- function(field_df) { message("Generating summary statistics...") total_acreage <- sum(field_df$Acreage, na.rm = TRUE) germination_acreage <- sum(field_df$Acreage[field_df$Phase == "Germination"], na.rm = TRUE) tillering_acreage <- sum(field_df$Acreage[field_df$Phase == "Tillering"], na.rm = TRUE) grand_growth_acreage <- sum(field_df$Acreage[field_df$Phase == "Grand Growth"], na.rm = TRUE) maturation_acreage <- sum(field_df$Acreage[field_df$Phase == "Maturation"], na.rm = TRUE) unknown_phase_acreage <- sum(field_df$Acreage[field_df$Phase == "Unknown"], na.rm = TRUE) harvest_ready_acreage <- sum(field_df$Acreage[field_df$Status_trigger == "harvest_ready"], na.rm = TRUE) stress_acreage <- sum(field_df$Acreage[field_df$Status_trigger == "stress_detected_whole_field"], na.rm = TRUE) recovery_acreage <- sum(field_df$Acreage[field_df$Status_trigger == "strong_recovery"], na.rm = TRUE) growth_on_track_acreage <- sum(field_df$Acreage[field_df$Status_trigger == "growth_on_track"], na.rm = TRUE) germination_complete_acreage <- sum(field_df$Acreage[field_df$Status_trigger == "germination_complete"], na.rm = TRUE) germination_started_acreage <- sum(field_df$Acreage[field_df$Status_trigger == "germination_started"], na.rm = TRUE) no_trigger_acreage <- sum(field_df$Acreage[is.na(field_df$Status_trigger)], na.rm = TRUE) clear_fields <- sum(field_df$Cloud_category == "Clear view", na.rm = TRUE) partial_fields <- sum(field_df$Cloud_category == "Partial coverage", na.rm = TRUE) no_image_fields <- sum(field_df$Cloud_category == "No image available", na.rm = TRUE) total_fields <- nrow(field_df) clear_acreage <- sum(field_df$Acreage[field_df$Cloud_category == "Clear view"], na.rm = TRUE) partial_acreage <- sum(field_df$Acreage[field_df$Cloud_category == "Partial coverage"], na.rm = TRUE) no_image_acreage <- sum(field_df$Acreage[field_df$Cloud_category == "No image available"], na.rm = TRUE) summary_df <- data.frame( Category = c( "--- PHASE DISTRIBUTION ---", "Germination", "Tillering", "Grand Growth", "Maturation", "Unknown phase", "--- STATUS TRIGGERS ---", "Harvest ready", "Stress detected", "Strong recovery", "Growth on track", "Germination complete", "Germination started", "No trigger", "--- CLOUD COVERAGE (FIELDS) ---", "Clear view", "Partial coverage", "No image available", "--- CLOUD COVERAGE (ACREAGE) ---", "Clear view", "Partial coverage", "No image available", "--- TOTAL ---", "Total Acreage" ), Acreage = c( NA, round(germination_acreage, 2), round(tillering_acreage, 2), round(grand_growth_acreage, 2), round(maturation_acreage, 2), round(unknown_phase_acreage, 2), NA, round(harvest_ready_acreage, 2), round(stress_acreage, 2), round(recovery_acreage, 2), round(growth_on_track_acreage, 2), round(germination_complete_acreage, 2), round(germination_started_acreage, 2), round(no_trigger_acreage, 2), NA, paste0(clear_fields, " fields"), paste0(partial_fields, " fields"), paste0(no_image_fields, " fields"), NA, round(clear_acreage, 2), round(partial_acreage, 2), round(no_image_acreage, 2), NA, round(total_acreage, 2) ), stringsAsFactors = FALSE ) return(summary_df) } # ============================================================================ # EXPORT FUNCTIONS # ============================================================================ export_field_analysis_excel <- function(field_df, summary_df, project_dir, current_week, reports_dir) { message("Exporting per-field analysis to Excel, CSV, and RDS...") # Round all numeric columns to 2 decimals field_df_rounded <- field_df %>% mutate(across(where(is.numeric), ~ round(., 2))) summary_df_rounded <- summary_df %>% mutate(across(where(is.numeric), ~ round(., 2))) output_subdir <- file.path(reports_dir, "kpis", "field_analysis") if (!dir.exists(output_subdir)) { dir.create(output_subdir, recursive = TRUE) } excel_filename <- paste0(project_dir, "_field_analysis_week", sprintf("%02d", current_week), ".xlsx") excel_path <- file.path(output_subdir, excel_filename) excel_path <- normalizePath(excel_path, winslash = "\\", mustWork = FALSE) sheets <- list( "Field Data" = field_df_rounded, "Summary" = summary_df_rounded ) write_xlsx(sheets, excel_path) message(paste("✓ Field analysis Excel exported to:", excel_path)) kpi_data <- list( field_analysis = field_df_rounded, field_analysis_summary = summary_df_rounded, metadata = list( current_week = current_week, project = project_dir, created_at = Sys.time() ) ) rds_filename <- paste0(project_dir, "_kpi_summary_tables_week", sprintf("%02d", current_week), ".rds") rds_path <- file.path(reports_dir, "kpis", rds_filename) saveRDS(kpi_data, rds_path) message(paste("✓ Field analysis RDS exported to:", rds_path)) csv_filename <- paste0(project_dir, "_field_analysis_week", sprintf("%02d", current_week), ".csv") csv_path <- file.path(output_subdir, csv_filename) write_csv(field_df_rounded, csv_path) message(paste("✓ Field analysis CSV exported to:", csv_path)) return(list(excel = excel_path, rds = rds_path, csv = csv_path)) } # ============================================================================ # TILE-BASED KPI EXTRACTION FUNCTION # ============================================================================ calculate_field_kpis_from_tiles <- function(tile_dir, week_num, year, field_boundaries_sf, tile_grid) { # Loop through tiles, extract KPI statistics per field per tile # Follows the same pattern as extract_ci_from_tiles in CI extraction message("Calculating field-level KPI statistics from tiles...") # Get all tile files for this week tile_pattern <- sprintf("week_%02d_%d_([0-9]{2})\\.tif", week_num, year) tile_files <- list.files(tile_dir, pattern = tile_pattern, full.names = TRUE) if (length(tile_files) == 0) { message("No tiles found for week", week_num, year) return(NULL) } # Process tiles in parallel using furrr (same as CI extraction) message(paste("Processing", length(tile_files), "tiles in parallel...")) field_kpi_list <- furrr::future_map( tile_files, ~ process_single_kpi_tile( tile_file = ., field_boundaries_sf = field_boundaries_sf, tile_grid = tile_grid ), .progress = TRUE, .options = furrr::furrr_options(seed = TRUE) ) # Combine results from all tiles field_kpi_stats <- dplyr::bind_rows(field_kpi_list) if (nrow(field_kpi_stats) == 0) { message(" No KPI data extracted from tiles") return(NULL) } message(paste(" Extracted KPI stats for", length(unique(field_kpi_stats$field)), "unique fields")) return(field_kpi_stats) } # Helper function to process a single tile (like process_single_tile in CI extraction) process_single_kpi_tile <- function(tile_file, field_boundaries_sf, tile_grid) { tryCatch({ tile_basename <- basename(tile_file) # Load tile raster tile_raster <- terra::rast(tile_file) # Get first band (CI band for weekly mosaics) ci_band <- tile_raster[[1]] # EXACTLY LIKE SCRIPT 20: Crop to field bounding box first, then extract with sf directly field_bbox <- sf::st_bbox(field_boundaries_sf) ci_cropped <- terra::crop(ci_band, terra::ext(field_bbox), snap = "out") # Extract CI values for ALL fields at once using sf object directly (NOT terra::vect) # terra::extract() works with sf objects and handles geometries properly extracted_vals <- terra::extract(ci_cropped, field_boundaries_sf, fun = "mean", na.rm = TRUE) # Initialize results for this tile tile_results <- data.frame() # Get tile ID from filename tile_id_match <- as.numeric(sub(".*_(\\d{2})\\.tif$", "\\1", tile_basename)) # Process each field: extracted_vals is a data.frame with ID column (field indices) + extracted values for (field_idx in seq_len(nrow(field_boundaries_sf))) { field_id <- field_boundaries_sf$field[field_idx] # extracted_vals columns: 1=ID, 2=mean_CI (since we used fun="mean") mean_ci <- extracted_vals[field_idx, 2] # Skip if no data for this field in this tile if (is.na(mean_ci)) { next } # For tile-level stats, we only have mean from extraction (no variance without all pixels) # Add to results tile_results <- rbind(tile_results, data.frame( field = field_id, tile_id = tile_id_match, tile_file = tile_basename, mean_ci = round(mean_ci, 4), stringsAsFactors = FALSE )) } return(tile_results) }, error = function(e) { message(paste(" Warning: Error processing tile", basename(tile_file), ":", e$message)) return(data.frame()) }) } calculate_and_export_farm_kpis <- function(report_date, project_dir, field_boundaries_sf, harvesting_data, cumulative_CI_vals_dir, weekly_CI_mosaic, reports_dir, current_week, year, tile_grid, use_tile_mosaic = FALSE, tile_grid_size = "5x5") { message("\n=== CALCULATING FARM-LEVEL KPIs ===") message("(6 high-level KPI metrics with tile-based extraction)") output_dir <- file.path(reports_dir, "kpis") if (!dir.exists(output_dir)) { dir.create(output_dir, recursive = TRUE) } # Get mosaic directory with grid size if using tiles mosaic_dir <- if (use_tile_mosaic && !is.null(tile_grid_size)) { file.path(weekly_CI_mosaic, tile_grid_size) } else { weekly_CI_mosaic } # Extract field-level KPI statistics from tiles field_kpi_stats <- calculate_field_kpis_from_tiles( tile_dir = mosaic_dir, week_num = current_week, year = year, field_boundaries_sf = field_boundaries_sf, tile_grid = tile_grid ) if (is.null(field_kpi_stats) || nrow(field_kpi_stats) == 0) { message("Warning: No field KPI statistics extracted from tiles") return(NULL) } # Aggregate tile-based statistics by field (average across tiles for each field) field_summary_stats <- field_kpi_stats %>% dplyr::group_by(field) %>% dplyr::summarise( mean_ci = mean(mean_ci, na.rm = TRUE), cv_ci = mean(cv_ci, na.rm = TRUE), min_ci = min(min_ci, na.rm = TRUE), max_ci = max(max_ci, na.rm = TRUE), total_pixels = sum(n_pixels, na.rm = TRUE), num_tiles = n_distinct(tile_id), .groups = 'drop' ) # Create results list kpi_results <- list( field_kpi_stats = field_kpi_stats, field_summary_stats = field_summary_stats, metadata = list( report_date = report_date, current_week = current_week, year = year, calculation_method = "tile_based_extraction", num_fields_processed = length(unique(field_kpi_stats$field)), num_tiles_processed = length(unique(field_kpi_stats$tile_id)) ) ) # Save results rds_filename <- paste0(project_dir, "_farm_kpi_stats_week", sprintf("%02d", current_week), ".rds") rds_path <- file.path(output_dir, rds_filename) saveRDS(kpi_results, rds_path) message(paste("✓ Farm-level KPI stats exported to:", rds_path)) # Print summary cat("\n=== FARM-LEVEL KPI SUMMARY ===\n") cat("Report Date:", as.character(report_date), "\n") cat("Week:", current_week, "Year:", year, "\n") cat("Fields Processed:", length(unique(field_kpi_stats$field)), "\n") cat("Tiles Processed:", length(unique(field_kpi_stats$tile_id)), "\n") cat("\n--- Field Summary Statistics (Mean across tiles) ---\n") print(head(field_summary_stats, 20)) return(kpi_results) } # ============================================================================ # HELPER: Extract field-level statistics from CI raster (all pixels, single call) # ============================================================================ extract_field_statistics_from_ci <- function(ci_band, field_boundaries_sf) { #' Extract CI statistics for all fields from a single CI raster band #' #' This function extracts all pixel values for each field in one terra::extract call, #' then calculates mean, CV, and percentiles from those pixels. #' #' @param ci_band Single CI band from terra raster #' @param field_boundaries_sf SF object with field geometries #' @return Data frame with columns: field_idx, mean_ci, cv, p10, p90, min_ci, max_ci, pixel_count_valid, pixel_count_total # SINGLE EXTRACTION: Get all pixels for all fields at once (no aggregation function) # Result: data.frame with ID column (field indices) and value column (pixel values) extract_result <- terra::extract(ci_band, field_boundaries_sf) # Calculate statistics for each field stats_list <- list() for (field_idx in seq_len(nrow(field_boundaries_sf))) { # Get all pixels for this field from the single extraction # extract_result has columns [ID, value] where ID is field index (1-based) field_pixels <- extract_result[extract_result$ID == field_idx, 2] pixels <- as.numeric(field_pixels[!is.na(field_pixels)]) # Remove NAs if (length(pixels) == 0) { # No data for this field stats_list[[field_idx]] <- data.frame( field_idx = field_idx, mean_ci = NA_real_, cv = NA_real_, p10 = NA_real_, p90 = NA_real_, min_ci = NA_real_, max_ci = NA_real_, pixel_count_valid = 0, pixel_count_total = 0, stringsAsFactors = FALSE ) next } # Calculate all statistics from pixels array mean_val <- mean(pixels, na.rm = TRUE) cv_val <- if (mean_val > 0) sd(pixels, na.rm = TRUE) / mean_val else NA_real_ p10_val <- quantile(pixels, probs = CI_PERCENTILE_LOW, na.rm = TRUE)[[1]] p90_val <- quantile(pixels, probs = CI_PERCENTILE_HIGH, na.rm = TRUE)[[1]] min_val <- min(pixels, na.rm = TRUE) max_val <- max(pixels, na.rm = TRUE) stats_list[[field_idx]] <- data.frame( field_idx = field_idx, mean_ci = mean_val, cv = cv_val, p10 = p10_val, p90 = p90_val, min_ci = min_val, max_ci = max_val, pixel_count_valid = length(pixels), pixel_count_total = nrow(extract_result[extract_result$ID == field_idx, ]), stringsAsFactors = FALSE ) } return(dplyr::bind_rows(stats_list)) } # ============================================================================ # MAIN # ============================================================================ main <- function() { # Parse command-line arguments args <- commandArgs(trailingOnly = TRUE) # end_date (arg 1) end_date <- if (length(args) >= 1 && !is.na(args[1])) { as.Date(args[1]) } else if (exists("end_date_str", envir = .GlobalEnv)) { as.Date(get("end_date_str", envir = .GlobalEnv)) } else { Sys.Date() } # project_dir (arg 2) project_dir <- if (length(args) >= 2 && !is.na(args[2])) { as.character(args[2]) } else if (exists("project_dir", envir = .GlobalEnv)) { get("project_dir", envir = .GlobalEnv) } else { "angata" } # offset (arg 3) - for backward compatibility with old 09 offset <- if (length(args) >= 3 && !is.na(args[3])) { as.numeric(args[3]) } else { 7 } assign("project_dir", project_dir, envir = .GlobalEnv) assign("end_date_str", format(end_date, "%Y-%m-%d"), envir = .GlobalEnv) message("\n" %+% strrep("=", 70)) message("80_CALCULATE_KPIs.R - CONSOLIDATED KPI CALCULATION") message(strrep("=", 70)) message("Date:", format(end_date, "%Y-%m-%d")) message("Project:", project_dir) message("Mode: Per-field analysis (SC-64) + Farm-level KPIs") message("") # Load configuration and utilities # source(here("r_app", "crop_messaging_utils.R")) tryCatch({ source(here("r_app", "parameters_project.R")) }, error = function(e) { stop("Error loading parameters_project.R: ", e$message) }) tryCatch({ source(here("r_app", "30_growth_model_utils.R")) }, error = function(e) { warning("30_growth_model_utils.R not found - yield prediction KPI will use placeholder data") }) # ========== PER-FIELD ANALYSIS (SC-64) ========== message("\n" %+% strrep("-", 70)) message("PHASE 1: PER-FIELD WEEKLY ANALYSIS (SC-64 ENHANCEMENTS)") message(strrep("-", 70)) current_week <- as.numeric(format(end_date, "%V")) year <- as.numeric(format(end_date, "%Y")) previous_week <- current_week - 1 if (previous_week < 1) previous_week <- 52 message(paste("Week:", current_week, "/ Year:", year)) # Find tile files - approach from Script 20 message("Finding tile files...") tile_pattern <- sprintf("week_%02d_%d_([0-9]{2})\\.tif", current_week, year) # Detect grid size subdirectory detected_grid_size <- NA if (dir.exists(weekly_tile_max)) { subfolders <- list.dirs(weekly_tile_max, full.names = FALSE, recursive = FALSE) grid_patterns <- grep("^\\d+x\\d+$", subfolders, value = TRUE) if (length(grid_patterns) > 0) { detected_grid_size <- grid_patterns[1] mosaic_dir <- file.path(weekly_tile_max, detected_grid_size) message(paste(" Using grid-size subdirectory:", detected_grid_size)) } } tile_files <- list.files(mosaic_dir, pattern = tile_pattern, full.names = TRUE) if (length(tile_files) == 0) { stop(paste("No tile files found for week", current_week, year, "in", mosaic_dir)) } message(paste(" Found", length(tile_files), "tiles")) # Load field boundaries tryCatch({ boundaries_result <- load_field_boundaries(data_dir) if (is.list(boundaries_result) && "field_boundaries_sf" %in% names(boundaries_result)) { field_boundaries_sf <- boundaries_result$field_boundaries_sf } else { field_boundaries_sf <- boundaries_result } if (nrow(field_boundaries_sf) == 0) { stop("No fields loaded from boundaries") } message(paste(" Loaded", nrow(field_boundaries_sf), "fields")) }, error = function(e) { stop("ERROR loading field boundaries: ", e$message) }) message("Loading historical field data for trend calculations...") num_weeks_to_load <- if (TEST_MODE) TEST_MODE_NUM_WEEKS else max(WEEKS_FOR_FOUR_WEEK_TREND, WEEKS_FOR_CV_TREND_LONG) if (TEST_MODE) { message(paste(" TEST MODE: Loading only", num_weeks_to_load, "weeks")) } historical_data <- load_historical_field_data(project_dir, current_week, reports_dir, num_weeks = num_weeks_to_load) planting_dates <- extract_planting_dates(harvesting_data, field_boundaries_sf) # Validate planting_dates if (is.null(planting_dates) || nrow(planting_dates) == 0) { message("WARNING: No planting dates available. Using NA for all fields.") planting_dates <- data.frame( field_id = field_boundaries_sf$field, date = rep(as.Date(NA), nrow(field_boundaries_sf)), stringsAsFactors = FALSE ) } # SCRIPT 20 APPROACH: Loop through tiles, extract all fields from each tile # ============================================================================ # NEW MODULAR APPROACH: Load/Calculate weekly stats, apply trends # ============================================================================ # Build tile grid (needed by calculate_field_statistics) message("\nBuilding tile grid for current week...") tile_grid <- build_tile_grid(mosaic_dir, current_week, year) message("\nUsing modular RDS-based approach for weekly statistics...") # Load/calculate CURRENT week stats (from tiles or RDS cache) message("\n1. Loading/calculating CURRENT week statistics (week", current_week, ")...") current_stats <- load_or_calculate_weekly_stats( week_num = current_week, year = year, project_dir = project_dir, field_boundaries_sf = field_boundaries_sf, mosaic_dir = tile_grid$mosaic_dir, reports_dir = reports_dir, report_date = end_date ) message(paste(" ✓ Loaded/calculated stats for", nrow(current_stats), "fields in current week")) # Load/calculate PREVIOUS week stats (from RDS cache or tiles) message("\n2. Loading/calculating PREVIOUS week statistics (week", previous_week, ")...") # Calculate report date for previous week (7 days before current) prev_report_date <- end_date - 7 prev_stats <- load_or_calculate_weekly_stats( week_num = previous_week, year = year, project_dir = project_dir, field_boundaries_sf = field_boundaries_sf, mosaic_dir = tile_grid$mosaic_dir, reports_dir = reports_dir, report_date = prev_report_date ) message(paste(" ✓ Loaded/calculated stats for", nrow(prev_stats), "fields in previous week")) message(paste(" Columns in prev_stats:", paste(names(prev_stats), collapse = ", "))) message(paste(" CV column non-NA values in prev_stats:", sum(!is.na(prev_stats$CV)))) # Apply trend calculations (requires both weeks) message("\n3. Calculating trend columns...") current_stats <- calculate_kpi_trends(current_stats, prev_stats) message(paste(" ✓ Added Weekly_ci_change, CV_Trend_Short_Term, nmr_weeks_in_this_phase")) # ============================================================================ # Build final output dataframe with all 21 columns # ============================================================================ message("\nBuilding final field analysis output...") # Pre-calculate acreages with geometry validation # This avoids geometry errors during field_analysis construction acreage_lookup <- tryCatch({ lookup_df <- field_boundaries_sf %>% sf::st_drop_geometry() %>% as.data.frame() %>% mutate( geometry_valid = sapply(seq_len(nrow(field_boundaries_sf)), function(idx) { tryCatch({ sf::st_is_valid(field_boundaries_sf[idx, ]) }, error = function(e) FALSE) }), area_ha = 0 ) # Calculate area for valid geometries for (idx in which(lookup_df$geometry_valid)) { tryCatch({ area_m2 <- as.numeric(sf::st_area(field_boundaries_sf[idx, ])) lookup_df$area_ha[idx] <- area_m2 / 10000 }, error = function(e) { lookup_df$area_ha[idx] <<- NA_real_ }) } # Convert hectares to acres lookup_df %>% mutate(acreage = area_ha / 0.404686) %>% select(field, acreage) }, error = function(e) { message(paste("Warning: Could not calculate acreages from geometries -", e$message)) data.frame(field = character(0), acreage = numeric(0)) }) field_analysis_df <- current_stats %>% mutate( # Column 2: Farm_Section (user fills) Farm_Section = NA_character_, # Column 3: Field_name (from GeoJSON - already have Field_id, can look up) Field_name = Field_id, # Column 4: Acreage (calculate from geometry) Acreage = { acreages_vec <- acreage_lookup$acreage[match(Field_id, acreage_lookup$field)] if_else(is.na(acreages_vec), 0, acreages_vec) }, # Columns 5-6: Already in current_stats (Mean_CI, Weekly_ci_change) # Column 7: Four_week_trend (Phase 3 future) Four_week_trend = NA_character_, # Column 8: Last_harvest_or_planting_date (dummy for now) Last_harvest_or_planting_date = UNIFORM_PLANTING_DATE, # Columns 9-10: Already in current_stats (Age_week, Phase) # Column 11: nmr_weeks_in_this_phase (already calculated) # Column 12: Germination_progress (already calculated) # Column 13: Imminent_prob (placeholder) Imminent_prob = "placeholder data", # Column 14: Status_trigger (need to add) Status_trigger = { triggers <- sapply(seq_len(nrow(current_stats)), function(idx) { field_id <- current_stats$Field_id[idx] field_idx <- which(field_boundaries_sf$field == field_id)[1] if (is.na(field_idx)) return(NA_character_) # Reconstruct CI values from Mean_CI for status trigger logic # For now, use simplified approach age_w <- current_stats$Age_week[idx] ci_change <- current_stats$Weekly_ci_change[idx] # Using mean CI as proxy (could be improved with pixel distribution) ci_vals <- rep(current_stats$Mean_CI[idx], 100) get_status_trigger(ci_vals, ci_change, age_w) }) triggers }, # Columns 15-16: Already in current_stats (CI_range, CI_Percentiles) # Column 17: Already in current_stats (CV) # Column 18: Already in current_stats (CV_Trend_Short_Term) # Column 19: CV_Trend_Long_Term (Phase 3 future) CV_Trend_Long_Term = NA_real_, # Columns 20-21: Already in current_stats (Cloud_pct_clear, Cloud_category) .keep = "all" # Keep all existing columns ) %>% select( Field_id, Farm_Section, Field_name, Acreage, Mean_CI, Weekly_ci_change, Four_week_trend, Last_harvest_or_planting_date, Age_week, Phase, nmr_weeks_in_this_phase, Germination_progress, Imminent_prob, Status_trigger, CI_range, CI_Percentiles, CV, CV_Trend_Short_Term, CV_Trend_Long_Term, Cloud_pct_clear, Cloud_category ) message(paste("✓ Built final output with", nrow(field_analysis_df), "fields and 21 columns")) summary_statistics_df <- generate_field_analysis_summary(field_analysis_df) export_paths <- export_field_analysis_excel( field_analysis_df, summary_statistics_df, project_dir, current_week, reports_dir ) cat("\n--- Per-field Results (first 10) ---\n") available_cols <- c("Field_id", "Acreage", "Age_week", "Mean_CI", "Four_week_trend", "Status_trigger", "Cloud_category") available_cols <- available_cols[available_cols %in% names(field_analysis_df)] if (length(available_cols) > 0) { print(head(field_analysis_df[, available_cols], 10)) } cat("\n--- Summary Statistics ---\n") print(summary_statistics_df) # ========== FARM-LEVEL KPI AGGREGATION ========== # Aggregate the per-field analysis into farm-level summary statistics cat("\n=== CALCULATING FARM-LEVEL KPI SUMMARY ===\n") # Filter to only fields that have actual data (non-NA CI and valid acreage) field_data <- field_analysis_df %>% filter(!is.na(Mean_CI) & !is.na(Acreage)) %>% filter(Acreage > 0) if (nrow(field_data) > 0) { if (nrow(field_data) > 0) { # Create summary statistics farm_summary <- list() # 1. PHASE DISTRIBUTION phase_dist <- field_data %>% group_by(Phase) %>% summarise( num_fields = n(), acreage = sum(Acreage, na.rm = TRUE), .groups = 'drop' ) %>% rename(Category = Phase) farm_summary$phase_distribution <- phase_dist # 2. STATUS TRIGGER DISTRIBUTION status_dist <- field_data %>% group_by(Status_trigger) %>% summarise( num_fields = n(), acreage = sum(Acreage, na.rm = TRUE), .groups = 'drop' ) %>% rename(Category = Status_trigger) farm_summary$status_distribution <- status_dist # 3. CLOUD COVERAGE DISTRIBUTION cloud_dist <- field_data %>% group_by(Cloud_category) %>% summarise( num_fields = n(), acreage = sum(Acreage, na.rm = TRUE), .groups = 'drop' ) %>% rename(Category = Cloud_category) farm_summary$cloud_distribution <- cloud_dist # 4. OVERALL STATISTICS farm_summary$overall_stats <- data.frame( total_fields = nrow(field_data), total_acreage = sum(field_data$Acreage, na.rm = TRUE), mean_ci = round(mean(field_data$Mean_CI, na.rm = TRUE), 2), median_ci = round(median(field_data$Mean_CI, na.rm = TRUE), 2), mean_cv = round(mean(field_data$CI_CV, na.rm = TRUE), 4), week = current_week, year = year, date = as.character(end_date) ) # Print summaries cat("\n--- PHASE DISTRIBUTION ---\n") print(phase_dist) cat("\n--- STATUS TRIGGER DISTRIBUTION ---\n") print(status_dist) cat("\n--- CLOUD COVERAGE DISTRIBUTION ---\n") print(cloud_dist) cat("\n--- OVERALL FARM STATISTICS ---\n") print(farm_summary$overall_stats) farm_kpi_results <- farm_summary } else { farm_kpi_results <- NULL } } else { farm_kpi_results <- NULL } # ========== FINAL SUMMARY ========== cat("\n" %+% strrep("=", 70) %+% "\n") cat("80_CALCULATE_KPIs.R - COMPLETION SUMMARY\n") cat(strrep("=", 70) %+% "\n") cat("Per-field analysis fields analyzed:", nrow(field_analysis_df), "\n") cat("Excel export:", export_paths$excel, "\n") cat("RDS export:", export_paths$rds, "\n") cat("CSV export:", export_paths$csv, "\n") if (!is.null(farm_kpi_results)) { cat("\nFarm-level KPIs: CALCULATED\n") } else { cat("\nFarm-level KPIs: SKIPPED (no valid tile data extracted)\n") } cat("\n✓ Consolidated KPI calculation complete!\n") cat(" - Per-field data exported\n") cat(" - Farm-level KPIs calculated\n") cat(" - All outputs in:", reports_dir, "\n\n") } if (sys.nframe() == 0) { main() }