# ==============================================================================
# FULL PIPELINE RUNNER
# ==============================================================================
# Mixed Python/R pipeline:
#   1. Python:    Download Planet images
#   2. R 10:      Create master grid and split TIFFs
#   3. R 20:      CI Extraction
#   4. R 21:      Convert CI RDS to CSV
#   5. R 30:      Interpolate growth model
#   6. Python 31: Harvest imminent weekly
#   7. R 40:      Mosaic creation
#   8. R 80:      Calculate KPIs
#
# ==============================================================================
# HOW TO RUN THIS SCRIPT
# ==============================================================================
#
# Run from the smartcane/ directory:
#
# Option 1 (Recommended - shows real-time output):
#   Rscript r_app/run_full_pipeline.R
#
# Option 2 (Full path to Rscript - use & in PowerShell for paths with spaces):
#   & "C:\Program Files\R\R-4.4.3\bin\x64\Rscript.exe" r_app/run_full_pipeline.R
#
# Option 3 (Batch mode - output saved to .Rout file):
#   R CMD BATCH --vanilla r_app/run_full_pipeline.R
#
# ==============================================================================
# *** EDIT THESE VARIABLES ***

# Last day of the reporting window (inclusive).
end_date <- Sys.Date() # or specify: as.Date("2026-01-27") , Sys.Date()

# Project/site to process; must have a client-type mapping in parameters_project.R.
project_dir <- "angata" # project name: "esa", "aura", "angata", "chemba"

# Directory name Script 20 reads imagery from (downloads always land in merged_tif/).
data_source <- "merged_tif" # Standard data source directory

# When TRUE, every script runs even if its outputs already exist on disk.
force_rerun <- FALSE # Set to TRUE to force all scripts to run even if outputs exist

# ***************************

# Absolute path to Rscript.exe, used to launch child R scripts via system().
# NOTE(review): hard-coded to R 4.4.3 on Windows - update when R is upgraded.
RSCRIPT_PATH <- file.path("C:", "Program Files", "R", "R-4.4.3", "bin", "x64", "Rscript.exe")
# Load centralized project parameters (client-type mapping, path helpers) and
# shared utilities; both are sourced relative to the smartcane/ root.
source("r_app/parameters_project.R")
source("r_app/00_common_utils.R")

# Resolve all per-project directories once; `paths` is used throughout this runner.
paths <- setup_project_directories(project_dir)

# Client type drives which optional scripts (21/22/23/31/90/91) run below.
client_type <- get_client_type(project_dir)
cat(sprintf("\nProject: %s → Client Type: %s\n", project_dir, client_type))
# ==============================================================================
# DETERMINE REPORTING WINDOW (auto-calculated based on KPI requirements)
# ==============================================================================
# Script 80 (KPIs) needs N weeks of historical data for trend analysis and
# reporting; the window is derived from reporting_weeks_needed.

# Number of ISO weeks the KPI/reporting scripts require.
reporting_weeks_needed <- 1 # Default: KPIs need current week of data for trends

# Window length in days (7 days per reporting week).
offset <- reporting_weeks_needed * 7 # Convert weeks to days (minimum 7 days for 1 week)

cat(sprintf("\n[INFO] Reporting window: %d weeks (%d days of data)\n", reporting_weeks_needed, offset))

# ISO week/year of the run date (list with $week and $year, per its use below).
wwy_current <- get_iso_week_year(end_date)
cat(sprintf(" Running week: %02d / %d\n", wwy_current$week, wwy_current$year))
cat(sprintf(" Date range: %s to %s\n", format(end_date - offset, "%Y-%m-%d"), format(end_date, "%Y-%m-%d")))

# Pre-formatted end date string.
# NOTE(review): end_date_str is not referenced again in this runner - possibly
# kept for sourced scripts that read globals; confirm before removing.
end_date_str <- format(as.Date(end_date), "%Y-%m-%d")

# Global success flag; failing stages flip this to FALSE (via <<- in error
# handlers) and later stages are then skipped.
pipeline_success <- TRUE
# ==============================================================================
# EARLY PREREQ CHECK: Verify mosaic requirements BEFORE any downloads
# ==============================================================================
# Determines whether more weeks of data are needed than the initial reporting
# window. Runs BEFORE downloads so only the missing dates get fetched upfront.
cat("\n========== EARLY CHECK: MOSAIC REQUIREMENTS FOR REPORTING WINDOW ==========\n")

# Mosaic layout ("tiled" vs "single-file"), detected centrally in parameters_project.R.
mosaic_mode <- detect_mosaic_mode(project_dir)

# Build the table of ISO weeks the reporting window covers, one row per week
# (newest first, matching the original weeks_back = 0..N-1 ordering).
# FIX: construct in one shot instead of growing a data.frame with rbind()
# inside a loop (quadratic copying anti-pattern).
week_end_dates <- end_date - 7 * (0:(reporting_weeks_needed - 1))
weeks_needed <- do.call(rbind, lapply(week_end_dates, function(d) {
  wwy <- get_iso_week_year(d)
  data.frame(week = wwy$week, year = wwy$year, date = d)
}))
# Scan the mosaic output directories and record which reporting weeks still
# lack a weekly mosaic; Script 40 later processes exactly these weeks.
missing_weeks_dates <- c() # kept for compatibility; never appended in this file (TODO confirm unused)
earliest_missing_date <- end_date # walked backwards as missing weeks are found
missing_week_rows <- list() # accumulate rows, bind once at the end (no rbind-in-loop)

# FIX: seq_len() instead of 1:nrow() (1:0 would iterate twice on an empty frame).
for (i in seq_len(nrow(weeks_needed))) {
  week_num <- weeks_needed[i, "week"]
  year_num <- weeks_needed[i, "year"]
  check_date <- weeks_needed[i, "date"]

  # Prefix pattern deliberately matches all layouts:
  # - Single-file:            week_51_2025.tif (top-level)
  # - Single-file per-field:  week_51_2025.tif (in {FIELD}/ subdirectories)
  # - Tiled:                  week_51_2025_01.tif, week_51_2025_02.tif, ...
  week_pattern_check <- sprintf("week_%02d_%d", week_num, year_num)
  files_this_week <- c()

  if (mosaic_mode == "tiled") {
    mosaic_dir_check <- get_mosaic_dir(project_dir, mosaic_mode = "tiled")
    if (dir.exists(mosaic_dir_check)) {
      # Recursive: per-field architecture keeps mosaics in field subdirectories.
      files_this_week <- list.files(mosaic_dir_check, pattern = week_pattern_check, recursive = TRUE, full.names = FALSE)
    }
  } else if (mosaic_mode == "single-file") {
    mosaic_dir_check <- paths$weekly_mosaic_dir
    if (dir.exists(mosaic_dir_check)) {
      # Recursive: covers both top-level (legacy) and field subdirectories.
      files_this_week <- list.files(mosaic_dir_check, pattern = week_pattern_check, recursive = TRUE, full.names = FALSE)
    }
  }

  cat(sprintf(
    " Week %02d/%d (%s): %s\n", week_num, year_num, format(check_date, "%Y-%m-%d"),
    if (length(files_this_week) > 0) "✓ EXISTS" else "✗ MISSING"
  ))

  # A missing week widens the download/processing window back to its Monday.
  if (length(files_this_week) == 0) {
    week_start <- check_date - 6 # Monday of that week
    if (week_start < earliest_missing_date) {
      earliest_missing_date <- week_start
    }
    missing_week_rows[[length(missing_week_rows) + 1]] <-
      data.frame(week = week_num, year = year_num, week_end_date = check_date)
  }
}

# missing_weeks must be an empty data.frame (not NULL) when nothing is missing,
# because later code calls nrow() on it.
missing_weeks <- if (length(missing_week_rows) > 0) {
  do.call(rbind, missing_week_rows)
} else {
  data.frame()
}
# Decide the preprocessing window: when any mosaic week is missing, widen the
# data-generation window just enough to cover the gap; otherwise keep the
# normal reporting-window offset. Script 80 always uses the full `offset`.
force_data_generation <- earliest_missing_date < end_date

if (force_data_generation) {
  cat(sprintf("\n[INFO] Missing week(s) detected - need to fill from %s onwards\n", format(earliest_missing_date, "%Y-%m-%d")))

  # Days between the earliest missing Monday and the run date.
  dynamic_offset <- as.numeric(end_date - earliest_missing_date)
  cat(sprintf(
    "[INFO] Will download/process ONLY missing dates: %d days (from %s to %s)\n",
    dynamic_offset, format(earliest_missing_date, "%Y-%m-%d"), format(end_date, "%Y-%m-%d")
  ))

  # Scripts 10/20/30/40 use this narrower window.
  data_generation_offset <- dynamic_offset
} else {
  cat("\n[INFO] ✓ All required mosaics exist - using normal reporting window\n")
  data_generation_offset <- offset
}
# ==============================================================================
# CHECK KPI REQUIREMENTS FOR REPORTING WINDOW
# ==============================================================================
# Scripts 90 (Word report) and 91 (Excel report) require KPIs for the full
# reporting window. Script 80 ALWAYS runs and will CALCULATE missing KPIs, so
# this check is informational only.
cat("\n========== KPI REQUIREMENT CHECK ==========\n")
cat(sprintf(
  "KPIs needed for reporting: %d weeks (current week + %d weeks history)\n",
  reporting_weeks_needed, reporting_weeks_needed - 1
))

# Centralized completeness check (parameters_project.R): returns the KPI output
# directory, a per-week data frame (with has_kpis / file_count columns, per its
# use below), and the count of weeks lacking KPI files.
kpi_check <- check_kpi_completeness(project_dir, client_type, end_date, reporting_weeks_needed)
kpi_dir <- kpi_check$kpi_dir
kpis_needed <- kpi_check$kpis_df
kpis_missing_count <- kpi_check$missing_count

# Ensure the KPI directory exists so later verification loops can scan it.
if (!dir.exists(kpi_dir)) {
  dir.create(kpi_dir, recursive = TRUE, showWarnings = FALSE)
}
# Display per-week KPI status for the reporting window.
# FIX: seq_len() instead of 1:nrow() - 1:0 would iterate (1, 0) and crash on an
# empty kpis_needed frame; seq_len(0) simply skips the loop.
for (i in seq_len(nrow(kpis_needed))) {
  row <- kpis_needed[i, ]
  cat(sprintf(
    " Week %02d/%d (%s): %s (%d files)\n",
    row$week, row$year, format(row$date, "%Y-%m-%d"),
    if (row$has_kpis) "✓ EXISTS" else "✗ WILL BE CALCULATED",
    row$file_count
  ))
}

cat(sprintf(
  "\nKPI Summary: %d/%d weeks exist, %d week(s) will be calculated by Script 80\n",
  nrow(kpis_needed) - kpis_missing_count, nrow(kpis_needed), kpis_missing_count
))
# Conditional script execution based on client type:
# - "cane_supply":       Scripts 20,21,22,23,30,31,80,91 (full pipeline, Excel output)
# - "agronomic_support": Scripts 20,30,80,90 only (KPI calculation + Word report)
#
# Scripts that ALWAYS run (regardless of client type):
# - 00: Python Download
# - 10: Tiling (if outputs don't exist)
# - 20: CI Extraction
# - 30: Growth Model
# - 40: Mosaic Creation
# - 80: KPI Calculation
#
# Scripts that are client-type specific:
# - 21: CI RDS→CSV (cane_supply only)
# - 22: (cane_supply only)
# - 23: (cane_supply only)
# - 31: Harvest Imminent (cane_supply only)
# - 90: Legacy Word Report (agronomic_support only)
# - 91: Modern Excel Report (cane_supply only)
skip_cane_supply_only <- (client_type != "cane_supply") # Skip Scripts 21,22,23,31 for non-cane_supply
run_legacy_report <- (client_type == "agronomic_support") # Script 90 for agronomic support
run_modern_report <- (client_type == "cane_supply") # Script 91 for cane supply
# ==============================================================================
# INTELLIGENT CHECKING: What has already been completed?
# ==============================================================================
cat("\n========== CHECKING EXISTING OUTPUTS ==========\n")

# mosaic_mode was detected centrally above (parameters_project.R).
cat(sprintf("Auto-detected mosaic mode: %s\n", mosaic_mode))

# Script 10 outputs: tiled dates live either directly under the split dir
# (legacy layout) or inside a {grid_size} subfolder such as 5x5/ or 10x10/.
tiles_split_base <- paths$daily_tiles_split_dir
tiles_dates <- c()
if (dir.exists(tiles_split_base)) {
  subfolders <- list.dirs(tiles_split_base, full.names = FALSE, recursive = FALSE)
  grid_patterns <- grep("^\\d+x\\d+$", subfolders, value = TRUE)

  if (length(grid_patterns) == 0) {
    # Legacy layout: daily_tiles_split/{dates}/
    tiles_dates <- list.dirs(tiles_split_base, full.names = FALSE, recursive = FALSE)
  } else {
    # Preferred layout: daily_tiles_split/{grid_size}/{dates}/ - use the first
    # grid-size folder found.
    grid_dir <- file.path(tiles_split_base, grid_patterns[1])
    tiles_dates <- list.dirs(grid_dir, full.names = FALSE, recursive = FALSE)
  }
}
cat(sprintf("Script 10: %d dates already tiled\n", length(tiles_dates)))
# Check Script 20 outputs (CI extraction) - daily RDS files.
ci_daily_dir <- paths$daily_ci_vals_dir
ci_files <- if (dir.exists(ci_daily_dir)) {
  list.files(ci_daily_dir, pattern = "\\.rds$")
} else {
  c()
}
cat(sprintf("Script 20: %d CI daily RDS files exist\n", length(ci_files)))

# Script 21's CSV output is overwritten on every run, so its presence is not a
# useful skip indicator; it simply runs whenever Script 20 runs.
cat("Script 21: CSV file exists but gets overwritten - will run if Script 20 runs\n")

# Check Script 40 outputs (mosaics): the early prereq check above already built
# missing_weeks, so skip only when nothing is missing and no rerun is forced.
# NOTE(review): skip_40 is recomputed with the identical expression further
# below - one of the two assignments is redundant.
skip_40 <- (nrow(missing_weeks) == 0 && !force_rerun) # Only skip if NO missing weeks AND not forcing rerun
cat(sprintf("Script 40: %d missing week(s) to create\n", nrow(missing_weeks)))
# Check Script 80 outputs (KPIs in reports/kpis/{field_level|field_analysis});
# kpi_dir was already resolved by check_kpi_completeness() above.
kpi_files <- if (dir.exists(kpi_dir)) {
  list.files(kpi_dir, pattern = "\\.csv$|\\.json$")
} else {
  c()
}
cat(sprintf("Script 80: %d KPI files exist\n", length(kpi_files)))

# Skip flags: combine existing-output checks with the client-type gates above.
skip_10 <- (length(tiles_dates) > 0 && !force_rerun && !force_data_generation) # Force Script 10 if missing weeks detected
skip_20 <- FALSE # Script 20 ALWAYS runs for all client types - processes new downloaded data
skip_21 <- skip_cane_supply_only # Script 21 runs ONLY for cane_supply clients (CI→CSV conversion)
skip_22 <- skip_cane_supply_only # Script 22 runs ONLY for cane_supply clients
skip_23 <- skip_cane_supply_only # Script 23 runs ONLY for cane_supply clients
skip_30 <- FALSE # Script 30 ALWAYS runs for all client types
skip_31 <- skip_cane_supply_only # Script 31 runs ONLY for cane_supply clients
skip_40 <- (nrow(missing_weeks) == 0 && !force_rerun) # Skip Script 40 only if NO missing weeks
skip_80 <- (kpis_missing_count == 0 && !force_rerun) # Skip Script 80 only if ALL KPIs exist AND not forcing rerun
# Print one line per script summarizing the RUN/SKIP decision and, for the
# client-gated scripts, why they are skipped.
cat("\nSkipping decisions (based on outputs AND client type):\n")
cat(sprintf(" Script 10: %s\n", if (skip_10) "SKIP" else "RUN"))
# sprintf() with no format args was pointless here - plain cat() is equivalent.
cat(" Script 20: RUN (always runs to process new downloads)\n")
# FIX: the annotation condition was `skip_cane_supply_only && !skip_21`, which
# is always FALSE because skip_21 == skip_cane_supply_only, so the
# "(non-cane_supply client)" note never printed for Script 21. Made consistent
# with Scripts 22/23/31 below.
cat(sprintf(" Script 21: %s %s\n", if (skip_21) "SKIP" else "RUN", if (skip_cane_supply_only) "(non-cane_supply client)" else ""))
cat(sprintf(" Script 22: %s %s\n", if (skip_22) "SKIP" else "RUN", if (skip_cane_supply_only) "(non-cane_supply client)" else ""))
cat(sprintf(" Script 23: %s %s\n", if (skip_23) "SKIP" else "RUN", if (skip_cane_supply_only) "(non-cane_supply client)" else ""))
cat(sprintf(" Script 30: %s (always runs)\n", if (skip_30) "SKIP" else "RUN"))
cat(sprintf(" Script 31: %s %s\n", if (skip_31) "SKIP" else "RUN", if (skip_cane_supply_only) "(non-cane_supply client)" else ""))
cat(sprintf(" Script 40: %s (looping through %d missing weeks)\n", if (skip_40) "SKIP" else "RUN", nrow(missing_weeks)))
cat(sprintf(" Script 80: %s (always runs)\n", if (skip_80) "SKIP" else "RUN"))
cat(sprintf(" Script 90: %s %s\n", if (!run_legacy_report) "SKIP" else "RUN", if (run_legacy_report) "(agronomic_support legacy report)" else ""))
cat(sprintf(" Script 91: %s %s\n", if (!run_modern_report) "SKIP" else "RUN", if (run_modern_report) "(cane_supply modern report)" else ""))
# ==============================================================================
# PYTHON: DOWNLOAD PLANET IMAGES (MISSING DATES ONLY)
# ==============================================================================
cat("\n========== DOWNLOADING PLANET IMAGES (MISSING DATES ONLY) ==========\n")
tryCatch(
  {
    # All downloads land in merged_tif/ regardless of project; data_source only
    # affects where Script 20 reads from later.
    merged_tifs_dir <- paths$merged_tif_folder

    cat(sprintf("[DEBUG] Checking for existing files in: %s\n", merged_tifs_dir))
    cat(sprintf("[DEBUG] Directory exists: %s\n", dir.exists(merged_tifs_dir)))

    # Dates already downloaded, inferred from YYYY-MM-DD.tif filenames.
    existing_tiff_files <- list.files(merged_tifs_dir, pattern = "^\\d{4}-\\d{2}-\\d{2}\\.tif$")
    existing_tiff_dates <- sub("\\.tif$", "", existing_tiff_files)

    cat(sprintf("[DEBUG] Found %d existing TIFF files\n", length(existing_tiff_files)))
    if (length(existing_tiff_files) > 0) {
      cat(sprintf("[DEBUG] Sample files: %s\n", paste(head(existing_tiff_files, 3), collapse = ", ")))
    }

    # Start from the tiled dates (Script 10 check above), but the raw TIFFs in
    # merged_tif/ are the source of truth: if a TIFF exists, never re-download
    # it, even if it has not been tiled yet.
    existing_tile_dates <- tiles_dates
    if (length(existing_tiff_dates) > 0) {
      cat(sprintf("[DEBUG] Using TIFF dates for existence check (found %d existing files)\n", length(existing_tiff_dates)))
      existing_tile_dates <- existing_tiff_dates
    }

    # Enumerate every date in the data-generation window and keep the gaps.
    start_date <- end_date - data_generation_offset
    date_seq <- seq(start_date, end_date, by = "day")
    target_dates <- format(date_seq, "%Y-%m-%d")
    missing_dates <- target_dates[!(target_dates %in% existing_tile_dates)]

    if (mosaic_mode == "single-file") {
      cat(sprintf(" Existing TIFF dates: %d\n", length(existing_tile_dates)))
    } else {
      cat(sprintf(" Existing tiled dates: %d\n", length(existing_tile_dates)))
    }
    cat(sprintf(" Missing dates in window: %d\n", length(missing_dates)))

    download_count <- 0
    download_failed <- 0

    if (length(missing_dates) > 0) {
      # The Python downloader uses paths relative to python_app/.
      original_dir <- getwd()
      setwd("python_app")
      # FIX: restore the working directory even if a download errors out;
      # previously an error inside the loop propagated to the outer handler
      # with the process still stuck in python_app/.
      tryCatch(
        {
          for (date_str in missing_dates) {
            cmd <- sprintf('python 00_download_8band_pu_optimized.py "%s" --date "%s" --resolution 3 --cleanup', project_dir, date_str)
            result <- system(cmd, ignore.stdout = FALSE, ignore.stderr = FALSE)
            if (result == 0) {
              download_count <- download_count + 1
            } else {
              download_failed <- download_failed + 1
            }
          }
        },
        finally = setwd(original_dir)
      )
    }

    cat(sprintf("✓ Downloaded %d dates, %d failed\n", download_count, download_failed))
    if (download_failed > 0) {
      # Best-effort: partial downloads do not abort the pipeline.
      cat("⚠ Some downloads failed, but continuing pipeline\n")
    }

    # Force Script 10 only when at least one download actually succeeded.
    if (download_count > 0) {
      skip_10 <- FALSE
    }
  },
  error = function(e) {
    cat("✗ Error in planet download:", e$message, "\n")
    pipeline_success <<- FALSE
  }
)
# ==============================================================================
# SCRIPT 10: CREATE PER-FIELD TIFFs
# ==============================================================================
if (pipeline_success && !skip_10) {
  cat("\n========== RUNNING SCRIPT 10: CREATE PER-FIELD TIFFs ==========\n")
  tryCatch(
    {
      # Run Script 10 (per-field version) as a child Rscript process.
      # Arguments: project_dir
      cmd <- sprintf(
        '"%s" --vanilla r_app/10_create_per_field_tiffs.R "%s"',
        RSCRIPT_PATH,
        project_dir
      )
      result <- system(cmd)

      if (result != 0) {
        stop("Script 10 exited with error code:", result)
      }

      # Verify output - count TIFFs across the per-field directory structure.
      field_tiles_dir <- paths$field_tiles_dir
      if (dir.exists(field_tiles_dir)) {
        fields <- list.dirs(field_tiles_dir, full.names = FALSE, recursive = FALSE)
        fields <- fields[fields != ""]
        # FIX: vapply() keeps the per-field counts an integer vector even when
        # `fields` is empty; sapply() returned list() there and sum(list())
        # raised "invalid 'type' (list)".
        total_files <- sum(vapply(
          file.path(field_tiles_dir, fields),
          function(f) length(list.files(f, pattern = "\\.tif$")),
          integer(1)
        ))
        cat(sprintf("✓ Script 10 completed - created per-field TIFFs (%d fields, %d files)\n", length(fields), total_files))
      } else {
        cat("✓ Script 10 completed\n")
      }
    },
    error = function(e) {
      cat("✗ Error in Script 10:", e$message, "\n")
      pipeline_success <<- FALSE
    }
  )
} else if (skip_10) {
  cat("\n========== SKIPPING SCRIPT 10 (per-field TIFFs already exist) ==========\n")
}
# ==============================================================================
# SCRIPT 20: CI EXTRACTION
# ==============================================================================
if (pipeline_success && !skip_20) {
  cat("\n========== RUNNING SCRIPT 20: CI EXTRACTION ==========\n")
  tryCatch(
    {
      # Launch Script 20 in a child Rscript process so it receives CLI args
      # exactly as from a terminal. Arguments: project_dir end_date offset.
      # The FULL reporting-window offset is passed so CI extraction covers the
      # whole window, not just freshly downloaded dates.
      script20_cmd <- sprintf(
        '"%s" --vanilla r_app/20_ci_extraction_per_field.R "%s" "%s" %d',
        RSCRIPT_PATH,
        project_dir, format(end_date, "%Y-%m-%d"), offset
      )
      exit_code <- system(script20_cmd)

      if (exit_code != 0) {
        stop("Script 20 exited with error code:", exit_code)
      }

      # Confirm the daily CI RDS output landed where expected.
      ci_daily_dir <- paths$daily_ci_vals_dir
      if (!dir.exists(ci_daily_dir)) {
        cat("✓ Script 20 completed\n")
      } else {
        rds_files <- list.files(ci_daily_dir, pattern = "\\.rds$")
        cat(sprintf("✓ Script 20 completed - generated %d CI files\n", length(rds_files)))
      }
    },
    error = function(e) {
      cat("✗ Error in Script 20:", e$message, "\n")
      pipeline_success <<- FALSE
    }
  )
} else if (skip_20) {
  cat("\n========== SKIPPING SCRIPT 20 (CI already extracted) ==========\n")
}
# ==============================================================================
# SCRIPT 21: CONVERT CI RDS TO CSV
# ==============================================================================
if (pipeline_success && !skip_21) {
  cat("\n========== RUNNING SCRIPT 21: CONVERT CI RDS TO CSV ==========\n")
  tryCatch(
    {
      # Script 21 is sourced in-process (not a child Rscript) and reads its
      # inputs from global variables, so publish them to .GlobalEnv first.
      assign("end_date", end_date, envir = .GlobalEnv)
      assign("offset", offset, envir = .GlobalEnv)
      assign("project_dir", project_dir, envir = .GlobalEnv)

      source("r_app/21_convert_ci_rds_to_csv.R")
      # main() is defined by the sourced file; it performs the conversion using
      # the globals assigned above.
      main() # Call main() to execute the script with the environment variables

      # Verify CSV output was created.
      ci_csv_path <- paths$ci_for_python_dir
      if (dir.exists(ci_csv_path)) {
        csv_files <- list.files(ci_csv_path, pattern = "\\.csv$")
        cat(sprintf("✓ Script 21 completed - converted to %d CSV files\n", length(csv_files)))
      } else {
        cat("✓ Script 21 completed\n")
      }
    },
    error = function(e) {
      cat("✗ Error in Script 21:", e$message, "\n")
      pipeline_success <<- FALSE
    }
  )
} else if (skip_21) {
  cat("\n========== SKIPPING SCRIPT 21 (CSV already created) ==========\n")
}
# ==============================================================================
# SCRIPT 30: INTERPOLATE GROWTH MODEL
# ==============================================================================
if (pipeline_success && !skip_30) {
  cat("\n========== RUNNING SCRIPT 30: INTERPOLATE GROWTH MODEL ==========\n")
  tryCatch(
    {
      # Launch Script 30 as a child Rscript process. Its only CLI argument is
      # the project name; the per-field version locates the Script 20 CI
      # output on its own.
      script30_cmd <- sprintf(
        '"%s" --vanilla r_app/30_interpolate_growth_model.R "%s"',
        RSCRIPT_PATH,
        project_dir
      )
      exit_code <- system(script30_cmd)

      if (exit_code != 0) {
        stop("Script 30 exited with error code:", exit_code)
      }

      # Confirm interpolated output landed where expected.
      growth_dir <- paths$growth_model_interpolated_dir
      if (!dir.exists(growth_dir)) {
        cat("✓ Script 30 completed\n")
      } else {
        model_files <- list.files(growth_dir, pattern = "\\.rds$|\\.csv$")
        cat(sprintf("✓ Script 30 completed - generated %d growth model files\n", length(model_files)))
      }
    },
    error = function(e) {
      cat("✗ Error in Script 30:", e$message, "\n")
      pipeline_success <<- FALSE
    }
  )
}
# ==============================================================================
# PYTHON 31: HARVEST IMMINENT WEEKLY
# ==============================================================================
if (pipeline_success && !skip_31) {
  cat("\n========== RUNNING PYTHON 31: HARVEST IMMINENT WEEKLY ==========\n")
  tryCatch(
    {
      # Run the Python script inside the pytorch_gpu conda environment, from
      # the smartcane root so conda can find the environment. The script takes
      # the project name as a positional argument (not a --project flag).
      cmd <- sprintf("conda run -n pytorch_gpu python python_app/31_harvest_imminent_weekly.py %s", project_dir)
      result <- system(cmd)

      if (result == 0) {
        # Verify harvest output - check for THIS WEEK's specific file.
        wwy_current_31 <- get_iso_week_year(end_date)
        expected_file <- file.path(
          "laravel_app", "storage", "app", project_dir, "reports", "kpis", "field_stats",
          sprintf("%s_harvest_imminent_week_%02d_%d.csv", project_dir, wwy_current_31$week, wwy_current_31$year)
        )

        if (file.exists(expected_file)) {
          cat(sprintf("✓ Script 31 completed - generated harvest imminent file for week %02d\n", wwy_current_31$week))
        } else {
          cat("✓ Script 31 completed (check if harvest.xlsx is available)\n")
        }
      } else {
        # Best-effort: Script 31 failures do not abort the pipeline.
        cat("⚠ Script 31 completed with errors (check harvest.xlsx availability)\n")
      }
    },
    error = function(e) {
      # FIX: removed the stray setwd(original_dir) that was here. This section
      # never changes the working directory, and original_dir only exists if
      # the earlier download stage actually ran its loop - so the old handler
      # could itself fail with "object 'original_dir' not found", masking the
      # real Script 31 error.
      cat("⚠ Script 31 error:", e$message, "\n")
    }
  )
} else if (skip_31) {
  cat("\n========== SKIPPING SCRIPT 31 (non-cane_supply client type) ==========\n")
}
# ==============================================================================
# SCRIPT 40: MOSAIC CREATION (LOOP THROUGH MISSING WEEKS)
# ==============================================================================
if (pipeline_success && !skip_40) {
  cat("\n========== RUNNING SCRIPT 40: MOSAIC CREATION ==========\n")

  if (nrow(missing_weeks) > 0) {
    cat(sprintf("Found %d missing week(s) - running Script 40 once per week\n\n", nrow(missing_weeks)))

    # missing_weeks rows were appended newest-first by the early check, so
    # iterate in reverse to process the oldest week first.
    for (week_idx in rev(seq_len(nrow(missing_weeks)))) {
      missing_week <- missing_weeks[week_idx, ]
      week_num <- missing_week$week
      year_num <- missing_week$year
      week_end_date <- as.Date(missing_week$week_end_date)

      cat(sprintf(
        "--- Creating mosaic for week %02d/%d (ending %s) ---\n",
        week_num, year_num, format(week_end_date, "%Y-%m-%d")
      ))

      tryCatch(
        {
          # Script 40 args: end_date offset project_dir. offset=7 restricts the
          # run to the single 7-day week ending on week_end_date.
          cmd <- sprintf(
            '"%s" --vanilla r_app/40_mosaic_creation_per_field.R "%s" 7 "%s"',
            RSCRIPT_PATH,
            format(week_end_date, "%Y-%m-%d"), project_dir
          )
          result <- system(cmd)

          if (result != 0) {
            stop("Script 40 exited with error code:", result)
          }

          # Verify the mosaic for this specific week now exists.
          mosaic_created <- FALSE
          if (mosaic_mode == "tiled") {
            mosaic_dir <- get_mosaic_dir(project_dir, mosaic_mode = "tiled")
            if (dir.exists(mosaic_dir)) {
              # FIX: tiled outputs are named week_WW_YYYY_NN.tif (see the early
              # prereq check), so the old pattern "week_WW_YYYY\.tif" never
              # matched them and every tiled week was reported unverified.
              # Match on the week prefix, recursively, like the early check.
              week_pattern <- sprintf("week_%02d_%d", week_num, year_num)
              mosaic_files <- list.files(mosaic_dir, pattern = week_pattern, recursive = TRUE, full.names = FALSE)
              mosaic_created <- length(mosaic_files) > 0
            }
          } else {
            mosaic_dir <- paths$weekly_mosaic_dir
            if (dir.exists(mosaic_dir)) {
              week_pattern <- sprintf("week_%02d_%d\\.tif", week_num, year_num)
              # Per-field architecture: mosaics may sit in field subdirectories.
              mosaic_files <- list.files(mosaic_dir, pattern = week_pattern, recursive = TRUE, full.names = FALSE)
              mosaic_created <- length(mosaic_files) > 0
            }
          }

          if (mosaic_created) {
            cat(sprintf("✓ Week %02d/%d mosaic created successfully\n\n", week_num, year_num))
          } else {
            cat(sprintf("✓ Week %02d/%d processing completed (verify output)\n\n", week_num, year_num))
          }
        },
        error = function(e) {
          # FIX: dropped the stray extra cat() argument that appended " \n"
          # after the message (sprintf already ends with \n).
          cat(sprintf("✗ Error creating mosaic for week %02d/%d: %s\n", week_num, year_num, e$message))
          pipeline_success <<- FALSE
        }
      )
    }

    if (pipeline_success) {
      cat(sprintf("✓ Script 40 completed - created all %d missing week mosaics\n", nrow(missing_weeks)))
    }
  } else {
    cat("No missing weeks detected - skipping Script 40\n")
    skip_40 <- TRUE
  }
} else if (skip_40) {
  cat("\n========== SKIPPING SCRIPT 40 (mosaics already created) ==========\n")
}
# ==============================================================================
# SCRIPT 80: CALCULATE KPIs (LOOP THROUGH REPORTING WINDOW)
# ==============================================================================
if (pipeline_success && !skip_80) {
  cat("\n========== RUNNING SCRIPT 80: CALCULATE KPIs FOR REPORTING WINDOW ==========\n")

  # Only weeks whose KPIs are absent need a Script 80 pass.
  weeks_to_calculate <- kpis_needed[!kpis_needed$has_kpis, ] # Only weeks WITHOUT KPIs

  if (nrow(weeks_to_calculate) > 0) {
    # Process oldest week first so trend inputs exist for newer weeks.
    weeks_to_calculate <- weeks_to_calculate[order(weeks_to_calculate$date), ]

    cat(sprintf(
      "Looping through %d missing week(s) in reporting window (from %s back to %s):\n\n",
      nrow(weeks_to_calculate),
      format(max(weeks_to_calculate$date), "%Y-%m-%d"),
      format(min(weeks_to_calculate$date), "%Y-%m-%d")
    ))

    tryCatch(
      {
        # FIX: seq_len() instead of 1:nrow() - harmless today because of the
        # nrow() > 0 guard, but safe if the guard ever changes.
        for (week_idx in seq_len(nrow(weeks_to_calculate))) {
          week_row <- weeks_to_calculate[week_idx, ]
          calc_date <- week_row$date

          # Script 80 args: end_date project_dir offset; offset=7 limits the
          # calculation to this single week while still giving trend data.
          cmd <- sprintf(
            '"%s" --vanilla r_app/80_calculate_kpis.R "%s" "%s" %d',
            RSCRIPT_PATH,
            format(calc_date, "%Y-%m-%d"), project_dir, 7
          ) # offset=7 for single week

          cat(sprintf(
            " [Week %02d/%d] Running Script 80 with end_date=%s...\n",
            week_row$week, week_row$year, format(calc_date, "%Y-%m-%d")
          ))

          # Child output is suppressed; only the exit status is reported here.
          result <- system(cmd, ignore.stdout = TRUE, ignore.stderr = TRUE)

          if (result == 0) {
            cat(sprintf(" ✓ KPIs calculated for week %02d/%d\n", week_row$week, week_row$year))
          } else {
            # Per-week failures are reported but do not abort the loop.
            cat(sprintf(
              " ✗ Error calculating KPIs for week %02d/%d (exit code: %d)\n",
              week_row$week, week_row$year, result
            ))
          }
        }

        # Verify total KPI output (kpi_dir defined by check_kpi_completeness() earlier).
        if (dir.exists(kpi_dir)) {
          files <- list.files(kpi_dir, pattern = "\\.csv$|\\.json$")
          # Extract subdir name from kpi_dir path for display.
          subdir_name <- basename(kpi_dir)
          cat(sprintf("\n✓ Script 80 loop completed - total %d KPI files in %s/\n", length(files), subdir_name))
        } else {
          cat("\n✓ Script 80 loop completed\n")
        }
      },
      error = function(e) {
        cat("✗ Error in Script 80 loop:", e$message, "\n")
        pipeline_success <<- FALSE
      }
    )
  } else {
    cat(sprintf("✓ All %d weeks already have KPIs - skipping calculation\n", nrow(kpis_needed)))
  }
} else if (skip_80) {
  cat("\n========== SKIPPING SCRIPT 80 (all KPIs already exist) ==========\n")
}
# ==============================================================================
# VERIFY KPI COMPLETION AFTER SCRIPT 80
# ==============================================================================
# Recheck if all KPIs are now available (Script 80 should have calculated any missing ones)
cat("\n========== VERIFYING KPI COMPLETION ==========\n")

kpis_complete <- TRUE
if (dir.exists(kpi_dir)) {
  # seq_len(n) - 1 handles reporting_weeks_needed == 0 safely;
  # 0:(n - 1) would yield c(0, -1) and check spurious future weeks.
  for (weeks_back in seq_len(reporting_weeks_needed) - 1) {
    check_date <- end_date - (weeks_back * 7)
    week_num <- as.numeric(format(check_date, "%V")) # ISO-8601 week number
    year_num <- as.numeric(format(check_date, "%G")) # ISO-8601 week-based year

    # Check for any KPI file from that week
    week_pattern <- sprintf("week%02d_%d", week_num, year_num)
    # Per-field architecture: KPI files may sit in field subdirectories,
    # so search recursively under kpi_dir.
    kpi_files_this_week <- list.files(kpi_dir, pattern = week_pattern, recursive = TRUE, full.names = FALSE)

    if (length(kpi_files_this_week) == 0) {
      kpis_complete <- FALSE
      cat(sprintf(" Week %02d/%d: ✗ KPIs not found\n", week_num, year_num))
    }
  }
} else {
  # A missing KPI directory means no KPIs exist at all. Previously this case
  # silently left kpis_complete TRUE, which would let report generation
  # proceed without any data.
  kpis_complete <- FALSE
  cat(sprintf(" ✗ KPI directory not found: %s\n", kpi_dir))
}

if (kpis_complete) {
  cat("✓ All KPIs available - reports can be generated\n")
} else {
  cat("⚠ Some KPIs still missing - reports will be skipped\n")
}
# ==============================================================================
# SCRIPT 90: LEGACY WORD REPORT (agronomic_support clients)
# ==============================================================================
if (pipeline_success && run_legacy_report) {
  cat("\n========== RUNNING SCRIPT 90: LEGACY WORD REPORT ==========\n")

  if (!kpis_complete) {
    cat("⚠ Skipping Script 90 - KPIs not available for full reporting window\n")
  } else {
    tryCatch(
      {
        # Script 90 is an RMarkdown file - compile it with rmarkdown::render().
        # Reports directory already created by setup_project_directories.
        output_dir <- paths$reports_dir

        # Name the report after the ISO week/year of the pipeline end date.
        report_week <- as.numeric(format(end_date, "%V"))
        report_year <- as.numeric(format(end_date, "%G"))
        output_filename <- sprintf("CI_report_week%02d_%d.docx", report_week, report_year)

        # Render the RMarkdown document
        rmarkdown::render(
          input = "r_app/90_CI_report_with_kpis_simple.Rmd",
          output_dir = output_dir,
          output_file = output_filename,
          params = list(
            report_date = format(end_date, "%Y-%m-%d"),
            data_dir = project_dir
          ),
          quiet = TRUE
        )

        cat(sprintf("✓ Script 90 completed - generated Word report: %s\n", output_filename))
      },
      error = function(e) {
        cat("✗ Error in Script 90:", e$message, "\n")
        pipeline_success <<- FALSE
      }
    )
  }
} else if (run_legacy_report) {
  cat("\n========== SKIPPING SCRIPT 90 (pipeline error or KPIs incomplete) ==========\n")
}
# ==============================================================================
# SCRIPT 91: MODERN WORD REPORT (cane_supply clients)
# ==============================================================================
if (pipeline_success && run_modern_report) {
  cat("\n========== RUNNING SCRIPT 91: MODERN WORD REPORT ==========\n")

  if (!kpis_complete) {
    cat("⚠ Skipping Script 91 - KPIs not available for full reporting window\n")
  } else {
    tryCatch(
      {
        # Script 91 is an RMarkdown file - compile it with rmarkdown::render().
        # Reports directory already created by setup_project_directories.
        output_dir <- paths$reports_dir

        # Name the report after the ISO week/year of the pipeline end date.
        report_week <- as.numeric(format(end_date, "%V"))
        report_year <- as.numeric(format(end_date, "%G"))
        output_filename <- sprintf("CI_report_week%02d_%d.docx", report_week, report_year)

        # Render the RMarkdown document
        rmarkdown::render(
          input = "r_app/91_CI_report_with_kpis_Angata.Rmd",
          output_dir = output_dir,
          output_file = output_filename,
          params = list(
            report_date = format(end_date, "%Y-%m-%d"),
            data_dir = project_dir
          ),
          quiet = TRUE
        )

        cat(sprintf("✓ Script 91 completed - generated Word report: %s\n", output_filename))
      },
      error = function(e) {
        cat("✗ Error in Script 91:", e$message, "\n")
        pipeline_success <<- FALSE
      }
    )
  }
} else if (run_modern_report) {
  cat("\n========== SKIPPING SCRIPT 91 (pipeline error or KPIs incomplete) ==========\n")
}
# ==============================================================================
# SUMMARY
# ==============================================================================
# Final console summary of the whole run; end_date_str and offset are set
# earlier in the script.
cat("\n========== PIPELINE COMPLETE ==========\n")
cat(sprintf("Project: %s\n", project_dir))
cat(sprintf("End Date: %s\n", end_date_str))
cat(sprintf("Offset: %d days\n", offset))
status_line <- if (pipeline_success) {
  "Status: ✓ All scripts completed successfully\n"
} else {
  "Status: ✗ Pipeline failed - check errors above\n"
}
cat(status_line)
cat("Pipeline sequence: Python Download → R 10 → R 20 → R 21 → R 30 → Python 31 → R 40 → R 80 → R 90/91\n")