# SmartCane/r_app/run_full_pipeline.R
# (file listing metadata: 939 lines, 39 KiB, R)
# ==============================================================================
# FULL PIPELINE RUNNER
# ==============================================================================
# Mixed Python/R pipeline:
# 1. Python: Download Planet images
# 2. R 10: Create master grid and split TIFFs
# 3. R 20: CI Extraction
# 4. R 21: Convert CI RDS to CSV
# 5. R 30: Interpolate growth model
# 6. Python 31: Harvest imminent weekly
# 7. R 40: Mosaic creation
# 8. R 80: Calculate KPIs
#
# ==============================================================================
# HOW TO RUN THIS SCRIPT
# ==============================================================================
#
# Run from the smartcane/ directory:
#
# Option 1 (Recommended - shows real-time output):
# Rscript r_app/run_full_pipeline.R
#
# Option 2 (Full path to Rscript - use & in PowerShell for paths with spaces):
# & "C:\Program Files\R\R-4.4.3\bin\x64\Rscript.exe" r_app/run_full_pipeline.R
#
# Option 3 (Batch mode - output saved to .Rout file):
# R CMD BATCH --vanilla r_app/run_full_pipeline.R
#
# ==============================================================================
# ==============================================================================
# *** EDIT THESE VARIABLES ***
end_date <- as.Date("2026-01-27") # or specify: as.Date("2026-01-27") , Sys.Date()
project_dir <- "aura" # project name: "esa", "aura", "angata", "chemba"
data_source <- "merged_tif" # Standard data source directory
force_rerun <- FALSE # Set to TRUE to force all scripts to run even if outputs exist
# ***************************
# Rscript path for running external R scripts via system().
# Prefer the currently running R installation (R.home) over a hard-coded
# version-specific path, so upgrading R does not break the pipeline.
# Fall back to the legacy hard-coded location for backward compatibility.
RSCRIPT_PATH <- file.path(R.home("bin"), "Rscript.exe")
if (!file.exists(RSCRIPT_PATH)) {
  RSCRIPT_PATH <- file.path("C:", "Program Files", "R", "R-4.4.3", "bin", "x64", "Rscript.exe")
}
# Load client type mapping and centralized paths from parameters_project.R
source("r_app/parameters_project.R")
source("r_app/00_common_utils.R")
paths <- setup_project_directories(project_dir)
client_type <- get_client_type(project_dir)
cat(sprintf("\nProject: %s → Client Type: %s\n", project_dir, client_type))
# ==============================================================================
# DETERMINE REPORTING WINDOW (auto-calculated based on KPI requirements)
# ==============================================================================
# Script 80 (KPIs) needs an 8-week history for its trend analysis, so the
# reporting window is fixed at 8 ISO weeks ending on end_date.
reporting_weeks_needed <- 8 # CRITICAL: Script 80's 8-week trend analysis requirement
offset <- reporting_weeks_needed * 7 # Window length in days (8 weeks = 56 days)
cat(sprintf("\n[INFO] Reporting window: %d weeks (%d days of data)\n", reporting_weeks_needed, offset))
wwy_current <- get_iso_week_year(end_date)
cat(
  sprintf(" Running week: %02d / %d\n", wwy_current$week, wwy_current$year),
  sprintf(" Date range: %s to %s\n", format(end_date - offset, "%Y-%m-%d"), format(end_date, "%Y-%m-%d")),
  sep = ""
)
# Pre-formatted end date string (kept for later sections of the script)
end_date_str <- format(as.Date(end_date), "%Y-%m-%d")
# Overall pipeline status flag; any fatal step flips this to FALSE
pipeline_success <- TRUE
# ==============================================================================
# EARLY PREREQ CHECK: Verify mosaic requirements BEFORE any downloads
# ==============================================================================
# This determines if we need more weeks of data than the initial reporting window
# Run this BEFORE downloads so we can download ONLY missing dates upfront
cat("\n========== EARLY CHECK: MOSAIC REQUIREMENTS FOR REPORTING WINDOW ==========\n")
# Detect mosaic mode early (centralized function in parameters_project.R)
mosaic_mode <- detect_mosaic_mode(project_dir)
# Build the table of required weeks with a single bind instead of growing a
# data.frame via rbind() inside a loop (same rows, same order: newest first)
weeks_needed <- do.call(rbind, lapply(0:(reporting_weeks_needed - 1), function(weeks_back) {
  check_date <- end_date - (weeks_back * 7)
  wwy <- get_iso_week_year(check_date)
  data.frame(week = wwy$week, year = wwy$year, date = check_date)
}))
missing_weeks_dates <- c() # NOTE(review): not populated in this section - verify whether later parts of the file use it
earliest_missing_date <- end_date # Start with end_date, go back if needed
missing_weeks <- data.frame() # Track ALL missing weeks for later processing by Script 40
# Scan the reporting window for weeks whose mosaic output is missing.
# Missing weeks are queued for Script 40; earliest_missing_date is pulled back
# so downloads/preprocessing cover the whole gap.
missing_rows <- list() # Accumulate rows, bind once at the end (avoids rbind-in-loop growth)
for (i in seq_len(nrow(weeks_needed))) {
  week_num <- weeks_needed[i, "week"]
  year_num <- weeks_needed[i, "year"]
  check_date <- weeks_needed[i, "date"]
  # Pattern must be flexible to match both:
  # - Single-file: week_51_2025.tif (top-level)
  # - Single-file per-field: week_51_2025.tif (in {FIELD}/ subdirectories)
  # - Tiled: week_51_2025_01.tif, week_51_2025_02.tif, etc.
  week_pattern_check <- sprintf("week_%02d_%d", week_num, year_num)
  files_this_week <- character(0)
  if (mosaic_mode == "tiled") {
    mosaic_dir_check <- get_mosaic_dir(project_dir, mosaic_mode = "tiled")
    if (dir.exists(mosaic_dir_check)) {
      # Search recursively to support per-field mosaics in field subdirectories
      files_this_week <- list.files(mosaic_dir_check, pattern = week_pattern_check, recursive = TRUE, full.names = FALSE)
    }
  } else if (mosaic_mode == "single-file") {
    mosaic_dir_check <- paths$weekly_mosaic_dir
    if (dir.exists(mosaic_dir_check)) {
      # Recursive search covers both top-level (legacy) and per-field subdirectories
      files_this_week <- list.files(mosaic_dir_check, pattern = week_pattern_check, recursive = TRUE, full.names = FALSE)
    }
  }
  cat(sprintf(
    " Week %02d/%d (%s): %s\n", week_num, year_num, format(check_date, "%Y-%m-%d"),
    if (length(files_this_week) > 0) "✓ EXISTS" else "✗ MISSING"
  ))
  # If the week is missing, widen the download window and queue it for Script 40
  if (length(files_this_week) == 0) {
    week_start <- check_date - 6 # Monday of that week
    if (week_start < earliest_missing_date) {
      earliest_missing_date <- week_start
    }
    missing_rows[[length(missing_rows) + 1]] <- data.frame(week = week_num, year = year_num, week_end_date = check_date)
  }
}
if (length(missing_rows) > 0) {
  # Single bind; missing_weeks stays an empty data.frame when nothing is missing
  missing_weeks <- do.call(rbind, missing_rows)
}
# ------------------------------------------------------------------------------
# Choose the offset used by the data-generation scripts (10, 20, 30, 40).
# When any mosaic week is missing, shrink the window to just the gap between
# the earliest missing week and end_date. Script 80 keeps using the full
# reporting_weeks_needed offset for its KPI calculations.
# ------------------------------------------------------------------------------
force_data_generation <- earliest_missing_date < end_date
if (force_data_generation) {
  gap_start <- format(earliest_missing_date, "%Y-%m-%d")
  cat(sprintf("\n[INFO] Missing week(s) detected - need to fill from %s onwards\n", gap_start))
  # Cover only the gap (from earliest missing week to end_date)
  dynamic_offset <- as.numeric(end_date - earliest_missing_date)
  cat(sprintf(
    "[INFO] Will download/process ONLY missing dates: %d days (from %s to %s)\n",
    dynamic_offset, gap_start, format(end_date, "%Y-%m-%d")
  ))
  data_generation_offset <- dynamic_offset
} else {
  cat("\n[INFO] ✓ All required mosaics exist - using normal reporting window\n")
  data_generation_offset <- offset # Default reporting window offset
}
# ==============================================================================
# CHECK KPI REQUIREMENTS FOR REPORTING WINDOW
# ==============================================================================
# Scripts 90 (Word) and 91 (Excel) need KPIs for the full reporting window.
# Script 80 ALWAYS runs and calculates any missing KPIs, so this section is
# informational only. Uses the centralized check_kpi_completeness() helper.
cat("\n========== KPI REQUIREMENT CHECK ==========\n")
cat(sprintf(
  "KPIs needed for reporting: %d weeks (current week + %d weeks history)\n",
  reporting_weeks_needed, reporting_weeks_needed - 1
))
# Centralized completeness check (parameters_project.R)
kpi_check <- check_kpi_completeness(project_dir, client_type, end_date, reporting_weeks_needed)
kpi_dir <- kpi_check$kpi_dir
kpis_needed <- kpi_check$kpis_df
kpis_missing_count <- kpi_check$missing_count
# Ensure the KPI output directory exists before anything writes into it
if (!dir.exists(kpi_dir)) {
  dir.create(kpi_dir, recursive = TRUE, showWarnings = FALSE)
}
# Per-week status report
if (nrow(kpis_needed) == 0) {
  cat(" (No weeks in reporting window)\n")
} else {
  for (i in seq_len(nrow(kpis_needed))) {
    row <- kpis_needed[i, ]
    status <- if (row$has_kpis) "✓ EXISTS" else "✗ WILL BE CALCULATED"
    cat(sprintf(
      " Week %02d/%d (%s): %s (%d files)\n",
      row$week, row$year, format(row$date, "%Y-%m-%d"), status, row$file_count
    ))
  }
}
cat(sprintf(
  "\nKPI Summary: %d/%d weeks exist, %d week(s) will be calculated by Script 80\n",
  nrow(kpis_needed) - kpis_missing_count, nrow(kpis_needed), kpis_missing_count
))
# ------------------------------------------------------------------------------
# Client-type gating for conditional scripts.
# - "cane_supply": Scripts 20,21,22,23,30,31,80,91 (full pipeline, Excel output)
# - "agronomic_support": Scripts 20,30,80,90 (KPI calculation + Word report)
#
# Scripts that ALWAYS run regardless of client type:
# 00 (Python download), 10 (tiling, if outputs missing), 20 (CI extraction),
# 30 (growth model), 40 (mosaic creation), 80 (KPI calculation).
#
# Client-type-specific scripts:
# 21/22/23/31 - cane_supply only; 90 - agronomic_support only; 91 - cane_supply only.
# ------------------------------------------------------------------------------
is_cane_supply <- client_type == "cane_supply"
skip_cane_supply_only <- !is_cane_supply # Gates Scripts 21, 22, 23, 31
run_legacy_report <- client_type == "agronomic_support" # Script 90
run_modern_report <- is_cane_supply # Script 91
# ==============================================================================
# INTELLIGENT CHECKING: What has already been completed?
# ==============================================================================
cat("\n========== CHECKING EXISTING OUTPUTS ==========\n")
# Use centralized mosaic mode detection from parameters_project.R
cat(sprintf("Auto-detected mosaic mode: %s\n", mosaic_mode))
# Check Script 10 outputs - FLEXIBLE: look for tiles either directly OR in grid subdirs
tiles_split_base <- paths$daily_tiles_split_dir
tiles_dates <- character(0)
if (dir.exists(tiles_split_base)) {
  # Try grid-size subdirectories first (5x5, 10x10, etc.) - preferred new structure
  subfolders <- list.dirs(tiles_split_base, full.names = FALSE, recursive = FALSE)
  grid_patterns <- grep("^\\d+x\\d+$", subfolders, value = TRUE)
  if (length(grid_patterns) > 0) {
    # New structure: daily_tiles_split/{grid_size}/{dates}/
    grid_dir <- file.path(tiles_split_base, grid_patterns[1])
    tiles_dates <- list.dirs(grid_dir, full.names = FALSE, recursive = FALSE)
  } else {
    # Old structure: daily_tiles_split/{dates}/ (no grid-size subfolder)
    tiles_dates <- list.dirs(tiles_split_base, full.names = FALSE, recursive = FALSE)
  }
}
cat(sprintf("Script 10: %d dates already tiled\n", length(tiles_dates)))
# Check Script 20 outputs (CI extraction) - daily RDS files
ci_daily_dir <- paths$daily_ci_vals_dir
ci_files <- if (dir.exists(ci_daily_dir)) {
  list.files(ci_daily_dir, pattern = "\\.rds$")
} else {
  character(0)
}
cat(sprintf("Script 20: %d CI daily RDS files exist\n", length(ci_files)))
# Script 21 output (CSV) is overwritten on every run, so it is not a useful
# skip indicator; it simply runs whenever Script 20 runs.
cat("Script 21: CSV file exists but gets overwritten - will run if Script 20 runs\n")
# Check Script 40 outputs (mosaics): the early-check section already identified
# missing_weeks. NOTE: skip_40 is decided in the "skipping decisions" section
# below (the identical assignment that used to live here was dead code - it was
# overwritten before its first read).
cat(sprintf("Script 40: %d missing week(s) to create\n", nrow(missing_weeks)))
# Check Script 80 outputs (KPIs in reports/kpis/{field_level|field_analysis})
# kpi_dir already set by check_kpi_completeness() above
# Script 80 exports to .xlsx (Excel) and .rds (RDS) formats
kpi_files <- if (dir.exists(kpi_dir)) {
  list.files(kpi_dir, pattern = "\\.xlsx$|\\.rds$")
} else {
  character(0)
}
cat(sprintf("Script 80: %d KPI files exist\n", length(kpi_files)))
# Determine if scripts should run based on outputs AND client type
skip_10 <- (length(tiles_dates) > 0 && !force_rerun && !force_data_generation) # Force Script 10 if missing weeks detected
skip_20 <- FALSE # Script 20 ALWAYS runs for all client types - processes new downloaded data
skip_21 <- skip_cane_supply_only # Script 21 runs ONLY for cane_supply clients (CI→CSV conversion)
skip_22 <- skip_cane_supply_only # Script 22 runs ONLY for cane_supply clients
skip_23 <- skip_cane_supply_only # Script 23 runs ONLY for cane_supply clients
skip_30 <- FALSE # Script 30 ALWAYS runs for all client types
skip_31 <- skip_cane_supply_only # Script 31 runs ONLY for cane_supply clients
skip_40 <- (nrow(missing_weeks) == 0 && !force_rerun) # Skip Script 40 only if NO missing weeks
skip_80 <- (kpis_missing_count == 0 && !force_rerun) # Skip Script 80 only if ALL KPIs exist AND not forcing rerun
cat("\nSkipping decisions (based on outputs AND client type):\n")
cat(sprintf(" Script 10: %s\n", if (skip_10) "SKIP" else "RUN"))
cat(sprintf(" Script 20: RUN (always runs to process new downloads)\n"))
# BUGFIX: the Script 21 annotation used condition (skip_cane_supply_only && !skip_21),
# which is always FALSE because skip_21 == skip_cane_supply_only - the
# "(non-cane_supply client)" note was never printed. Now consistent with 22/23/31.
cat(sprintf(" Script 21: %s %s\n", if (skip_21) "SKIP" else "RUN", if (skip_cane_supply_only) "(non-cane_supply client)" else ""))
cat(sprintf(" Script 22: %s %s\n", if (skip_22) "SKIP" else "RUN", if (skip_cane_supply_only) "(non-cane_supply client)" else ""))
cat(sprintf(" Script 23: %s %s\n", if (skip_23) "SKIP" else "RUN", if (skip_cane_supply_only) "(non-cane_supply client)" else ""))
cat(sprintf(" Script 30: %s (always runs)\n", if (skip_30) "SKIP" else "RUN"))
cat(sprintf(" Script 31: %s %s\n", if (skip_31) "SKIP" else "RUN", if (skip_cane_supply_only) "(non-cane_supply client)" else ""))
cat(sprintf(" Script 40: %s (looping through %d missing weeks)\n", if (skip_40) "SKIP" else "RUN", nrow(missing_weeks)))
cat(sprintf(" Script 80: %s (always runs)\n", if (skip_80) "SKIP" else "RUN"))
cat(sprintf(" Script 90: %s %s\n", if (!run_legacy_report) "SKIP" else "RUN", if (run_legacy_report) "(agronomic_support legacy report)" else ""))
cat(sprintf(" Script 91: %s %s\n", if (!run_modern_report) "SKIP" else "RUN", if (run_modern_report) "(cane_supply modern report)" else ""))
# ==============================================================================
# PYTHON: DOWNLOAD PLANET IMAGES (MISSING DATES ONLY)
# ==============================================================================
# Diffs the target date window against TIFFs already present in merged_tif/ and
# shells out to the Python downloader once per missing date. Side effects:
# temporarily changes the working directory to python_app/ during the download
# loop, and flips skip_10 to FALSE when at least one download succeeded.
cat("\n========== DOWNLOADING PLANET IMAGES (MISSING DATES ONLY) ==========\n")
tryCatch(
  {
    # Setup paths
    # NOTE: All downloads go to merged_tif/ regardless of project
    # (data_source variable is used later by Script 20 for reading, but downloads always go to merged_tif)
    merged_tifs_dir <- paths$merged_tif_folder # Always check merged_tif for downloads
    cat(sprintf("[DEBUG] Checking for existing files in: %s\n", merged_tifs_dir))
    cat(sprintf("[DEBUG] Directory exists: %s\n", dir.exists(merged_tifs_dir)))
    # Get existing dates from raw TIFFs in merged_tif/ (files named YYYY-MM-DD.tif)
    existing_tiff_files <- list.files(merged_tifs_dir, pattern = "^\\d{4}-\\d{2}-\\d{2}\\.tif$")
    existing_tiff_dates <- sub("\\.tif$", "", existing_tiff_files)
    cat(sprintf("[DEBUG] Found %d existing TIFF files\n", length(existing_tiff_files)))
    if (length(existing_tiff_files) > 0) {
      cat(sprintf("[DEBUG] Sample files: %s\n", paste(head(existing_tiff_files, 3), collapse=", ")))
    }
    # Find missing dates in the window; data_generation_offset may be the
    # widened gap computed by the early mosaic check above
    start_date <- end_date - data_generation_offset
    date_seq <- seq(start_date, end_date, by = "day")
    target_dates <- format(date_seq, "%Y-%m-%d")
    # Get existing dates from tiles (better indicator of completion for tiled projects)
    existing_tile_dates <- tiles_dates
    # CRITICAL FIX: Always use TIFF dates for checking existing files
    # This is the source of truth - if merged_tif/ has a file, don't re-download it
    # We don't download again if the file exists, regardless of whether tiles have been created yet
    if (length(existing_tiff_dates) > 0) {
      cat(sprintf("[DEBUG] Using TIFF dates for existence check (found %d existing files)\n", length(existing_tiff_dates)))
      # IMPORTANT: Only consider existing TIFF dates that fall within our target window
      # This prevents old 2025 data from masking missing 2026 data
      existing_tile_dates <- existing_tiff_dates[existing_tiff_dates %in% target_dates]
    }
    # Only download if files don't exist yet (tiles for tiled projects, TIFFs for single-file)
    missing_dates <- target_dates[!(target_dates %in% existing_tile_dates)]
    if (mosaic_mode == "single-file") {
      cat(sprintf(" Existing TIFF dates: %d\n", length(existing_tile_dates)))
    } else {
      cat(sprintf(" Existing tiled dates: %d\n", length(existing_tile_dates)))
    }
    cat(sprintf(" Missing dates in window: %d\n", length(missing_dates)))
    # Download each missing date via the Python downloader; tally results
    download_count <- 0
    download_failed <- 0
    if (length(missing_dates) > 0) {
      # Save current directory
      original_dir <- getwd()
      # Change to python_app directory so relative paths work correctly
      setwd("python_app")
      for (date_str in missing_dates) {
        cmd <- sprintf('python 00_download_8band_pu_optimized.py "%s" --date "%s" --resolution 3 --cleanup', project_dir, date_str)
        result <- system(cmd, ignore.stdout = FALSE, ignore.stderr = FALSE)
        if (result == 0) {
          download_count <- download_count + 1
        } else {
          download_failed <- download_failed + 1
        }
      }
      # Change back to original directory
      setwd(original_dir)
    }
    cat(sprintf("✓ Downloaded %d dates, %d failed\n", download_count, download_failed))
    if (download_failed > 0) {
      # Best-effort: failed downloads are reported but do not abort the pipeline
      cat("⚠ Some downloads failed, but continuing pipeline\n")
    }
    # Force Script 10 to run ONLY if downloads actually succeeded (not just attempted)
    if (download_count > 0) {
      skip_10 <- FALSE
    }
  },
  error = function(e) {
    # NOTE(review): if an error fires between setwd("python_app") and the
    # restore above, the working directory stays changed - consider on.exit()
    cat("✗ Error in planet download:", e$message, "\n")
    pipeline_success <<- FALSE
  }
)
# ==============================================================================
# SCRIPT 10: CREATE PER-FIELD TIFFs
# ==============================================================================
if (pipeline_success && !skip_10) {
  cat("\n========== RUNNING SCRIPT 10: CREATE PER-FIELD TIFFs ==========\n")
  tryCatch(
    {
      # Run Script 10 via system() - NEW per-field version
      # Arguments: project_dir
      cmd <- sprintf(
        '"%s" r_app/10_create_per_field_tiffs.R "%s"',
        RSCRIPT_PATH,
        project_dir
      )
      result <- system(cmd)
      if (result != 0) {
        # sprintf keeps the exit code separated from the label (stop() would
        # otherwise concatenate the arguments without a space)
        stop(sprintf("Script 10 exited with error code: %d", result))
      }
      # Verify output - check per-field structure
      field_tiles_dir <- paths$field_tiles_dir
      if (dir.exists(field_tiles_dir)) {
        fields <- list.dirs(field_tiles_dir, full.names = FALSE, recursive = FALSE)
        fields <- fields[fields != ""]
        # vapply (not sapply) guarantees an integer result even with zero
        # field directories (sum(sapply(character(0), ...)) errors on a list)
        total_files <- sum(vapply(
          file.path(field_tiles_dir, fields),
          function(f) length(list.files(f, pattern = "\\.tif$")),
          integer(1)
        ))
        cat(sprintf("✓ Script 10 completed - created per-field TIFFs (%d fields, %d files)\n", length(fields), total_files))
      } else {
        cat("✓ Script 10 completed\n")
      }
    },
    error = function(e) {
      cat("✗ Error in Script 10:", e$message, "\n")
      pipeline_success <<- FALSE
    }
  )
} else if (skip_10) {
  cat("\n========== SKIPPING SCRIPT 10 (per-field TIFFs already exist) ==========\n")
}
# ==============================================================================
# CHECK: Per-Field TIFFs Without CI Data
# ==============================================================================
# IMPORTANT: Script 10 creates per-field TIFFs for ALL dates in merged_tif/,
# but Script 20 only processes dates within the offset window. This check finds
# dates that have per-field TIFFs but NO CI data, and forces Script 20 to
# process them regardless of offset.
cat("\n========== CHECKING FOR PER-FIELD TIFFs WITHOUT CI DATA ==========\n")
field_tiles_dir <- paths$field_tiles_dir
field_tiles_ci_dir <- paths$field_tiles_ci_dir
ci_daily_dir <- paths$daily_ci_vals_dir
# Collect the unique date strings found in files across the field
# subdirectories of base_dir. extract_date maps a filename vector to date
# strings (e.g. strips "_suffix.tif" or ".rds"). Returns character(0) when
# base_dir does not exist. Replaces three copies of the same scan loop.
collect_field_dates <- function(base_dir, file_pattern, extract_date) {
  dates <- character(0)
  if (!dir.exists(base_dir)) {
    return(dates)
  }
  fields <- list.dirs(base_dir, full.names = FALSE, recursive = FALSE)
  fields <- fields[fields != ""]
  for (field in fields) {
    files <- list.files(file.path(base_dir, field), pattern = file_pattern)
    dates <- unique(c(dates, extract_date(files)))
  }
  dates
}
# Dates that have per-field TIFFs (filenames like YYYY-MM-DD_*.tif)
tiff_dates_all <- collect_field_dates(
  field_tiles_dir, "^\\d{4}-\\d{2}-\\d{2}.*\\.tif$",
  function(files) sub("_.*$", "", files)
)
# Dates that have CI data: CI TIFFs in field_tiles_CI ...
ci_dates_all <- collect_field_dates(
  field_tiles_ci_dir, "^\\d{4}-\\d{2}-\\d{2}.*\\.tif$",
  function(files) sub("_.*$", "", files)
)
# ... plus extracted_ci RDS files (source of truth)
ci_dates_all <- unique(c(ci_dates_all, collect_field_dates(
  ci_daily_dir, "^\\d{4}-\\d{2}-\\d{2}\\.rds$",
  function(files) sub("\\.rds$", "", files)
)))
# Find dates with TIFFs but no CI data
dates_missing_ci <- setdiff(tiff_dates_all, ci_dates_all)
cat(sprintf("Total per-field TIFF dates: %d\n", length(tiff_dates_all)))
cat(sprintf("Total CI data dates: %d\n", length(ci_dates_all)))
cat(sprintf("Dates with TIFFs but NO CI: %d\n", length(dates_missing_ci)))
# If there are per-field TIFFs without CI, force Script 20 to run with extended date range
if (length(dates_missing_ci) > 0) {
  cat("\n⚠ Found per-field TIFFs without CI data - forcing Script 20 to process them\n")
  cat(sprintf(" Sample missing dates: %s\n", paste(head(dates_missing_ci, 3), collapse=", ")))
  # Extend the window back to the earliest date that still needs CI extraction
  earliest_missing_tiff <- min(as.Date(dates_missing_ci))
  extended_offset <- as.numeric(end_date - earliest_missing_tiff)
  cat(sprintf(" Extended offset: %d days (from %s to %s)\n",
    extended_offset, format(earliest_missing_tiff, "%Y-%m-%d"), format(end_date, "%Y-%m-%d")))
  # Use extended offset for Script 20
  offset_for_ci <- extended_offset
  skip_20 <- FALSE # Force Script 20 to run
} else {
  cat("✓ All per-field TIFFs have corresponding CI data\n")
  offset_for_ci <- offset # Use normal offset
}
# ==============================================================================
# SCRIPT 20: CI EXTRACTION
# ==============================================================================
if (pipeline_success && !skip_20) {
  cat("\n========== RUNNING SCRIPT 20: CI EXTRACTION ==========\n")
  tryCatch(
    {
      # Run Script 20 via system() to pass command-line args just like from terminal
      # Arguments: project_dir end_date offset
      # offset_for_ci may have been extended above when per-field TIFFs lacked CI
      cmd <- sprintf(
        '"%s" r_app/20_ci_extraction_per_field.R "%s" "%s" %d',
        RSCRIPT_PATH,
        project_dir, format(end_date, "%Y-%m-%d"), offset_for_ci
      )
      result <- system(cmd)
      if (result != 0) {
        # sprintf inserts the missing space between the label and the exit code
        stop(sprintf("Script 20 exited with error code: %d", result))
      }
      # Verify CI output was created
      ci_daily_dir <- paths$daily_ci_vals_dir
      if (dir.exists(ci_daily_dir)) {
        files <- list.files(ci_daily_dir, pattern = "\\.rds$")
        cat(sprintf("✓ Script 20 completed - generated %d CI files\n", length(files)))
      } else {
        cat("✓ Script 20 completed\n")
      }
    },
    error = function(e) {
      cat("✗ Error in Script 20:", e$message, "\n")
      pipeline_success <<- FALSE
    }
  )
} else if (skip_20) {
  cat("\n========== SKIPPING SCRIPT 20 (CI already extracted) ==========\n")
}
# ==============================================================================
# SCRIPT 21: CONVERT CI RDS TO CSV
# ==============================================================================
# Unlike the Rscript-based steps, Script 21 is source()d into this session and
# reads end_date/offset/project_dir from the global environment, so those are
# assigned there first (Script 21's expected contract).
if (pipeline_success && !skip_21) {
  cat("\n========== RUNNING SCRIPT 21: CONVERT CI RDS TO CSV ==========\n")
  tryCatch(
    {
      # Pass parameters through the global environment for the sourced script
      assign("end_date", end_date, envir = .GlobalEnv)
      assign("offset", offset, envir = .GlobalEnv)
      assign("project_dir", project_dir, envir = .GlobalEnv)
      source("r_app/21_convert_ci_rds_to_csv.R")
      main() # Script 21 defines main(); call it to execute with the variables above
      # Verify CSV output was created
      ci_csv_path <- paths$ci_for_python_dir
      if (dir.exists(ci_csv_path)) {
        csv_files <- list.files(ci_csv_path, pattern = "\\.csv$")
        cat(sprintf("✓ Script 21 completed - converted to %d CSV files\n", length(csv_files)))
      } else {
        cat("✓ Script 21 completed\n")
      }
    },
    error = function(e) {
      # Fatal: later sections check pipeline_success before running
      cat("✗ Error in Script 21:", e$message, "\n")
      pipeline_success <<- FALSE
    }
  )
} else if (skip_21) {
  cat("\n========== SKIPPING SCRIPT 21 (CSV already created) ==========\n")
}
# ==============================================================================
# SCRIPT 30: INTERPOLATE GROWTH MODEL
# ==============================================================================
if (pipeline_success && !skip_30) {
  cat("\n========== RUNNING SCRIPT 30: INTERPOLATE GROWTH MODEL ==========\n")
  tryCatch(
    {
      # Run Script 30 via system() to pass command-line args just like from terminal
      # Script 30 expects: project_dir only
      # Per-field version reads CI data from Script 20 per-field output location
      cmd <- sprintf(
        '"%s" r_app/30_interpolate_growth_model.R "%s"',
        RSCRIPT_PATH,
        project_dir
      )
      result <- system(cmd)
      if (result != 0) {
        # sprintf inserts the missing space between the label and the exit code
        stop(sprintf("Script 30 exited with error code: %d", result))
      }
      # Verify interpolated output - Script 30 saves to cumulative_ci_vals_dir
      cumulative_ci_vals_dir <- paths$cumulative_ci_vals_dir
      if (dir.exists(cumulative_ci_vals_dir)) {
        files <- list.files(cumulative_ci_vals_dir, pattern = "\\.rds$")
        cat(sprintf("✓ Script 30 completed - generated %d interpolated RDS file(s)\n", length(files)))
      } else {
        cat("✓ Script 30 completed\n")
      }
    },
    error = function(e) {
      cat("✗ Error in Script 30:", e$message, "\n")
      pipeline_success <<- FALSE
    }
  )
}
# ==============================================================================
# PYTHON 31: HARVEST IMMINENT WEEKLY
# ==============================================================================
if (pipeline_success && !skip_31) {
  cat("\n========== RUNNING PYTHON 31: HARVEST IMMINENT WEEKLY ==========\n")
  tryCatch(
    {
      # Run Python script in pytorch_gpu conda environment
      # Script expects positional project name (not --project flag)
      # Run from smartcane root so conda can find the environment
      cmd <- sprintf("conda run -n pytorch_gpu python python_app/31_harvest_imminent_weekly.py %s", project_dir)
      result <- system(cmd)
      if (result == 0) {
        # Verify harvest output - check for THIS WEEK's specific file
        wwy_current_31 <- get_iso_week_year(end_date)
        harvest_exists <- check_harvest_output_exists(project_dir, wwy_current_31$week, wwy_current_31$year)
        if (harvest_exists) {
          cat(sprintf("✓ Script 31 completed - generated harvest imminent file for week %02d\n", wwy_current_31$week))
        } else {
          cat("✓ Script 31 completed (check if harvest.xlsx is available)\n")
        }
      } else {
        cat("⚠ Script 31 completed with errors (check harvest.xlsx availability)\n")
      }
    },
    error = function(e) {
      # BUGFIX: the handler previously called setwd(original_dir), but this
      # section never changes the working directory and original_dir only
      # exists if the earlier download loop happened to run - the handler
      # itself could error. Script 31 failures are non-fatal: warn and continue.
      cat("⚠ Script 31 error:", e$message, "\n")
    }
  )
} else if (skip_31) {
  cat("\n========== SKIPPING SCRIPT 31 (non-cane_supply client type) ==========\n")
}
# ==============================================================================
# SCRIPT 40: MOSAIC CREATION (LOOP THROUGH MISSING WEEKS)
# ==============================================================================
if (pipeline_success && !skip_40) {
  cat("\n========== RUNNING SCRIPT 40: MOSAIC CREATION ==========\n")
  # If there are missing weeks, process them one at a time
  if (nrow(missing_weeks) > 0) {
    cat(sprintf("Found %d missing week(s) - running Script 40 once per week\n\n", nrow(missing_weeks)))
    # missing_weeks was built newest-first, so iterate backwards to process
    # the oldest week first (chronological order)
    for (week_idx in rev(seq_len(nrow(missing_weeks)))) {
      missing_week <- missing_weeks[week_idx, ]
      week_num <- missing_week$week
      year_num <- missing_week$year
      week_end_date <- as.Date(missing_week$week_end_date)
      cat(sprintf(
        "--- Creating mosaic for week %02d/%d (ending %s) ---\n",
        week_num, year_num, format(week_end_date, "%Y-%m-%d")
      ))
      tryCatch(
        {
          # Run Script 40 with offset=7 (one week only) for this specific week
          # The end_date is the last day of the week, and offset=7 covers the full 7-day week
          # Arguments: end_date offset project_dir
          cmd <- sprintf(
            '"%s" r_app/40_mosaic_creation_per_field.R "%s" 7 "%s"',
            RSCRIPT_PATH,
            format(week_end_date, "%Y-%m-%d"), project_dir
          )
          result <- system(cmd)
          if (result != 0) {
            # sprintf inserts the missing space between the label and exit code
            stop(sprintf("Script 40 exited with error code: %d", result))
          }
          # Verify mosaic was created for this specific week (centralized helper function)
          mosaic_check <- check_mosaic_exists(project_dir, week_num, year_num, mosaic_mode)
          if (mosaic_check$created) {
            cat(sprintf("✓ Week %02d/%d mosaic created successfully\n\n", week_num, year_num))
          } else {
            cat(sprintf("✓ Week %02d/%d processing completed (verify output)\n\n", week_num, year_num))
          }
        },
        error = function(e) {
          # BUGFIX: a stray extra "\n" argument to cat() used to print a
          # spurious " \n" after the message
          cat(sprintf("✗ Error creating mosaic for week %02d/%d: %s\n", week_num, year_num, e$message))
          pipeline_success <<- FALSE
        }
      )
    }
    if (pipeline_success) {
      cat(sprintf("✓ Script 40 completed - created all %d missing week mosaics\n", nrow(missing_weeks)))
    }
  } else {
    cat("No missing weeks detected - skipping Script 40\n")
    skip_40 <- TRUE
  }
} else if (skip_40) {
  cat("\n========== SKIPPING SCRIPT 40 (mosaics already created) ==========\n")
}
# ==============================================================================
# SCRIPT 80: CALCULATE KPIs (LOOP THROUGH REPORTING WINDOW)
# ==============================================================================
# Runs Script 80 once per week still lacking KPIs (per the earlier
# check_kpi_completeness() result), oldest week first.
if (pipeline_success && !skip_80) {
  cat("\n========== RUNNING SCRIPT 80: CALCULATE KPIs FOR REPORTING WINDOW ==========\n")
  # Build list of weeks that NEED calculation (missing KPIs)
  weeks_to_calculate <- kpis_needed[!kpis_needed$has_kpis, ] # Only weeks WITHOUT KPIs
  if (nrow(weeks_to_calculate) > 0) {
    # Sort by date (oldest to newest) for sequential processing
    weeks_to_calculate <- weeks_to_calculate[order(weeks_to_calculate$date), ]
    cat(sprintf(
      "Looping through %d missing week(s) in reporting window (from %s back to %s):\n\n",
      nrow(weeks_to_calculate),
      format(max(weeks_to_calculate$date), "%Y-%m-%d"),
      format(min(weeks_to_calculate$date), "%Y-%m-%d")
    ))
    tryCatch(
      {
        for (week_idx in 1:nrow(weeks_to_calculate)) {
          week_row <- weeks_to_calculate[week_idx, ]
          calc_date <- week_row$date
          # Run Script 80 for this specific week with offset=7 (one week only)
          # This ensures Script 80 calculates KPIs for THIS week with proper trend data
          cmd <- sprintf(
            '"%s" r_app/80_calculate_kpis.R "%s" "%s" %d',
            RSCRIPT_PATH,
            format(calc_date, "%Y-%m-%d"), project_dir, 7
          ) # offset=7 for single week
          cat(sprintf(
            " [Week %02d/%d] Running Script 80 with end_date=%s...\n",
            week_row$week, week_row$year, format(calc_date, "%Y-%m-%d")
          ))
          result <- system(cmd, ignore.stdout = FALSE, ignore.stderr = FALSE)
          # A failed week is reported but does not abort the remaining weeks
          if (result == 0) {
            cat(sprintf(" ✓ KPIs calculated for week %02d/%d\n", week_row$week, week_row$year))
          } else {
            cat(sprintf(
              " ✗ Error calculating KPIs for week %02d/%d (exit code: %d)\n",
              week_row$week, week_row$year, result
            ))
          }
        }
        # Verify total KPI output (kpi_dir defined by check_kpi_completeness() earlier)
        if (dir.exists(kpi_dir)) {
          files <- list.files(kpi_dir, pattern = "\\.xlsx$|\\.rds$")
          # Extract subdir name from kpi_dir path for display
          subdir_name <- basename(kpi_dir)
          cat(sprintf("\n✓ Script 80 loop completed - total %d KPI files in %s/\n", length(files), subdir_name))
        } else {
          cat("\n✓ Script 80 loop completed\n")
        }
      },
      error = function(e) {
        cat("✗ Error in Script 80 loop:", e$message, "\n")
        pipeline_success <<- FALSE
      }
    )
  } else {
    cat(sprintf("✓ All %d weeks already have KPIs - skipping calculation\n", nrow(kpis_needed)))
  }
} else if (skip_80) {
  cat("\n========== SKIPPING SCRIPT 80 (all KPIs already exist) ==========\n")
}
# ==============================================================================
# VERIFY KPI COMPLETION AFTER SCRIPT 80
# ==============================================================================
# Recheck if all KPIs are now available (Script 80 should have calculated any missing ones)
cat("\n========== VERIFYING KPI COMPLETION ==========\n")
kpis_complete <- TRUE
if (dir.exists(kpi_dir)) {
  # seq_len(n) - 1 gives 0..n-1 and is empty (not c(0, -1)) when n is 0
  for (weeks_back in seq_len(reporting_weeks_needed) - 1) {
    check_date <- end_date - (weeks_back * 7)
    # %V / %G are the ISO-8601 week number and week-based year
    week_num <- as.numeric(format(check_date, "%V"))
    year_num <- as.numeric(format(check_date, "%G"))
    # Check for any KPI file from that week (flexible pattern to match all formats)
    # Matches: week_05_2026, AURA_KPI_week_05_2026, etc.
    week_pattern <- sprintf("_week_%02d_%d|week_%02d_%d", week_num, year_num, week_num, year_num)
    # NEW: Support per-field architecture - search recursively for KPI files in field subdirectories
    kpi_files_this_week <- list.files(kpi_dir, pattern = week_pattern, recursive = TRUE, full.names = FALSE)
    if (length(kpi_files_this_week) > 0) {
      cat(sprintf("  Week %02d/%d: ✓ KPIs found (%d files)\n", week_num, year_num, length(kpi_files_this_week)))
    } else {
      kpis_complete <- FALSE
      cat(sprintf("  Week %02d/%d: ✗ KPIs not found\n", week_num, year_num))
    }
  }
} else {
  # BUG FIX: previously a missing kpi_dir skipped the loop entirely and the
  # script still reported "All KPIs available". No directory means no KPIs.
  kpis_complete <- FALSE
  cat(sprintf("  ✗ KPI directory not found: %s\n", kpi_dir))
}
if (kpis_complete) {
  cat("✓ All KPIs available - full reporting window complete\n")
} else {
  cat("⚠ Note: Some KPIs may still be missing - Script 80 calculated what was available\n")
}
# ==============================================================================
# SCRIPT 90: LEGACY WORD REPORT (agronomic_support clients)
# ==============================================================================
if (pipeline_success && run_legacy_report) {
  cat("\n========== RUNNING SCRIPT 90: LEGACY WORD REPORT ==========\n")
  tryCatch(
    {
      # Script 90 is an RMarkdown document; render it straight into the
      # project's reports directory (already created by
      # setup_project_directories). File name carries the ISO week/year.
      report_dir <- paths$reports_dir
      iso_week <- as.numeric(format(end_date, "%V"))
      iso_year <- as.numeric(format(end_date, "%G"))
      report_name <- sprintf("CI_report_week%02d_%d.docx", iso_week, iso_year)
      # Compile the RMarkdown document to Word
      rmarkdown::render(
        input = "r_app/90_CI_report_with_kpis_simple.Rmd",
        output_dir = report_dir,
        output_file = report_name,
        params = list(
          report_date = format(end_date, "%Y-%m-%d"),
          data_dir = project_dir
        ),
        quiet = TRUE
      )
      cat(sprintf("✓ Script 90 completed - generated Word report: %s\n", report_name))
    },
    error = function(e) {
      cat("✗ Error in Script 90:", e$message, "\n")
      pipeline_success <<- FALSE
    }
  )
} else if (run_legacy_report) {
  cat("\n========== SKIPPING SCRIPT 90 (pipeline error) ==========\n")
}
# ==============================================================================
# SCRIPT 91: MODERN WORD REPORT (cane_supply clients)
# ==============================================================================
if (pipeline_success && run_modern_report) {
  cat("\n========== RUNNING SCRIPT 91: MODERN WORD REPORT ==========\n")
  tryCatch(
    {
      # Script 91 is an RMarkdown document; render it straight into the
      # project's reports directory (already created by
      # setup_project_directories). File name carries the ISO week/year.
      report_dir <- paths$reports_dir
      iso_week <- as.numeric(format(end_date, "%V"))
      iso_year <- as.numeric(format(end_date, "%G"))
      report_name <- sprintf("CI_report_week%02d_%d.docx", iso_week, iso_year)
      # Compile the RMarkdown document to Word
      rmarkdown::render(
        input = "r_app/91_CI_report_with_kpis_Angata.Rmd",
        output_dir = report_dir,
        output_file = report_name,
        params = list(
          report_date = format(end_date, "%Y-%m-%d"),
          data_dir = project_dir
        ),
        quiet = TRUE
      )
      cat(sprintf("✓ Script 91 completed - generated Word report: %s\n", report_name))
    },
    error = function(e) {
      cat("✗ Error in Script 91:", e$message, "\n")
      pipeline_success <<- FALSE
    }
  )
} else if (run_modern_report) {
  cat("\n========== SKIPPING SCRIPT 91 (pipeline error) ==========\n")
}
# ==============================================================================
# SUMMARY
# ==============================================================================
# Final run report: echo the key parameters and overall success/failure.
cat("\n========== PIPELINE COMPLETE ==========\n")
cat(sprintf("Project: %s\n", project_dir))
cat(sprintf("End Date: %s\n", end_date_str))
cat(sprintf("Offset: %d days\n", offset))
status_line <- if (pipeline_success) {
  "Status: ✓ All scripts completed successfully\n"
} else {
  "Status: ✗ Pipeline failed - check errors above\n"
}
cat(status_line)
cat("Pipeline sequence: Python Download → R 10 → R 20 → R 21 → R 30 → Python 31 → R 40 → R 80 → R 90/91\n")