# 09b_FIELD_ANALYSIS_WEEKLY.R (NEW - TILE-AWARE WITH PARALLEL PROCESSING) # ============================================================================ # Per-field weekly analysis with tile-based mosaic extraction and parallel processing # # MAJOR IMPROVEMENTS OVER 09_field_analysis_weekly.R: # - Tile-aware: Loads only relevant tiles for each field (memory efficient) # - Parallel processing: Uses furrr for parallel field extraction (1300+ fields supported) # - Field-based cloud analysis: Cloud coverage calculated per-field from tiles # - Scalable: Architecture ready for 13,000+ fields # # Generates detailed field-level CSV export with: # - Field identifiers and areas # - Weekly CI change (mean ± std) from tile-based extraction # - Age-based phase assignment (Germination, Tillering, Grand Growth, Maturation) # - Harvest imminence detection (Phase 1 from LSTM model) - optional # - Status triggers (non-exclusive, can coexist with harvest imminent phase) # - Phase transition tracking (weeks in current phase) # - Cloud coverage analysis from tiles (per-field, not mosaic-wide) # # Cloud Coverage Per-Field: # - Extracts from relevant tiles that field intersects # - Categories: Clear view (>=99.5%), Partial coverage (0-99.5%), No image available (0%) # - Reports % of fields by cloud category # # Parallel Processing: # - Uses furrr::future_map_df() for CPU-parallel field extraction # - Configure workers before running: future::plan(future::multisession, workers = N) # - Each worker: loads tile(s), extracts CI, calculates stats # - Significant speedup for 1000+ fields # # Output: # - Excel (.xlsx) with Field Data sheet and Summary sheet # - Excel (.xlsx) weekly harvest predictions for tracking # - RDS file with field_analysis and field_analysis_summary for Rmd reports # - Summary includes: Monitored area, Cloud coverage, Phase distribution, Status triggers # # Usage: Rscript 09b_field_analysis_weekly.R [end_date] [project_dir] # - end_date: End date for analysis (YYYY-MM-DD 
#   format), default: today
# - project_dir: Project directory name (e.g., "aura", "esa", "angata")
#
# Example:
#   Rscript 09b_field_analysis_weekly.R 2026-01-08 angata
#

# 1. Load required libraries
# Startup banners are silenced; a missing package still raises an error,
# except for torch, which is treated as optional.
suppressPackageStartupMessages({
  library(here)
  library(sf)
  library(terra)
  library(dplyr)
  library(tidyr)
  library(lubridate)
  library(readr)
  library(readxl)
  library(writexl)
  library(purrr)
  library(furrr)
  library(future)

  # Optional: torch for harvest model inference (will skip if not available)
  tryCatch(
    library(torch),
    error = function(e) {
      message("Note: torch package not available - harvest model inference will be skipped")
    }
  )
})

# ============================================================================
# PHASE AND STATUS TRIGGER DEFINITIONS
# ============================================================================

# Age windows (in weeks) for each growth phase. The windows are inclusive on
# both ends and Germination/Tillering overlap for weeks 4-6.
PHASE_DEFINITIONS <- local({
  phase_names <- c("Germination", "Tillering", "Grand Growth", "Maturation")
  first_week <- c(0, 4, 17, 39)
  last_week <- c(6, 16, 39, 200)

  data.frame(
    phase = phase_names,
    age_start = first_week,
    age_end = last_week,
    stringsAsFactors = FALSE
  )
})

# Catalogue of status triggers: the age window (weeks) in which each trigger
# applies (NA = no age restriction recorded here) and a human-readable rule.
STATUS_TRIGGERS <- local({
  trigger_names <- c(
    "germination_started",
    "germination_complete",
    "stress_detected_whole_field",
    "strong_recovery",
    "growth_on_track",
    "maturation_progressing",
    "harvest_ready"
  )
  youngest_week <- c(0, 0, NA, NA, 4, 39, 45)
  oldest_week <- c(6, 6, NA, NA, 39, 200, 200)
  rule_text <- c(
    "10% of field CI > 2",
    "70% of field CI >= 2",
    "CI decline > -1.5 + low CV",
    "CI increase > +1.5",
    "CI increasing consistently",
    "High CI, stable/declining",
    "Age 45+ weeks (ready to harvest)"
  )

  data.frame(
    trigger = trigger_names,
    age_min = youngest_week,
    age_max = oldest_week,
    description = rule_text,
    stringsAsFactors = FALSE
  )
})

# ============================================================================
# TILE-AWARE HELPER FUNCTIONS
# ============================================================================

#' Find the tiles whose extent overlaps a field's bounding box
#'
#' @param field_geom Single field geometry (sf object or terra SpatVector)
#' @param tile_grid Data frame of tile extents (id, xmin, xmax, ymin, ymax)
#' @return Numeric vector with the IDs of the overlapping tiles
#'
get_tile_ids_for_field <- function(field_geom, tile_grid) {
  # Reduce the field to its bounding box. Overlap is tested on extents only,
  # so a tile may be reported even if the polygon itself does not reach it.
  if (inherits(field_geom, "sf")) {
    field_bbox <- sf::st_bbox(field_geom)
    field_xmin <- field_bbox[["xmin"]]
    field_xmax <- field_bbox[["xmax"]]
    field_ymin <- field_bbox[["ymin"]]
    field_ymax <- field_bbox[["ymax"]]
  } else if (inherits(field_geom, "SpatVector")) {
    field_bbox <- terra::ext(field_geom)
    field_xmin <- field_bbox$xmin
    field_xmax <- field_bbox$xmax
    field_ymin <- field_bbox$ymin
    field_ymax <- field_bbox$ymax
  } else {
    stop("field_geom must be sf or terra::vect object")
  }

  # Two boxes overlap unless one lies entirely left/right/below/above the
  # other. Touching edges count as overlapping.
  # NOTE(review): coordinates are compared as-is, so the field and the tiles
  # are assumed to share a CRS - confirm against the tile producer.
  is_disjoint <- tile_grid$xmax < field_xmin |
    tile_grid$xmin > field_xmax |
    tile_grid$ymax < field_ymin |
    tile_grid$ymin > field_ymax

  as.numeric(tile_grid$id[!is_disjoint])
}

#' Load CI tiles that a field intersects
#'
#' Reads every existing tile file `week_<WW>_<YYYY>_<ID>.tif` for the given
#' tile IDs and keeps one layer per tile: the layer named "CI" if present,
#' otherwise layer 5 when the file has at least five layers, otherwise
#' layer 1. Tiles that are missing on disk or fail to load are skipped.
#'
#' @param field_geom Single field geometry (not used by this function; kept
#'   for interface compatibility)
#' @param tile_ids Numeric vector of tile IDs to load
#' @param week_num Week number
#' @param year Year
#' @param mosaic_dir Directory with weekly tiles
#' @return Single CI raster (max-mosaic if several tiles loaded, the first
#'   tile if the mosaic fails), or NULL when no tile could be loaded
#'
load_tiles_for_field <- function(field_geom, tile_ids, week_num, year, mosaic_dir) {
  if (length(tile_ids) == 0) {
    return(NULL)
  }

  # Load relevant tiles
  tiles_list <- list()
  for (tile_id in sort(tile_ids)) {
    tile_filename <- sprintf("week_%02d_%d_%02d.tif", week_num, year, tile_id)
    tile_path <- file.path(mosaic_dir, tile_filename)
    if (!file.exists(tile_path)) {
      next
    }

    tryCatch({
      tile_rast <- terra::rast(tile_path)

      # Extract CI band (band 5 or named "CI")
      if ("CI" %in% names(tile_rast)) {
        ci_band <- tile_rast[["CI"]]
      } else if (terra::nlyr(tile_rast) >= 5) {
        ci_band <- tile_rast[[5]]
      } else {
        ci_band <- tile_rast[[1]]
      }
      names(ci_band) <- "CI"
      tiles_list[[length(tiles_list) + 1]] <- ci_band
    }, error = function(e) {
      message(paste(" Warning: Could not load tile", tile_id, ":", e$message))
    })
  }

  if (length(tiles_list) == 0) {
    return(NULL)
  }
  if (length(tiles_list) == 1) {
    return(tiles_list[[1]])
  }

  # Several tiles: mosaic them, taking the max where tiles overlap. If the
  # mosaic fails, fall back to the first tile rather than dropping the field.
  tryCatch(
    terra::mosaic(terra::sprc(tiles_list), fun = "max"),
    error = function(e) {
      message(paste(" Warning: Could not merge tiles:", e$message))
      tiles_list[[1]]
    }
  )
}

#' Build tile grid from available weekly tile files
#'
#' Lists the files named `week_<WW>_<YYYY>_<ID>.tif` in `mosaic_dir` and reads
#' the extent of each one. Tile IDs may have any number of digits (two or
#' more as written by `%02d`), and only names that match the pattern from
#' start to end are accepted, so sidecar files such as `*.tif.aux.xml` are
#' ignored. Files that cannot be read are reported and skipped.
#'
#' @param mosaic_dir Directory with weekly tiles
#' @param week_num Week number to discover tiles
#' @param year Year to discover tiles
#' @return Data frame with columns: id, xmin, xmax, ymin, ymax
#'
build_tile_grid <- function(mosaic_dir, week_num, year) {
  # Anchored pattern; the capture group is the tile ID
  tile_pattern <- sprintf("^week_%02d_%d_([0-9]+)\\.tif$", week_num, year)
  tile_files <- list.files(mosaic_dir, pattern = tile_pattern, full.names = TRUE)

  if (length(tile_files) == 0) {
    stop(paste("No tile files found for week", week_num, year, "in", mosaic_dir))
  }

  # One single-row data frame per readable tile (NULL for failures), bound
  # once at the end instead of growing a data frame inside a loop.
  tile_rows <- lapply(tile_files, function(tile_file) {
    tryCatch({
      tile_id <- as.integer(sub(tile_pattern, "\\1", basename(tile_file)))
      tile_ext <- terra::ext(terra::rast(tile_file))
      data.frame(
        id = tile_id,
        xmin = as.numeric(tile_ext$xmin),
        xmax = as.numeric(tile_ext$xmax),
        ymin = as.numeric(tile_ext$ymin),
        ymax = as.numeric(tile_ext$ymax),
        stringsAsFactors = FALSE
      )
    }, error = function(e) {
      message(paste(" Warning: Could not process tile", basename(tile_file), ":", e$message))
      NULL
    })
  })
  tile_rows <- Filter(Negate(is.null), tile_rows)

  if (length(tile_rows) == 0) {
    stop("Could not extract extents from any tile files")
  }

  tile_grid <- do.call(rbind, tile_rows)
  rownames(tile_grid) <- NULL
  tile_grid
}

# ============================================================================
# HELPER FUNCTIONS (FROM ORIGINAL 09)
# ============================================================================

#' Map a crop age to its growth phase
#'
#' Returns the first phase in PHASE_DEFINITIONS whose inclusive
#' [age_start, age_end] window contains the age. Ages are fractional weeks,
#' so an age can fall between two windows (e.g. 16.5 lies between
#' Tillering's end at 16 and Grand Growth's start at 17); such an age is
#' assigned to the most recently started phase instead of "Unknown".
#'
#' @param age_weeks Single numeric age in weeks (may be NA)
#' @return Phase name, "Unknown" when the age is outside every phase
#'   (negative or beyond the last window), or NA_character_ when age is NA
get_phase_by_age <- function(age_weeks) {
  if (is.na(age_weeks)) {
    return(NA_character_)
  }

  starts <- PHASE_DEFINITIONS$age_start
  ends <- PHASE_DEFINITIONS$age_end

  # First matching window wins (so weeks 4-6 stay "Germination")
  containing <- which(age_weeks >= starts & age_weeks <= ends)
  if (length(containing) > 0) {
    return(PHASE_DEFINITIONS$phase[containing[1]])
  }

  # Inside the overall range but in a gap between windows
  if (age_weeks > min(starts) && age_weeks < max(ends)) {
    started <- which(starts <= age_weeks)
    latest <- started[which.max(starts[started])]
    return(PHASE_DEFINITIONS$phase[latest])
  }

  "Unknown"
}

#' Derive the status trigger for a field
#'
#' Rules are evaluated in this order and the first hit is returned:
#' germination (age 0-6), harvest ready (age >= 45), whole-field stress,
#' strong recovery, growth on track, maturation progressing.
#'
#' @param ci_values Numeric vector of CI pixel values (NAs are dropped)
#' @param ci_change Week-over-week change in mean CI (may be NA)
#' @param age_weeks Crop age in weeks (may be NA)
#' @return Trigger name, or NA_character_ when no rule applies, the age is
#'   NA, or there are no non-NA CI values
get_status_trigger <- function(ci_values, ci_change, age_weeks) {
  if (is.na(age_weeks) || length(ci_values) == 0) {
    return(NA_character_)
  }

  ci_values <- ci_values[!is.na(ci_values)]
  n_pixels <- length(ci_values)
  if (n_pixels == 0) {
    return(NA_character_)
  }

  pct_above_2 <- sum(ci_values > 2) / n_pixels * 100
  pct_at_or_above_2 <- sum(ci_values >= 2) / n_pixels * 100
  mean_ci <- mean(ci_values)
  # Coefficient of variation; sd() of a single pixel is NA, handled below
  ci_cv <- if (mean_ci > 0) sd(ci_values) / mean_ci else 0

  has_change <- !is.na(ci_change)
  past_germination <- age_weeks > 6

  # Germination phase triggers (age 0-6)
  if (age_weeks >= 0 && age_weeks <= 6) {
    if (pct_at_or_above_2 >= 70) {
      return("germination_complete")
    } else if (pct_above_2 > 10) {
      return("germination_started")
    }
  }

  # Harvest ready (45+ weeks) takes priority over all later rules
  if (age_weeks >= 45) {
    return("harvest_ready")
  }

  # Stress detection (any phase except Germination). A field with one pixel
  # has no defined CV; it is treated as "not uniformly stressed" instead of
  # letting the NA reach if() and abort the field.
  if (past_germination && has_change && ci_change < -1.5 &&
      isTRUE(ci_cv < 0.25)) {
    return("stress_detected_whole_field")
  }

  # Strong recovery (any phase except Germination)
  if (past_germination && has_change && ci_change > 1.5) {
    return("strong_recovery")
  }

  # Growth on track (Tillering/Grand Growth, 4-39 weeks)
  if (age_weeks >= 4 && age_weeks < 39 && has_change && ci_change > 0.2) {
    return("growth_on_track")
  }

  # Maturation progressing (39-45 weeks, high CI stable/declining)
  if (age_weeks >= 39 && age_weeks < 45 && mean_ci > 3.5) {
    return("maturation_progressing")
  }

  NA_character_
}

#' Load the most recent earlier field-analysis CSV
#'
#' Looks back 1, 2 and then 3 weeks for
#' `<reports_dir>/kpis/field_analysis/<project_dir>_field_analysis_weekWW.csv`
#' and returns the first file that exists and can be read. Week numbers below
#' 1 wrap by adding 52.
#'
#' @param project_dir Project name used as file-name prefix
#' @param current_week Current week number
#' @param reports_dir Reports root directory
#' @return List with `data` (data frame or NULL), `weeks_lookback` (1-3 or
#'   NA) and `found` (logical)
load_previous_week_csv <- function(project_dir, current_week, reports_dir) {
  for (lookback in c(1, 2, 3)) {
    previous_week <- current_week - lookback
    if (previous_week < 1) {
      previous_week <- previous_week + 52
    }

    csv_filename <- paste0(
      project_dir, "_field_analysis_week", sprintf("%02d", previous_week), ".csv"
    )
    csv_path <- file.path(reports_dir, "kpis", "field_analysis", csv_filename)
    if (!file.exists(csv_path)) {
      next
    }

    # An unreadable file is reported and the search continues further back
    loaded <- tryCatch(
      read_csv(csv_path, show_col_types = FALSE),
      error = function(e) {
        message(paste("Warning: Could not load", basename(csv_path), ":", e$message))
        NULL
      }
    )
    if (!is.null(loaded)) {
      return(list(data = loaded, weeks_lookback = lookback, found = TRUE))
    }
  }

  message("No previous field analysis CSV found. Phase tracking will be age-based only.")
  list(data = NULL, weeks_lookback = NA, found = FALSE)
}

# When TRUE every field is aged from UNIFORM_PLANTING_DATE and per-field
# planting dates are ignored.
USE_UNIFORM_AGE <- TRUE
UNIFORM_PLANTING_DATE <- as.Date("2025-01-01")

#' Build the planting-date lookup
#'
#' With USE_UNIFORM_AGE set, returns an empty lookup (field_id,
#' planting_date). Otherwise takes, per field, the row with the latest
#' `season_start` from `harvesting_data` and drops missing dates.
#'
#' @param harvesting_data Data frame with columns `field` and `season_start`
#' @return Data frame with columns field_id and planting_date, or NULL when
#'   no harvesting data is available or extraction fails
extract_planting_dates <- function(harvesting_data) {
  if (USE_UNIFORM_AGE) {
    message(paste("Using uniform planting date for all fields:", UNIFORM_PLANTING_DATE))
    return(data.frame(
      field_id = character(),
      planting_date = as.Date(character()),
      stringsAsFactors = FALSE
    ))
  }

  if (is.null(harvesting_data) || nrow(harvesting_data) == 0) {
    message("Warning: No harvesting data available.")
    return(NULL)
  }

  tryCatch({
    planting_dates <- harvesting_data %>%
      arrange(field, desc(season_start)) %>%
      distinct(field, .keep_all = TRUE) %>%
      select(field, season_start) %>%
      rename(field_id = field, planting_date = season_start) %>%
      filter(!is.na(planting_date)) %>%
      as.data.frame()

    message(paste(
      "Extracted planting dates for", nrow(planting_dates),
      "fields (most recent season)"
    ))
    planting_dates
  }, error = function(e) {
    message(paste("Error extracting planting dates:", e$message))
    NULL
  })
}

# ============================================================================
# NOTE: Cloud coverage is now calculated inline in analyze_single_field()
# ============================================================================
# Cloud coverage logic (per-field, from same CI extraction):
# - Extract ALL pixels from field polygon (including NAs from clouds/missing
#   data)
# - Count: num_data = non-NA pixels, num_total = total pixels in field
# - Calculate: pct_clear = (num_data / num_total) * 100
# - Categorize: >=99.5% = "Clear view", >0% = "Partial coverage", 0% = "No image available"
#
# This ensures LOGICAL CONSISTENCY:
# - If CI_mean has value → at least 1 pixel has data → pct_clear > 0 ✓
# - If pct_clear = 0 → no data → CI_mean = NA ✓
# - Eliminates double-extraction inefficiency

# ============================================================================
# PARALLEL FIELD ANALYSIS FUNCTION
# ============================================================================

#' Fetch the phase column from a field-analysis table
#'
#' Accepts both the intended column name `Phase (age based)` and the name
#' `Phase..age.based.` that `data.frame()` produces when it sanitises column
#' names, so tables written with either name can still be read.
#'
#' @param df Field-analysis data frame
#' @return The phase column as a vector, or NULL when neither name exists
phase_column_values <- function(df) {
  for (candidate in c("Phase (age based)", "Phase..age.based.")) {
    if (candidate %in% names(df)) {
      return(df[[candidate]])
    }
  }
  NULL
}

#' Analyze single field (for parallel processing)
#'
#' Processes ONE field: finds the tiles its bounding box overlaps, loads the
#' CI layer for the current week, extracts the pixels under the polygon and
#' derives CI statistics, cloud coverage, age, phase, status trigger and the
#' number of weeks spent in the current phase. Any error is caught and
#' turned into a row with an `error` column, so one bad field never aborts
#' the whole run.
#'
#' @param field_idx Row index in field_boundaries_sf
#' @param field_boundaries_sf All field boundaries (sf object with a `field`
#'   column and optionally `sub_area`)
#' @param tile_grid Data frame with tile extents
#' @param week_num Current week number
#' @param year Current year
#' @param mosaic_dir Directory with weekly tiles
#' @param previous_week_csv Previous week's field-analysis table (or NULL)
#' @param planting_dates Planting dates lookup (field_id, planting_date)
#' @param report_date Report date used for age calculation
#' @param harvest_imminence_data Harvest imminence predictions with columns
#'   field_id and imminent_prob (optional)
#'
#' @return Single-row data frame with field analysis. Rows returned early
#'   (invalid geometry, no tiles, no CI values, error) carry fewer columns
#'   plus an `error` column.
#'
analyze_single_field <- function(field_idx, field_boundaries_sf, tile_grid, week_num, year,
                                 mosaic_dir, previous_week_csv = NULL, planting_dates = NULL,
                                 report_date = Sys.Date(), harvest_imminence_data = NULL) {
  tryCatch({
    # Get field info
    field_id <- field_boundaries_sf$field[field_idx]
    farm_section <- if ("sub_area" %in% names(field_boundaries_sf)) {
      field_boundaries_sf$sub_area[field_idx]
    } else {
      NA_character_
    }

    # Get field geometry and validate
    field_sf <- field_boundaries_sf[field_idx, ]
    if (sf::st_is_empty(field_sf) || any(is.na(sf::st_geometry(field_sf)))) {
      return(data.frame(
        Field_id = field_id,
        Farm_section = farm_section,
        CI_mean = NA_real_,
        error = "Empty or invalid geometry",
        stringsAsFactors = FALSE
      ))
    }

    # Calculate field area
    # NOTE(review): assumes st_area() yields square metres - confirm CRS
    field_area_ha <- as.numeric(sf::st_area(field_sf)) / 10000
    field_area_acres <- field_area_ha / 0.404686

    # Determine which tiles this field intersects and load them
    tile_ids <- get_tile_ids_for_field(field_sf, tile_grid)
    current_ci <- load_tiles_for_field(field_sf, tile_ids, week_num, year, mosaic_dir)

    if (is.null(current_ci)) {
      return(data.frame(
        Field_id = field_id,
        Farm_section = farm_section,
        Hectares = field_area_ha,
        Acreage = field_area_acres,
        CI_mean = NA_real_,
        error = "No tile data available",
        stringsAsFactors = FALSE
      ))
    }

    # Convert the polygon for terra directly (no detour through sp).
    # NOTE(review): the CRS is overwritten with the raster's CRS, not
    # reprojected - this is only correct if both already share a CRS.
    field_vect <- terra::vect(field_sf)
    terra::crs(field_vect) <- terra::crs(current_ci)

    # Column 1 of extract() is the polygon ID, column 2 the CI values.
    # Keep ALL pixels (including NAs) for the cloud calculation.
    all_extracted <- terra::extract(current_ci, field_vect)[, 2]
    current_ci_vals <- all_extracted[!is.na(all_extracted)]

    # Cloud coverage from the SAME extraction: non-NA pixels vs all pixels
    num_total <- length(all_extracted)
    num_data <- length(current_ci_vals)
    pct_clear <- if (num_total > 0) round((num_data / num_total) * 100, 1) else 0

    cloud_cat <- if (num_data == 0) {
      "No image available"
    } else if (pct_clear >= 99.5) {
      "Clear view"
    } else {
      "Partial coverage"
    }
    cloud_pct <- 100 - pct_clear

    # If no CI values extracted, return early with cloud info
    if (num_data == 0) {
      return(data.frame(
        Field_id = field_id,
        Farm_section = farm_section,
        Hectares = field_area_ha,
        Acreage = field_area_acres,
        CI_mean = NA_real_,
        Cloud_pct = cloud_pct,
        Cloud_category = cloud_cat,
        error = "No CI values extracted",
        stringsAsFactors = FALSE
      ))
    }

    # Current CI statistics
    mean_ci_current <- mean(current_ci_vals)
    ci_std <- sd(current_ci_vals)
    cv_current <- ci_std / mean_ci_current
    range_str <- sprintf("%.1f-%.1f", min(current_ci_vals), max(current_ci_vals))

    # Previous week, wrapping into the last ISO week of the previous year
    # (28 December always falls in the last ISO week) instead of asking for
    # a non-existent "week 0".
    prev_week <- week_num - 1
    prev_year <- year
    if (prev_week < 1) {
      prev_year <- year - 1
      prev_week <- as.numeric(format(as.Date(sprintf("%d-12-28", prev_year)), "%V"))
    }

    # Weekly CI change; missing previous-week data is acceptable, so any
    # failure here deliberately leaves the change as NA.
    weekly_ci_change <- NA_real_
    tryCatch({
      previous_ci <- load_tiles_for_field(field_sf, tile_ids, prev_week, prev_year, mosaic_dir)
      if (!is.null(previous_ci)) {
        previous_ci_vals <- terra::extract(previous_ci, field_vect)[, 2]
        previous_ci_vals <- previous_ci_vals[!is.na(previous_ci_vals)]
        if (length(previous_ci_vals) > 0) {
          weekly_ci_change <- mean_ci_current - mean(previous_ci_vals)
        }
      }
    }, error = function(e) NULL)

    # Format CI change
    weekly_ci_change_str <- if (is.na(weekly_ci_change)) {
      sprintf("%.1f ± %.2f", mean_ci_current, ci_std)
    } else {
      sprintf("%.1f ± %.2f (Δ %.2f)", mean_ci_current, ci_std, weekly_ci_change)
    }

    # Calculate age from the planting-date lookup ...
    age_weeks <- NA_real_
    if (!is.null(planting_dates) && nrow(planting_dates) > 0) {
      planting_row <- which(planting_dates$field_id == field_id)
      if (length(planting_row) > 0) {
        planting_date <- planting_dates$planting_date[planting_row[1]]
        if (!is.na(planting_date)) {
          age_weeks <- as.numeric(difftime(report_date, planting_date, units = "weeks"))
        }
      }
    }
    # ... unless the uniform planting date overrides it
    if (USE_UNIFORM_AGE) {
      age_weeks <- as.numeric(difftime(report_date, UNIFORM_PLANTING_DATE, units = "weeks"))
    }

    # Germination progress (only reported during weeks 0-6)
    pct_ci_ge_2 <- sum(current_ci_vals >= 2) / length(current_ci_vals) * 100
    germination_progress_str <- NA_character_
    if (!is.na(age_weeks) && age_weeks >= 0 && age_weeks <= 6) {
      germination_progress_str <- sprintf("%.0f%% at CI >= 2", pct_ci_ge_2)
    }

    # Phase: harvest-imminence prediction wins, otherwise age-based
    phase <- "Unknown"
    imminent_prob_val <- NA_real_
    if (!is.null(harvest_imminence_data) && nrow(harvest_imminence_data) > 0) {
      imminent_row <- which(harvest_imminence_data$field_id == field_id)
      if (length(imminent_row) > 0) {
        imminent_prob_val <- harvest_imminence_data$imminent_prob[imminent_row[1]]
        if (!is.na(imminent_prob_val) && imminent_prob_val > 0.5) {
          phase <- "Harvest Imminent"
        }
      }
    }
    if (phase == "Unknown") {
      phase <- get_phase_by_age(age_weeks)
    }

    status_trigger <- get_status_trigger(current_ci_vals, weekly_ci_change, age_weeks)

    # Track phase transitions: continue the counter only when last week's
    # phase is known and equal to this week's. isTRUE() keeps an NA phase
    # (unknown age) from aborting the field.
    nmr_weeks_in_phase <- 1
    if (!is.null(previous_week_csv) && nrow(previous_week_csv) > 0) {
      prev_row <- which(previous_week_csv$Field_id == field_id)
      prev_phases <- phase_column_values(previous_week_csv)
      if (length(prev_row) > 0 && !is.null(prev_phases)) {
        prev_phase <- prev_phases[prev_row[1]]
        if (isTRUE(prev_phase == phase)) {
          prev_weeks <- as.numeric(previous_week_csv$Weeks_in_phase[prev_row[1]])
          if (length(prev_weeks) == 1 && !is.na(prev_weeks)) {
            nmr_weeks_in_phase <- prev_weeks + 1
          }
        }
      }
    }

    # Compile result. check.names = FALSE keeps the column literally named
    # "Phase (age based)"; without it data.frame() renames it to
    # "Phase..age.based." and every later lookup by the real name fails.
    data.frame(
      Field_id = field_id,
      Farm_section = farm_section,
      Hectares = field_area_ha,
      Acreage = field_area_acres,
      CI_mean = mean_ci_current,
      CI_std = ci_std,
      CI_range = range_str,
      CI_change_weekly = weekly_ci_change_str,
      CI_change_value = weekly_ci_change,
      CV = cv_current,
      Age_week = age_weeks,
      `Phase (age based)` = phase,
      Germination_progress = germination_progress_str,
      Status_trigger = status_trigger,
      Weeks_in_phase = nmr_weeks_in_phase,
      Imminent_prob = imminent_prob_val,
      Cloud_pct = cloud_pct,
      Cloud_category = cloud_cat,
      check.names = FALSE,
      stringsAsFactors = FALSE
    )
  }, error = function(e) {
    message(paste("Error analyzing field", field_idx, ":", e$message))

    # Report the real field id (same type as in successful rows) when it can
    # be looked up; fall back to the row index otherwise.
    failed_id <- tryCatch(
      field_boundaries_sf$field[field_idx],
      error = function(lookup_error) NULL
    )
    if (length(failed_id) != 1 || is.na(failed_id)) {
      failed_id <- as.character(field_idx)
    }

    data.frame(
      Field_id = failed_id,
      error = e$message,
      stringsAsFactors = FALSE
    )
  })
}

# ============================================================================
# SUMMARY GENERATION
# ============================================================================

#' Summarise the per-field analysis
#'
#' Totals acreage per phase and per status trigger, and counts fields and
#' acreage per cloud category. Rows whose category value is NA are ignored
#' in the per-category sums; missing columns contribute 0.
#' NOTE(review): the "Harvest Imminent" phase and the
#' "maturation_progressing" trigger have no row of their own in the table.
#'
#' @param field_df Per-field analysis data frame
#' @return Data frame with columns Category and Acreage (section headers and
#'   spacer rows carry NA); cloud field counts are also attached as
#'   attributes cloud_fields_clear / _partial / _no_image / _total
generate_field_analysis_summary <- function(field_df) {
  message("Generating summary statistics...")

  acreage <- field_df$Acreage
  phases <- phase_column_values(field_df)
  triggers <- field_df$Status_trigger
  clouds <- field_df$Cloud_category

  # Acreage of the rows whose `values` equal `target`; %in% treats NA as
  # "no match", so NA categories never select a row.
  acres_for <- function(values, target) {
    sum(acreage[values %in% target], na.rm = TRUE)
  }

  # Total acreage
  total_acreage <- sum(acreage, na.rm = TRUE)

  # Phase breakdown
  germination_acreage <- acres_for(phases, "Germination")
  tillering_acreage <- acres_for(phases, "Tillering")
  grand_growth_acreage <- acres_for(phases, "Grand Growth")
  maturation_acreage <- acres_for(phases, "Maturation")
  unknown_phase_acreage <- acres_for(phases, "Unknown")

  # Status trigger breakdown
  harvest_ready_acreage <- acres_for(triggers, "harvest_ready")
  stress_acreage <- acres_for(triggers, "stress_detected_whole_field")
  recovery_acreage <- acres_for(triggers, "strong_recovery")
  growth_on_track_acreage <- acres_for(triggers, "growth_on_track")
  germination_complete_acreage <- acres_for(triggers, "germination_complete")
  germination_started_acreage <- acres_for(triggers, "germination_started")
  no_trigger_acreage <- sum(acreage[is.na(triggers)], na.rm = TRUE)

  # Cloud coverage breakdown - COUNT FIELDS, not acreage
  clear_fields <- sum(clouds %in% "Clear view")
  partial_fields <- sum(clouds %in% "Partial coverage")
  no_image_fields <- sum(clouds %in% "No image available")
  total_fields <- nrow(field_df)

  # Cloud acreage for reporting
  clear_acreage <- acres_for(clouds, "Clear view")
  partial_acreage <- acres_for(clouds, "Partial coverage")
  no_image_acreage <- acres_for(clouds, "No image available")

  # Create summary table
  summary_df <- data.frame(
    Category = c(
      "=== PHASE DISTRIBUTION ===",
      "Germination",
      "Tillering",
      "Grand Growth",
      "Maturation",
      "Unknown Phase",
      "",
      "=== STATUS TRIGGERS ===",
      "Harvest Ready",
      "Stress Detected",
      "Strong Recovery",
      "Growth On Track",
      "Germination Complete",
      "Germination Started",
      "No Trigger",
      "",
      "=== CLOUD COVERAGE ===",
      "Clear View (fields)",
      "Partial Coverage (fields)",
      "No Image Available (fields)",
      "Clear View (acres)",
      "Partial Coverage (acres)",
      "No Image Available (acres)",
      "",
      "Total Fields",
      "Total Acreage"
    ),
    Acreage = c(
      NA,
      round(germination_acreage, 2),
      round(tillering_acreage, 2),
      round(grand_growth_acreage, 2),
      round(maturation_acreage, 2),
      round(unknown_phase_acreage, 2),
      NA,
      NA,
      round(harvest_ready_acreage, 2),
      round(stress_acreage, 2),
      round(recovery_acreage, 2),
      round(growth_on_track_acreage, 2),
      round(germination_complete_acreage, 2),
      round(germination_started_acreage, 2),
      round(no_trigger_acreage, 2),
      NA,
      NA,
      clear_fields,
      partial_fields,
      no_image_fields,
      round(clear_acreage, 2),
      round(partial_acreage, 2),
      round(no_image_acreage, 2),
      NA,
      total_fields,
      round(total_acreage, 2)
    ),
    stringsAsFactors = FALSE
  )

  # Add metadata as attributes
  attr(summary_df, "cloud_fields_clear") <- clear_fields
  attr(summary_df, "cloud_fields_partial") <- partial_fields
  attr(summary_df, "cloud_fields_no_image") <- no_image_fields
  attr(summary_df, "cloud_fields_total") <- total_fields

  summary_df
}

# ============================================================================
# EXPORT FUNCTIONS
# ============================================================================

#' Write the field analysis to Excel, RDS and CSV
#'
#' Creates `<reports_dir>/kpis/field_analysis/` if needed, then writes:
#' an .xlsx with sheets "Field Data" and "Summary", an .rds under
#' `<reports_dir>/kpis/` holding both tables plus week/export-date metadata,
#' and a .csv of the per-field table (read back by later weekly runs).
#'
#' @param field_df Per-field analysis data frame
#' @param summary_df Summary data frame
#' @param project_dir Project name used as file-name prefix
#' @param current_week Week number used in the file names
#' @param reports_dir Reports root directory
#' @return List with the written paths: excel, rds, csv
export_field_analysis_excel <- function(field_df, summary_df, project_dir, current_week, reports_dir) {
  message("Exporting per-field analysis to Excel and RDS...")

  week_tag <- sprintf("%02d", current_week)

  # Save to kpis/field_analysis subfolder
  output_subdir <- file.path(reports_dir, "kpis", "field_analysis")
  if (!dir.exists(output_subdir)) {
    dir.create(output_subdir, recursive = TRUE)
  }

  # Create Excel with two sheets
  excel_filename <- paste0(project_dir, "_field_analysis_week", week_tag, ".xlsx")
  excel_path <- file.path(output_subdir, excel_filename)
  excel_path <- normalizePath(excel_path, winslash = "\\", mustWork = FALSE)

  sheets <- list(
    "Field Data" = field_df,
    "Summary" = summary_df
  )
  write_xlsx(sheets, excel_path)
  message(paste("✓ Field analysis Excel exported to:", excel_path))

  # Also save as RDS
  kpi_data <- list(
    field_analysis = field_df,
    field_analysis_summary = summary_df,
    metadata = list(
      week = current_week,
      export_date = Sys.Date()
    )
  )
  rds_filename <- paste0(project_dir, "_kpi_summary_tables_week", week_tag, ".rds")
  rds_path <- file.path(reports_dir, "kpis", rds_filename)
  saveRDS(kpi_data, rds_path)
  message(paste("✓ Field analysis RDS exported to:", rds_path))

  # Also export as CSV for field history tracking
  csv_filename <- paste0(project_dir, "_field_analysis_week", week_tag, ".csv")
  csv_path <- file.path(output_subdir, csv_filename)
  write_csv(field_df, csv_path)
  message(paste("✓ Field analysis CSV exported to:", csv_path))

  return(list(excel = excel_path, rds = rds_path, csv = csv_path))
}

# ============================================================================
# MAIN
# ============================================================================

#' Script entry point
#'
#' Resolves end date and project (command-line arguments first, then the
#' globals `end_date_str` / `project_dir`, then defaults), sources the
#' project configuration, analyses every field in parallel, and exports the
#' results.
#' NOTE(review): `weekly_tile_max`, `data_dir`, `reports_dir`,
#' `harvesting_data` and `load_field_boundaries()` are not defined in this
#' file; they are presumably created by the sourced scripts - confirm.
main <- function() {
  args <- commandArgs(trailingOnly = TRUE)

  end_date <- if (length(args) >= 1 && !is.na(args[1])) {
    as.Date(args[1])
  } else if (exists("end_date_str", envir = .GlobalEnv)) {
    as.Date(get("end_date_str", envir = .GlobalEnv))
  } else {
    Sys.Date()
  }

  project_dir <- if (length(args) >= 2 && !is.na(args[2])) {
    as.character(args[2])
  } else if (exists("project_dir", envir = .GlobalEnv)) {
    get("project_dir", envir = .GlobalEnv)
  } else {
    "angata"
  }

  # IMPORTANT: Assign project_dir BEFORE sourcing parameters_project.R
  # so that initialize_project() can access it via exists("project_dir")
  assign("project_dir", project_dir, envir = .GlobalEnv)

  # Load utilities and configuration (in this order - crop_messaging_utils before parameters)
  source(here("r_app", "crop_messaging_utils.R"))
  source(here("r_app", "parameters_project.R"))

  message("=== FIELD ANALYSIS WEEKLY (TILE-AWARE, PARALLEL) ===")
  message(paste("Date:", end_date))
  message(paste("Project:", project_dir))

  # Calculate weeks
  # NOTE(review): %V is the ISO week but %Y is the calendar year; around New
  # Year they can disagree (ISO year would be %G). Left unchanged because the
  # tile files must match whatever the tile producer uses - confirm.
  current_week <- as.numeric(format(end_date, "%V"))
  year <- as.numeric(format(end_date, "%Y"))

  message(paste("Week:", current_week, "/ Year:", year))

  # Build tile grid from available tiles
  message("Building tile grid from available weekly tiles...")
  tile_grid <- build_tile_grid(weekly_tile_max, current_week, year)
  message(paste(" Found", nrow(tile_grid), "tiles for analysis"))

  # Load field boundaries
  tryCatch({
    boundaries_result <- load_field_boundaries(data_dir)

    # load_field_boundaries returns a list with field_boundaries_sf and field_boundaries
    if (is.list(boundaries_result) && "field_boundaries_sf" %in% names(boundaries_result)) {
      field_boundaries_sf <- boundaries_result$field_boundaries_sf
    } else {
      field_boundaries_sf <- boundaries_result
    }

    # Check if field_boundaries_sf is valid
    if (!is.data.frame(field_boundaries_sf) && !inherits(field_boundaries_sf, "sf")) {
      stop("Field boundaries is not a valid sf object or data frame")
    }
    if (nrow(field_boundaries_sf) == 0) {
      stop("Field boundaries loaded but contains 0 rows")
    }
    message(paste(" Loaded", nrow(field_boundaries_sf), "fields from boundaries"))
  }, error = function(e) {
    stop("ERROR loading field boundaries: ", e$message,
         "\nCheck that pivot.geojson exists in ", data_dir)
  })

  # Load previous week data for phase tracking
  previous_week_result <- load_previous_week_csv(project_dir, current_week, reports_dir)
  previous_week_csv <- if (previous_week_result$found) previous_week_result$data else NULL

  # Load planting dates
  planting_dates <- extract_planting_dates(harvesting_data)

  # === PARALLEL PROCESSING SETUP ===
  message("Setting up parallel processing...")

  # Test the plan with inherits(): the first element of class(plan()) is not
  # guaranteed to be the strategy name, so comparing it to "sequential" can
  # miss the default plan and leave the run sequential.
  if (inherits(future::plan(), "sequential")) {
    # Default to multisession, leaving one core free but never fewer than
    # one worker (detectCores() may return NA or 1).
    detected_cores <- parallel::detectCores()
    num_workers <- if (is.na(detected_cores)) 1 else max(1, detected_cores - 1)
    message(paste(" Using", num_workers, "workers for parallel processing"))
    previous_plan <- future::plan(future::multisession, workers = num_workers)
    # Restore the caller's plan when main() exits
    on.exit(future::plan(previous_plan), add = TRUE)
  } else {
    plan_classes <- setdiff(class(future::plan()), c("FutureStrategy", "tweaked"))
    message(paste(" Using existing plan:", plan_classes[1]))
  }

  # === PARALLEL FIELD ANALYSIS ===
  message("Analyzing fields in parallel...")

  # Map over all fields using furrr (parallel version of map)
  field_analysis_list <- furrr::future_map(
    seq_len(nrow(field_boundaries_sf)),
    function(field_idx) {
      analyze_single_field(
        field_idx = field_idx,
        field_boundaries_sf = field_boundaries_sf,
        tile_grid = tile_grid,
        week_num = current_week,
        year = year,
        mosaic_dir = weekly_tile_max,
        previous_week_csv = previous_week_csv,
        planting_dates = planting_dates,
        report_date = end_date,
        harvest_imminence_data = NULL # Optional: add if available
      )
    },
    .progress = TRUE,
    .options = furrr::furrr_options(seed = TRUE)
  )

  # Bind list of data frames into single data frame
  field_analysis_df <- dplyr::bind_rows(field_analysis_list)

  if (nrow(field_analysis_df) == 0) {
    stop("No fields analyzed successfully!")
  }

  message(paste("✓ Analyzed", nrow(field_analysis_df), "fields"))

  # Generate summary
  summary_statistics_df <- generate_field_analysis_summary(field_analysis_df)

  # Export results
  export_paths <- export_field_analysis_excel(
    field_analysis_df,
    summary_statistics_df,
    project_dir,
    current_week,
    reports_dir
  )

  # Print summary
  cat("\n=== FIELD ANALYSIS SUMMARY ===\n")
  cat("Fields analyzed:", nrow(field_analysis_df), "\n")
  cat("Excel export:", export_paths$excel, "\n")
  cat("RDS export:", export_paths$rds, "\n")
  cat("CSV export:", export_paths$csv, "\n\n")

  cat("--- Per-field results (first 10) ---\n")
  # Select only columns that exist to avoid print errors
  available_cols <- c("Field_id", "Acreage", "Age_week", "CI_mean", "Status_trigger", "Cloud_category")
  available_cols <- available_cols[available_cols %in% names(field_analysis_df)]
  if (length(available_cols) > 0) {
    print(head(field_analysis_df[, available_cols], 10))
  } else {
    print(head(field_analysis_df, 10))
  }

  cat("\n--- Summary statistics ---\n")
  print(summary_statistics_df)

  message("\n✓ Field analysis complete!")
}

# Run only when executed as a script (no calling frame), not when sourced
# from inside another function.
if (sys.nframe() == 0) {
  main()
}