# Source: SmartCane/r_app/run_full_pipeline.R
# (listing metadata: 539 lines, 24 KiB, R)

# ==============================================================================
# FULL PIPELINE RUNNER
# ==============================================================================
# Mixed Python/R pipeline:
# 1. Python: Download Planet images
# 2. R 10: Create master grid and split TIFFs
# 3. R 20: CI Extraction
# 4. R 21: Convert CI RDS to CSV
# 5. R 30: Interpolate growth model
# 6. Python 31: Harvest imminent weekly
# 7. R 40: Mosaic creation
# 8. R 80: Calculate KPIs
#
# ==============================================================================
# HOW TO RUN THIS SCRIPT
# ==============================================================================
#
# Run from the smartcane/ directory:
#
# Option 1 (Recommended - shows real-time output):
# Rscript r_app/run_full_pipeline.R
#
# Option 2 (Full path to Rscript - use & in PowerShell for paths with spaces):
# & "C:\Program Files\R\R-4.4.3\bin\x64\Rscript.exe" r_app/run_full_pipeline.R
#
# Option 3 (Batch mode - output saved to .Rout file):
# R CMD BATCH --vanilla r_app/run_full_pipeline.R
#
# ==============================================================================
# ==============================================================================
# *** EDIT THESE VARIABLES ***
# Last day of the processing window. Must be a Date: the download step
# computes `end_date - offset` and the week checks call format() on it.
end_date <- as.Date("2025-12-31") # or specify: as.Date("2026-01-27") , Sys.Date()
# The download window is end_date - offset .. end_date, both ends inclusive.
offset <- 7 # days to look back
# Folder name under laravel_app/storage/app/ that holds this project's data.
project_dir <- "aura" # project name: "esa", "aura", "angata", "chemba"
# Sub-folder of the project directory that is scanned for raw daily TIFFs.
data_source <- if (project_dir == "angata") "merged_tif_8b" else "merged_tif"
# In this file force_rerun only influences the skip flags for Scripts 10 and 40;
# the other steps are either always run or gated on client type.
force_rerun <- FALSE # Set to TRUE to force all scripts to run even if outputs exist
# ***************************
# Load client type mapping from parameters_project.R
# (get_client_type() is presumably defined there - it is not defined in this file).
source("r_app/parameters_project.R")
client_type <- get_client_type(project_dir)
cat(sprintf("\nProject: %s → Client Type: %s\n", project_dir, client_type))
# Format dates (string form is used in the final summary)
end_date_str <- format(as.Date(end_date), "%Y-%m-%d")
# Track success of pipeline: set to FALSE by the error handlers of the download
# step and of Scripts 10, 20, 21, 30, 40 and 80; every later step checks it
# before running. Script 31 failures are reported but do not change it.
pipeline_success <- TRUE
# Define conditional script execution based on client type
# Client types:
# - "cane_supply": always-run scripts plus 21,22,23,31,91 (full pipeline with Excel output)
# - "agronomic_support": always-run scripts plus 90 (KPI calculation + Word report)
#
# Scripts that ALWAYS run (regardless of client type):
# - 00: Python Download
# - 10: Tiling (if outputs don't exist)
# - 20: CI Extraction
# - 30: Growth Model
# - 40: Mosaic Creation
# - 80: KPI Calculation
#
# Scripts that are client-type specific:
# - 21: CI RDS→CSV (cane_supply only)
# - 22: (cane_supply only)
# - 23: (cane_supply only)
# - 31: Harvest Imminent (cane_supply only)
# - 90: Legacy Word Report (agronomic_support only)
# - 91: Modern Excel Report (cane_supply only)
# Client-type switches, derived once and reused by the skip logic further down.
is_cane_supply <- (client_type == "cane_supply")
# TRUE for every client that is not cane_supply: Scripts 21, 22, 23 and 31 are skipped.
skip_cane_supply_only <- !is_cane_supply
# Script 90 (legacy Word report) belongs to agronomic_support clients.
run_legacy_report <- (client_type == "agronomic_support")
# Script 91 (modern Excel report) belongs to cane_supply clients.
run_modern_report <- is_cane_supply
# ==============================================================================
# INTELLIGENT CHECKING: What has already been completed?
# ==============================================================================
cat("\n========== CHECKING EXISTING OUTPUTS ==========\n")

# Work out how weekly mosaics are stored for a project.
#
# All paths are resolved relative to the current working directory, under
# laravel_app/storage/app/<project_dir>/.
#
# Args:
#   project_dir: project folder name (character scalar).
#
# Returns one of:
#   "tiled"       - weekly_tile_max/ contains at least one sub-folder named
#                   like "<digits>x<digits>" (e.g. 5x5); checked first
#   "single-file" - weekly_mosaic/ contains at least one week_*.tif file
#   "unknown"     - neither layout was found
detect_mosaic_mode_simple <- function(project_dir) {
  storage_root <- file.path("laravel_app", "storage", "app", project_dir)

  # Tile-based layout: weekly_tile_max/{grid_size}/...
  tile_root <- file.path(storage_root, "weekly_tile_max")
  if (dir.exists(tile_root)) {
    children <- list.dirs(tile_root, full.names = FALSE, recursive = FALSE)
    if (any(grepl("^\\d+x\\d+$", children))) {
      return("tiled")
    }
  }

  # Single-file layout: weekly_mosaic/week_*.tif
  mosaic_root <- file.path(storage_root, "weekly_mosaic")
  has_weekly_tif <- dir.exists(mosaic_root) &&
    length(list.files(mosaic_root, pattern = "^week_.*\\.tif$")) > 0

  if (has_weekly_tif) "single-file" else "unknown"
}
mosaic_mode <- detect_mosaic_mode_simple(project_dir)
cat(sprintf("Auto-detected mosaic mode: %s\n", mosaic_mode))

# Script 10 outputs: collect the date folders that already hold tiles.
# Two layouts are accepted:
#   new: daily_tiles_split/{grid_size}/{dates}/  (first grid folder found is used)
#   old: daily_tiles_split/{dates}/              (no grid-size level)
tiles_split_base <- file.path("laravel_app", "storage", "app", project_dir, "daily_tiles_split")
tiles_dates <- NULL
if (dir.exists(tiles_split_base)) {
  subfolders <- list.dirs(tiles_split_base, full.names = FALSE, recursive = FALSE)
  grid_patterns <- subfolders[grepl("^\\d+x\\d+$", subfolders)]
  has_grid_layout <- length(grid_patterns) > 0
  if (has_grid_layout) {
    grid_dir <- file.path(tiles_split_base, grid_patterns[1])
  }
  tiles_dates <- list.dirs(
    if (has_grid_layout) grid_dir else tiles_split_base,
    full.names = FALSE,
    recursive = FALSE
  )
}
cat(sprintf("Script 10: %d dates already tiled\n", length(tiles_dates)))
# Script 20 outputs (CI extraction): count the daily RDS files already on disk.
ci_daily_dir <- file.path("laravel_app", "storage", "app", project_dir, "Data", "extracted_ci", "daily_vals")
ci_files <- NULL
if (dir.exists(ci_daily_dir)) {
  ci_files <- list.files(ci_daily_dir, pattern = "\\.rds$")
}
cat(sprintf("Script 20: %d CI daily RDS files exist\n", length(ci_files)))

# Script 21 outputs (CSV conversion): the CSV is overwritten on every run, so
# its presence is not a useful skip indicator and nothing is checked here.
cat("Script 21: CSV file exists but gets overwritten - will run if Script 20 runs\n")
# Script 40 outputs (mosaics): look only for the mosaic of the week that
# contains end_date, because Script 80 needs that specific week.
# NOTE(review): "%V" is the ISO-8601 week number while "%Y" is the calendar
# year. Around New Year they can disagree (2025-12-31 falls in ISO week 01,
# giving "week_01_2025"). Confirm this matches the names Script 40 writes.
current_week <- as.numeric(format(end_date, "%V"))
current_year <- as.numeric(format(end_date, "%Y"))
week_mosaic_pattern <- sprintf("week_%02d_%d\\.tif", current_week, current_year)
mosaic_files <- NULL
is_tiled_project <- mosaic_mode == "tiled"
is_single_file_project <- mosaic_mode == "single-file"
if (is_tiled_project) {
  # Tiled layout: weekly_tile_max/{grid_size}/ - the first grid folder is used.
  weekly_tile_max <- file.path("laravel_app", "storage", "app", project_dir, "weekly_tile_max")
  subfolders <- list.dirs(weekly_tile_max, full.names = FALSE, recursive = FALSE)
  grid_patterns <- subfolders[grepl("^\\d+x\\d+$", subfolders)]
  if (length(grid_patterns) > 0) {
    mosaic_dir <- file.path(weekly_tile_max, grid_patterns[1])
    mosaic_files <- list.files(mosaic_dir, pattern = week_mosaic_pattern)
  }
} else if (is_single_file_project) {
  # Single-file layout: weekly_mosaic/
  mosaic_dir <- file.path("laravel_app", "storage", "app", project_dir, "weekly_mosaic")
  mosaic_files <- list.files(mosaic_dir, pattern = week_mosaic_pattern)
}
cat(sprintf("Script 40: %d mosaic files exist for week %02d\n", length(mosaic_files), current_week))
# Script 80 outputs: count CSV/JSON files in reports/kpis/field_stats.
# The count is informational only - Script 80 is never skipped.
kpi_dir <- file.path("laravel_app", "storage", "app", project_dir, "reports", "kpis", "field_stats")
kpi_files <- NULL
if (dir.exists(kpi_dir)) {
  kpi_files <- list.files(kpi_dir, pattern = "\\.csv$|\\.json$")
}
cat(sprintf("Script 80: %d KPI files exist\n", length(kpi_files)))
# Decide which steps run, based on existing outputs AND client type.
#  - 10 and 40 are skipped when their outputs already exist (unless force_rerun).
#  - 20, 30 and 80 always run.
#  - 21, 22, 23 and 31 run only for cane_supply clients.
skip_10 <- (length(tiles_dates) > 0 && !force_rerun) # Always check tiles
skip_20 <- FALSE # Script 20 ALWAYS runs for all client types - processes new downloaded data
skip_21 <- skip_cane_supply_only # Script 21 runs ONLY for cane_supply clients (CI→CSV conversion)
skip_22 <- skip_cane_supply_only # Script 22 runs ONLY for cane_supply clients
skip_23 <- skip_cane_supply_only # Script 23 runs ONLY for cane_supply clients
skip_30 <- FALSE # Script 30 ALWAYS runs for all client types
skip_31 <- skip_cane_supply_only # Script 31 runs ONLY for cane_supply clients
skip_40 <- (length(mosaic_files) > 0 && !force_rerun) # Always check mosaics
skip_80 <- FALSE # Script 80 ALWAYS runs for all client types - calculates KPIs for current week

# Small formatter for the decision table below.
run_label <- function(skip) if (skip) "SKIP" else "RUN"
# Shared explanation for the client-gated scripts. Script 21 previously used the
# condition `skip_cane_supply_only && !skip_21`, which can never be TRUE, so its
# note was silently dropped; it now uses the same note as Scripts 22, 23 and 31.
client_note <- if (skip_cane_supply_only) "(non-cane_supply client)" else ""

cat("\nSkipping decisions (based on outputs AND client type):\n")
cat(sprintf(" Script 10: %s\n", run_label(skip_10)))
cat(sprintf(" Script 20: RUN (always runs to process new downloads)\n"))
cat(sprintf(" Script 21: %s %s\n", run_label(skip_21), client_note))
cat(sprintf(" Script 22: %s %s\n", run_label(skip_22), client_note))
cat(sprintf(" Script 23: %s %s\n", run_label(skip_23), client_note))
cat(sprintf(" Script 30: %s (always runs)\n", run_label(skip_30)))
cat(sprintf(" Script 31: %s %s\n", run_label(skip_31), client_note))
cat(sprintf(" Script 40: %s %s\n", run_label(skip_40), if (skip_40) "(mosaics exist)" else ""))
cat(sprintf(" Script 80: %s (always runs)\n", run_label(skip_80)))
cat(sprintf(" Script 90: %s %s\n", run_label(!run_legacy_report), if (run_legacy_report) "(agronomic_support legacy report)" else ""))
cat(sprintf(" Script 91: %s %s\n", run_label(!run_modern_report), if (run_modern_report) "(cane_supply modern report)" else ""))
# ==============================================================================
# PYTHON: DOWNLOAD PLANET IMAGES (MISSING DATES ONLY)
# ==============================================================================
cat("\n========== DOWNLOADING PLANET IMAGES (MISSING DATES ONLY) ==========\n")
# Remember where the pipeline was launched from. The download loop changes into
# python_app/, and every later step uses paths relative to the launch directory,
# so the `finally` clause below restores it on every exit path (normal
# completion, error, or interrupt).
original_dir <- getwd()
tryCatch({
  # Setup paths
  base_path <- file.path("laravel_app", "storage", "app", project_dir)
  merged_tifs_dir <- file.path(base_path, data_source)

  # Dates for which a raw TIFF (YYYY-MM-DD.tif) already exists
  existing_tiff_files <- list.files(merged_tifs_dir, pattern = "^\\d{4}-\\d{2}-\\d{2}\\.tif$")
  existing_tiff_dates <- sub("\\.tif$", "", existing_tiff_files)

  # Tiled projects: a date counts as done once it has been tiled.
  existing_tile_dates <- tiles_dates
  # Single-file projects: raw TIFFs are the completion indicator instead, which
  # prevents re-downloading data that already exists.
  if (mosaic_mode == "single-file" && length(existing_tiff_dates) > 0) {
    existing_tile_dates <- existing_tiff_dates
  }

  # Dates in the window [end_date - offset, end_date] that are not yet present
  start_date <- end_date - offset
  date_seq <- seq(start_date, end_date, by = "day")
  target_dates <- format(date_seq, "%Y-%m-%d")
  missing_dates <- target_dates[!(target_dates %in% existing_tile_dates)]

  if (mosaic_mode == "single-file") {
    cat(sprintf(" Existing TIFF dates: %d\n", length(existing_tile_dates)))
  } else {
    cat(sprintf(" Existing tiled dates: %d\n", length(existing_tile_dates)))
  }
  cat(sprintf(" Missing dates in window: %d\n", length(missing_dates)))

  # Download each missing date; a non-zero exit status counts as a failure
  # but does not stop the loop.
  download_count <- 0
  download_failed <- 0
  if (length(missing_dates) > 0) {
    # Run from python_app/ so the downloader's relative paths work correctly
    setwd("python_app")
    for (date_str in missing_dates) {
      cmd <- sprintf('python 00_download_8band_pu_optimized.py "%s" --date "%s" --resolution 3 --cleanup', project_dir, date_str)
      result <- system(cmd, ignore.stdout = FALSE, ignore.stderr = FALSE)
      if (result == 0) {
        download_count <- download_count + 1
      } else {
        download_failed <- download_failed + 1
      }
    }
    # Back to the launch directory before reporting
    setwd(original_dir)
  }

  cat(sprintf("✓ Downloaded %d dates, %d failed\n", download_count, download_failed))
  if (download_failed > 0) {
    cat("⚠ Some downloads failed, but continuing pipeline\n")
  }

  # Force Script 10 to run ONLY if downloads actually succeeded (not just attempted)
  if (download_count > 0) {
    skip_10 <- FALSE
  }
}, error = function(e) {
  cat("✗ Error in planet download:", conditionMessage(e), "\n")
  pipeline_success <<- FALSE
}, finally = {
  # No-op on the normal path; essential if the loop above was cut short.
  if (!is.null(original_dir)) {
    setwd(original_dir)
  }
})
# ==============================================================================
# SCRIPT 10: CREATE MASTER GRID AND SPLIT TIFFs
# ==============================================================================
if (pipeline_success && !skip_10) {
  cat("\n========== RUNNING SCRIPT 10: CREATE MASTER GRID AND SPLIT TIFFs ==========\n")

  # CRITICAL: Script 10 is source()d into the global environment and overwrites
  # end_date, offset, etc. Save them first; they are restored in `finally` so
  # they come back even when Script 10 fails part-way through.
  saved_end_date <- end_date
  saved_offset <- offset
  saved_project_dir <- project_dir
  saved_data_source <- data_source
  # Output diversions active before this step. Unwinding back to this depth
  # (rather than calling sink() blindly) removes exactly the diversions opened
  # here and never warns about a missing sink.
  sink_depth_before <- sink.number()

  script_10_ok <- tryCatch({
    # Script 10 reads PROJECT from the global environment for filtering
    assign("PROJECT", project_dir, envir = .GlobalEnv)
    # Suppress verbose per-date output, show only summary
    sink(nullfile())
    source("r_app/10_create_master_grid_and_split_tiffs.R")
    TRUE
  }, error = function(e) {
    # Re-enable console output first so the error message is actually visible
    while (sink.number() > sink_depth_before) sink()
    cat("✗ Error in Script 10:", conditionMessage(e), "\n")
    pipeline_success <<- FALSE
    FALSE
  }, finally = {
    while (sink.number() > sink_depth_before) sink()
    # CRITICAL: Restore global variables after sourcing Script 10
    end_date <- saved_end_date
    offset <- saved_offset
    project_dir <- saved_project_dir
    data_source <- saved_data_source
  })

  if (script_10_ok) {
    # Verify output using the same layout rules as the pre-run scan: prefer a
    # {grid_size} sub-folder (e.g. 5x5, 10x10), otherwise fall back to the old
    # layout where date folders sit directly under daily_tiles_split/.
    tiles_base <- file.path("laravel_app", "storage", "app", project_dir, "daily_tiles_split")
    grid_names <- grep(
      "^\\d+x\\d+$",
      list.dirs(tiles_base, full.names = FALSE, recursive = FALSE),
      value = TRUE
    )
    tiles_dir <- if (length(grid_names) > 0) file.path(tiles_base, grid_names[1]) else tiles_base
    if (dir.exists(tiles_dir)) {
      subdirs <- list.dirs(tiles_dir, full.names = FALSE, recursive = FALSE)
      cat(sprintf("✓ Script 10 completed - created tiles for %d dates\n", length(subdirs)))
    } else {
      cat("✓ Script 10 completed\n")
    }
  }
} else if (skip_10) {
  cat("\n========== SKIPPING SCRIPT 10 (tiles already exist) ==========\n")
}
# ==============================================================================
# SCRIPT 20: CI EXTRACTION
# ==============================================================================
if (pipeline_success && !skip_20) {
  cat("\n========== RUNNING SCRIPT 20: CI EXTRACTION ==========\n")
  tryCatch({
    # Prefer the pinned Rscript install; if that path does not exist on this
    # machine, fall back to the Rscript that belongs to the R session running
    # this pipeline so the step is not tied to one Windows setup.
    rscript_bin <- "C:\\Program Files\\R\\R-4.4.3\\bin\\x64\\Rscript.exe"
    if (!file.exists(rscript_bin)) {
      rscript_bin <- file.path(R.home("bin"), "Rscript")
    }

    # Run Script 20 via system() to pass command-line args just like from terminal
    # Arguments: end_date offset project_dir data_source
    cmd <- sprintf(
      '"%s" --vanilla r_app/20_ci_extraction.R "%s" %d "%s" "%s"',
      rscript_bin, format(end_date, "%Y-%m-%d"), offset, project_dir, data_source
    )
    result <- system(cmd)
    if (result != 0) {
      stop("Script 20 exited with error code: ", result)
    }

    # Verify CI output was created
    ci_daily_dir <- file.path("laravel_app", "storage", "app", project_dir, "Data", "extracted_ci", "daily_vals")
    if (dir.exists(ci_daily_dir)) {
      files <- list.files(ci_daily_dir, pattern = "\\.rds$")
      cat(sprintf("✓ Script 20 completed - generated %d CI files\n", length(files)))
    } else {
      cat("✓ Script 20 completed\n")
    }
  }, error = function(e) {
    cat("✗ Error in Script 20:", conditionMessage(e), "\n")
    pipeline_success <<- FALSE
  })
} else if (skip_20) {
  cat("\n========== SKIPPING SCRIPT 20 (CI already extracted) ==========\n")
}
# ==============================================================================
# SCRIPT 21: CONVERT CI RDS TO CSV
# ==============================================================================
if (pipeline_success && !skip_21) {
  cat("\n========== RUNNING SCRIPT 21: CONVERT CI RDS TO CSV ==========\n")
  tryCatch({
    # Script 21 is source()d and reads its inputs from the global environment
    assign("end_date", end_date, envir = .GlobalEnv)
    assign("offset", offset, envir = .GlobalEnv)
    assign("project_dir", project_dir, envir = .GlobalEnv)
    source("r_app/21_convert_ci_rds_to_csv.R")
    main() # Call main() to execute the script with the environment variables

    # Verify CSV output was created
    ci_csv_path <- file.path("laravel_app", "storage", "app", project_dir, "ci_extracted")
    if (dir.exists(ci_csv_path)) {
      csv_files <- list.files(ci_csv_path, pattern = "\\.csv$")
      cat(sprintf("✓ Script 21 completed - converted to %d CSV files\n", length(csv_files)))
    } else {
      cat("✓ Script 21 completed\n")
    }
  }, error = function(e) {
    cat("✗ Error in Script 21:", conditionMessage(e), "\n")
    pipeline_success <<- FALSE
  })
} else if (skip_21) {
  # skip_21 is driven purely by client type (never by existing CSV output), so
  # the banner states that reason, in the same wording as the Script 31 banner.
  cat("\n========== SKIPPING SCRIPT 21 (non-cane_supply client type) ==========\n")
}
# ==============================================================================
# SCRIPT 30: INTERPOLATE GROWTH MODEL
# ==============================================================================
if (pipeline_success && !skip_30) {
  cat("\n========== RUNNING SCRIPT 30: INTERPOLATE GROWTH MODEL ==========\n")
  tryCatch({
    # Prefer the pinned Rscript install; if that path does not exist on this
    # machine, fall back to the Rscript that belongs to the R session running
    # this pipeline so the step is not tied to one Windows setup.
    rscript_bin <- "C:\\Program Files\\R\\R-4.4.3\\bin\\x64\\Rscript.exe"
    if (!file.exists(rscript_bin)) {
      rscript_bin <- file.path(R.home("bin"), "Rscript")
    }

    # Run Script 30 via system() to pass command-line args just like from terminal
    # Script 30 expects: project_dir as first argument only
    cmd <- sprintf(
      '"%s" --vanilla r_app/30_interpolate_growth_model.R "%s"',
      rscript_bin, project_dir
    )
    result <- system(cmd)
    if (result != 0) {
      stop("Script 30 exited with error code: ", result)
    }

    # Verify interpolated output
    growth_dir <- file.path("laravel_app", "storage", "app", project_dir, "growth_model_interpolated")
    if (dir.exists(growth_dir)) {
      files <- list.files(growth_dir, pattern = "\\.rds$|\\.csv$")
      cat(sprintf("✓ Script 30 completed - generated %d growth model files\n", length(files)))
    } else {
      cat("✓ Script 30 completed\n")
    }
  }, error = function(e) {
    cat("✗ Error in Script 30:", conditionMessage(e), "\n")
    pipeline_success <<- FALSE
  })
}
# ==============================================================================
# PYTHON 31: HARVEST IMMINENT WEEKLY
# ==============================================================================
if (pipeline_success && !skip_31) {
  cat("\n========== RUNNING PYTHON 31: HARVEST IMMINENT WEEKLY ==========\n")
  # This step is best-effort: failures are reported with a warning marker and
  # deliberately do NOT set pipeline_success to FALSE.
  tryCatch({
    # Run Python script in pytorch_gpu conda environment
    # Script expects positional project name (not --project flag)
    # Run from smartcane root so conda can find the environment
    cmd <- sprintf('conda run -n pytorch_gpu python python_app/31_harvest_imminent_weekly.py %s', project_dir)
    cat("DEBUG: Running command:", cmd, "\n")
    result <- system(cmd)
    if (result == 0) {
      # Verify harvest output - check for THIS WEEK's specific file
      # NOTE(review): "%V" is the ISO week but "%Y" is the calendar year; these
      # can disagree around New Year - confirm against the name Script 31 writes.
      current_week <- as.numeric(format(end_date, "%V"))
      current_year <- as.numeric(format(end_date, "%Y"))
      expected_file <- file.path(
        "laravel_app", "storage", "app", project_dir, "reports", "kpis", "field_stats",
        sprintf("%s_harvest_imminent_week_%02d_%d.csv", project_dir, current_week, current_year)
      )
      if (file.exists(expected_file)) {
        cat(sprintf("✓ Script 31 completed - generated harvest imminent file for week %02d\n", current_week))
      } else {
        cat("✓ Script 31 completed (check if harvest.xlsx is available)\n")
      }
    } else {
      cat("⚠ Script 31 completed with errors (check harvest.xlsx availability)\n")
    }
  }, error = function(e) {
    # This block never changes the working directory, so there is nothing to
    # restore. The previous setwd(original_dir) here referenced a variable that
    # only exists when the download step had missing dates, so the handler
    # itself could abort the whole pipeline with "object not found".
    cat("⚠ Script 31 error:", conditionMessage(e), "\n")
  })
} else if (skip_31) {
  cat("\n========== SKIPPING SCRIPT 31 (non-cane_supply client type) ==========\n")
}
# ==============================================================================
# SCRIPT 40: MOSAIC CREATION
# ==============================================================================
if (pipeline_success && !skip_40) {
  cat("\n========== RUNNING SCRIPT 40: MOSAIC CREATION ==========\n")
  tryCatch({
    # Prefer the pinned Rscript install; if that path does not exist on this
    # machine, fall back to the Rscript that belongs to the R session running
    # this pipeline so the step is not tied to one Windows setup.
    rscript_bin <- "C:\\Program Files\\R\\R-4.4.3\\bin\\x64\\Rscript.exe"
    if (!file.exists(rscript_bin)) {
      rscript_bin <- file.path(R.home("bin"), "Rscript")
    }

    # Run Script 40 via system() to pass command-line args just like from terminal
    # --vanilla avoids renv/environment issues
    # Arguments: end_date offset project_dir (file_name_tif is auto-generated from dates)
    cmd <- sprintf(
      '"%s" --vanilla r_app/40_mosaic_creation.R "%s" %d "%s"',
      rscript_bin, format(end_date, "%Y-%m-%d"), offset, project_dir
    )
    result <- system(cmd)
    if (result != 0) {
      stop("Script 40 exited with error code: ", result)
    }

    # Verify mosaic output: look for the current week's file only.
    # NOTE(review): "%V" is the ISO week but "%Y" is the calendar year; these
    # can disagree around New Year - confirm against the names Script 40 writes.
    current_week_check <- as.numeric(format(end_date, "%V"))
    current_year_check <- as.numeric(format(end_date, "%Y"))
    week_pattern_check <- sprintf("week_%02d_%d\\.tif", current_week_check, current_year_check)

    if (mosaic_mode == "tiled") {
      # Use whichever {grid_size} folder exists (same rule as the pre-run scan)
      # instead of assuming 5x5; 5x5 remains the fallback when none is found.
      tile_root <- file.path("laravel_app", "storage", "app", project_dir, "weekly_tile_max")
      grid_names <- grep(
        "^\\d+x\\d+$",
        list.dirs(tile_root, full.names = FALSE, recursive = FALSE),
        value = TRUE
      )
      mosaic_dir <- file.path(tile_root, if (length(grid_names) > 0) grid_names[1] else "5x5")
    } else {
      # "single-file" and "unknown" modes both check weekly_mosaic/
      mosaic_dir <- file.path("laravel_app", "storage", "app", project_dir, "weekly_mosaic")
    }

    mosaic_files_check <- if (dir.exists(mosaic_dir)) {
      list.files(mosaic_dir, pattern = week_pattern_check)
    } else {
      character(0)
    }

    if (length(mosaic_files_check) > 0) {
      cat(sprintf("✓ Script 40 completed - created mosaic for week %02d\n", current_week_check))
    } else {
      cat("✓ Script 40 completed\n")
    }
  }, error = function(e) {
    cat("✗ Error in Script 40:", conditionMessage(e), "\n")
    pipeline_success <<- FALSE
  })
} else if (skip_40) {
  cat("\n========== SKIPPING SCRIPT 40 (mosaics already created) ==========\n")
}
# ==============================================================================
# SCRIPT 80: CALCULATE KPIs
# ==============================================================================
if (pipeline_success) { # Always run Script 80 - it calculates KPIs for the current week
  cat("\n========== RUNNING SCRIPT 80: CALCULATE KPIs ==========\n")
  tryCatch({
    # Prefer the pinned Rscript install; if that path does not exist on this
    # machine, fall back to the Rscript that belongs to the R session running
    # this pipeline so the step is not tied to one Windows setup.
    rscript_bin <- "C:\\Program Files\\R\\R-4.4.3\\bin\\x64\\Rscript.exe"
    if (!file.exists(rscript_bin)) {
      rscript_bin <- file.path(R.home("bin"), "Rscript")
    }

    # Run Script 80 via system() to pass command-line args just like from terminal
    # --vanilla avoids renv/environment issues
    # Arguments: end_date offset project_dir data_source
    cmd <- sprintf(
      '"%s" --vanilla r_app/80_calculate_kpis.R "%s" %d "%s" "%s"',
      rscript_bin, format(end_date, "%Y-%m-%d"), offset, project_dir, data_source
    )
    result <- system(cmd)
    if (result != 0) {
      stop("Script 80 exited with error code: ", result)
    }

    # Verify KPI output
    kpi_dir <- file.path("laravel_app", "storage", "app", project_dir, "reports", "kpis", "field_stats")
    if (dir.exists(kpi_dir)) {
      files <- list.files(kpi_dir, pattern = "\\.csv$|\\.json$")
      cat(sprintf("✓ Script 80 completed - generated %d KPI files\n", length(files)))
    } else {
      cat("✓ Script 80 completed\n")
    }
  }, error = function(e) {
    cat("✗ Error in Script 80:", conditionMessage(e), "\n")
    pipeline_success <<- FALSE
  })
}
# ==============================================================================
# SUMMARY
# ==============================================================================
# Final report: echo the run parameters and the overall outcome.
# pipeline_success reflects the download step and Scripts 10/20/21/30/40/80;
# Script 31 problems are reported at their own step only.
status_line <- if (pipeline_success) {
  "Status: ✓ All scripts completed successfully\n"
} else {
  "Status: ✗ Pipeline failed - check errors above\n"
}
cat(
  "\n========== PIPELINE COMPLETE ==========\n",
  sprintf("Project: %s\n", project_dir),
  sprintf("End Date: %s\n", end_date_str),
  sprintf("Offset: %d days\n", offset),
  status_line,
  "Pipeline sequence: Python Download → R 10 → R 20 → R 21 → R 30 → Python 31 → R 40 → R 80\n",
  sep = ""
)