# 09b_FIELD_ANALYSIS_WEEKLY.R (NEW - TILE-AWARE WITH PARALLEL PROCESSING)
|
|
# ============================================================================
|
|
# Per-field weekly analysis with tile-based mosaic extraction and parallel processing
|
|
#
|
|
# MAJOR IMPROVEMENTS OVER 09_field_analysis_weekly.R:
|
|
# - Tile-aware: Loads only relevant tiles for each field (memory efficient)
|
|
# - Parallel processing: Uses furrr for parallel field extraction (1300+ fields supported)
|
|
# - Field-based cloud analysis: Cloud coverage calculated per-field from tiles
|
|
# - Scalable: Architecture ready for 13,000+ fields
|
|
#
|
|
# Generates detailed field-level CSV export with:
|
|
# - Field identifiers and areas
|
|
# - Weekly CI change (mean ± std) from tile-based extraction
|
|
# - Age-based phase assignment (Germination, Tillering, Grand Growth, Maturation)
|
|
# - Harvest imminence detection (Phase 1 from LSTM model) - optional
|
|
# - Status triggers (non-exclusive, can coexist with harvest imminent phase)
|
|
# - Phase transition tracking (weeks in current phase)
|
|
# - Cloud coverage analysis from tiles (per-field, not mosaic-wide)
|
|
#
|
|
# Cloud Coverage Per-Field:
|
|
# - Extracts from relevant tiles that field intersects
|
|
# - Categories: Clear view (>=99.5%), Partial coverage (0-99.5%), No image available (0%)
|
|
# - Reports % of fields by cloud category
|
|
#
|
|
# Parallel Processing:
|
|
# - Uses furrr::future_map() + dplyr::bind_rows() for CPU-parallel field extraction
|
|
# - Configure workers before running: future::plan(future::multisession, workers = N)
|
|
# - Each worker: loads tile(s), extracts CI, calculates stats
|
|
# - Significant speedup for 1000+ fields
|
|
#
|
|
# Output:
|
|
# - Excel (.xlsx) with Field Data sheet and Summary sheet
|
|
# - Excel (.xlsx) weekly harvest predictions for tracking
|
|
# - RDS file with field_analysis and field_analysis_summary for Rmd reports
|
|
# - Summary includes: Monitored area, Cloud coverage, Phase distribution, Status triggers
|
|
#
|
|
# Usage: Rscript 09b_field_analysis_weekly.R [end_date] [project_dir]
|
|
# - end_date: End date for analysis (YYYY-MM-DD format), default: today
|
|
# - project_dir: Project directory name (e.g., "aura", "esa", "angata")
|
|
#
|
|
# Example:
|
|
# Rscript 09b_field_analysis_weekly.R 2026-01-08 angata
|
|
#
|
|
|
|
|
|
|
|
# 1. Load required libraries
|
|
suppressPackageStartupMessages({
|
|
library(here)
|
|
library(sf)
|
|
library(terra)
|
|
library(dplyr)
|
|
library(tidyr)
|
|
library(lubridate)
|
|
library(readr)
|
|
library(readxl)
|
|
library(writexl)
|
|
library(purrr)
|
|
library(furrr)
|
|
library(future)
|
|
# Optional: torch for harvest model inference (will skip if not available)
|
|
tryCatch({
|
|
library(torch)
|
|
}, error = function(e) {
|
|
message("Note: torch package not available - harvest model inference will be skipped")
|
|
})
|
|
})
|
|
|
|
# ============================================================================
|
|
# PHASE AND STATUS TRIGGER DEFINITIONS
|
|
# ============================================================================
|
|
|
|
# Age-based growth phases. Ages are in weeks; both bounds are inclusive when
# matched by get_phase_by_age(), and the first matching row wins.
PHASE_DEFINITIONS <- local({
  phase_names <- c("Germination", "Tillering", "Grand Growth", "Maturation")
  first_week <- c(0, 4, 17, 39)
  last_week <- c(6, 16, 39, 200)

  data.frame(
    phase = phase_names,
    age_start = first_week,
    age_end = last_week,
    stringsAsFactors = FALSE
  )
})

# Catalogue of status triggers: the age window (weeks) in which each trigger
# applies (NA = no age restriction listed) and a short description of its rule.
STATUS_TRIGGERS <- local({
  trigger_names <- c(
    "germination_started",
    "germination_complete",
    "stress_detected_whole_field",
    "strong_recovery",
    "growth_on_track",
    "maturation_progressing",
    "harvest_ready"
  )
  youngest_age <- c(0, 0, NA, NA, 4, 39, 45)
  oldest_age <- c(6, 6, NA, NA, 39, 200, 200)
  rule_text <- c(
    "10% of field CI > 2",
    "70% of field CI >= 2",
    "CI decline > -1.5 + low CV",
    "CI increase > +1.5",
    "CI increasing consistently",
    "High CI, stable/declining",
    "Age 45+ weeks (ready to harvest)"
  )

  data.frame(
    trigger = trigger_names,
    age_min = youngest_age,
    age_max = oldest_age,
    description = rule_text,
    stringsAsFactors = FALSE
  )
})
|
|
|
|
# ============================================================================
|
|
# TILE-AWARE HELPER FUNCTIONS
|
|
# ============================================================================
|
|
|
|
#' Find the tiles whose extent overlaps a field's bounding box
#'
#' Only bounding boxes are compared, so a tile is reported whenever its extent
#' touches or overlaps the rectangle around the field.
#'
#' @param field_geom A single field, either an `sf` object or a terra
#'   `SpatVector`.
#' @param tile_grid Data frame of tile extents with columns `id`, `xmin`,
#'   `xmax`, `ymin`, `ymax`.
#' @return Numeric vector with the IDs of the overlapping tiles (possibly
#'   empty).
get_tile_ids_for_field <- function(field_geom, tile_grid) {
  is_sf <- inherits(field_geom, "sf")
  is_spatvector <- inherits(field_geom, "SpatVector")

  if (!is_sf && !is_spatvector) {
    stop("field_geom must be sf or terra::vect object")
  }

  # Pull the four bounding-box edges out of whichever geometry type we got
  if (is_sf) {
    box <- sf::st_bbox(field_geom)
    west <- box["xmin"]
    east <- box["xmax"]
    south <- box["ymin"]
    north <- box["ymax"]
  } else {
    box <- terra::ext(field_geom)
    west <- box$xmin
    east <- box$xmax
    south <- box$ymin
    north <- box$ymax
  }

  # Two rectangles overlap when neither lies entirely to one side of the other
  overlaps_field <- tile_grid$xmax >= west &
    tile_grid$xmin <= east &
    tile_grid$ymax >= south &
    tile_grid$ymin <= north

  as.numeric(tile_grid$id[overlaps_field])
}
|
|
|
|
#' Read the CI layer of every requested tile and combine them into one raster
#'
#' Tiles are looked up as `week_<WW>_<YYYY>_<TT>.tif` inside `mosaic_dir`.
#' Missing files are skipped; unreadable files are skipped with a message.
#'
#' @param field_geom Single field geometry (not used by the lookup itself).
#' @param tile_ids Numeric vector of tile IDs to load.
#' @param week_num Week number.
#' @param year Year.
#' @param mosaic_dir Directory with weekly tiles.
#' @return A single-layer raster named "CI" (a max-mosaic when several tiles
#'   were read), or NULL when no tile could be loaded.
load_tiles_for_field <- function(field_geom, tile_ids, week_num, year, mosaic_dir) {
  if (length(tile_ids) == 0) {
    return(NULL)
  }

  # Returns the CI layer of one tile, or NULL if the file is absent/unreadable
  read_ci_layer <- function(tile_id) {
    tif_name <- sprintf("week_%02d_%d_%02d.tif", week_num, year, tile_id)
    tif_path <- file.path(mosaic_dir, tif_name)
    if (!file.exists(tif_path)) {
      return(NULL)
    }

    tryCatch({
      tile <- terra::rast(tif_path)
      # Prefer a layer named "CI", then band 5, then whatever comes first
      layer <- if ("CI" %in% names(tile)) {
        tile[["CI"]]
      } else if (terra::nlyr(tile) >= 5) {
        tile[[5]]
      } else {
        tile[[1]]
      }
      names(layer) <- "CI"
      layer
    }, error = function(e) {
      message(paste(" Warning: Could not load tile", tile_id, ":", e$message))
      NULL
    })
  }

  ci_layers <- Filter(Negate(is.null), lapply(sort(tile_ids), read_ci_layer))

  if (length(ci_layers) == 0) {
    return(NULL)
  }
  if (length(ci_layers) == 1) {
    return(ci_layers[[1]])
  }

  # Several tiles: mosaic them, falling back to the first tile if that fails
  tryCatch({
    terra::mosaic(terra::sprc(ci_layers), fun = "max")
  }, error = function(e) {
    message(paste(" Warning: Could not merge tiles:", e$message))
    ci_layers[[1]]
  })
}
|
|
|
|
#' Build tile grid from available weekly tile files
#'
#' Scans `mosaic_dir` for files named `week_<WW>_<YYYY>_<TT>.tif` and records
#' the spatial extent of each one. The match is anchored, so sidecar files such
#' as `*.tif.aux.xml` are ignored, and tile IDs with more than two digits are
#' accepted (matching the `%02d` formatting used when tiles are loaded).
#'
#' @param mosaic_dir Directory with weekly tiles
#' @param week_num Week number to discover tiles
#' @param year Year to discover tiles
#' @return Data frame with columns: id, xmin, xmax, ymin, ymax (one row per
#'   readable tile). Stops with an error when no tile is found or none can be
#'   read.
build_tile_grid <- function(mosaic_dir, week_num, year) {
  # Anchored pattern; the capture group is the tile ID
  tile_pattern <- sprintf("^week_%02d_%d_([0-9]{2,})\\.tif$", week_num, year)
  tile_files <- list.files(mosaic_dir, pattern = tile_pattern, full.names = TRUE)

  if (length(tile_files) == 0) {
    stop(paste("No tile files found for week", week_num, year, "in", mosaic_dir))
  }

  # One-row data frame for a tile, or NULL (with a message) if it is unreadable
  read_tile_extent <- function(tile_file) {
    tryCatch({
      tile_id <- as.integer(sub(tile_pattern, "\\1", basename(tile_file)))
      tile_ext <- terra::ext(terra::rast(tile_file))
      data.frame(
        id = tile_id,
        xmin = as.numeric(tile_ext$xmin),
        xmax = as.numeric(tile_ext$xmax),
        ymin = as.numeric(tile_ext$ymin),
        ymax = as.numeric(tile_ext$ymax),
        stringsAsFactors = FALSE
      )
    }, error = function(e) {
      message(paste(" Warning: Could not process tile", basename(tile_file), ":", e$message))
      NULL
    })
  }

  # Collect rows first and bind once instead of growing a data frame in a loop
  tile_rows <- Filter(Negate(is.null), lapply(tile_files, read_tile_extent))

  if (length(tile_rows) == 0) {
    stop("Could not extract extents from any tile files")
  }

  tile_grid <- do.call(rbind, tile_rows)
  rownames(tile_grid) <- NULL
  tile_grid
}
|
|
|
|
# ============================================================================
|
|
# HELPER FUNCTIONS (FROM ORIGINAL 09)
|
|
# ============================================================================
|
|
|
|
#' Map a crop age (in weeks) to its growth phase
#'
#' Looks the age up in PHASE_DEFINITIONS (inclusive bounds, first matching row
#' wins). Ages are usually fractional, so an age that falls between the end of
#' one phase and the start of the next (e.g. 16.5 weeks, between 4-16 and
#' 17-39) is assigned to the phase that has most recently started instead of
#' being reported as "Unknown".
#'
#' @param age_weeks Single numeric age in weeks (may be NA).
#' @return Phase name; NA_character_ for an NA age; "Unknown" for ages outside
#'   the overall range covered by PHASE_DEFINITIONS.
get_phase_by_age <- function(age_weeks) {
  if (is.na(age_weeks)) {
    return(NA_character_)
  }

  starts <- PHASE_DEFINITIONS$age_start
  ends <- PHASE_DEFINITIONS$age_end

  in_phase <- which(age_weeks >= starts & age_weeks <= ends)
  if (length(in_phase) > 0) {
    return(PHASE_DEFINITIONS$phase[in_phase[1]])
  }

  # Not inside any interval: only bridge gaps *between* phases, never
  # extrapolate below the first start or beyond the last end.
  if (age_weeks >= min(starts) && age_weeks <= max(ends)) {
    already_started <- which(starts <= age_weeks)
    latest <- already_started[which.max(starts[already_started])]
    return(PHASE_DEFINITIONS$phase[latest])
  }

  "Unknown"
}
|
|
|
|
#' Derive the status trigger for a field
#'
#' Rules are evaluated in a fixed order and the first one that applies is
#' returned: germination (age 0-6), harvest ready (age >= 45), whole-field
#' stress, strong recovery, growth on track, maturation progressing.
#'
#' @param ci_values Numeric vector of CI pixel values for the field (NAs are
#'   dropped).
#' @param ci_change Week-over-week change in mean CI, or NA when unknown.
#' @param age_weeks Crop age in weeks, or NA when unknown.
#' @return Trigger name, or NA_character_ when no rule applies or there is no
#'   usable input.
get_status_trigger <- function(ci_values, ci_change, age_weeks) {
  if (is.na(age_weeks) || length(ci_values) == 0) {
    return(NA_character_)
  }

  ci_values <- ci_values[!is.na(ci_values)]
  n_pixels <- length(ci_values)
  if (n_pixels == 0) {
    return(NA_character_)
  }

  pct_above_2 <- sum(ci_values > 2) / n_pixels * 100
  pct_at_or_above_2 <- sum(ci_values >= 2) / n_pixels * 100
  mean_ci <- mean(ci_values)

  # sd() is NA for a single pixel; treat that (and a non-positive mean) as
  # "no measurable variation" so the stress rule below never sees NA.
  ci_sd <- sd(ci_values)
  ci_cv <- if (mean_ci > 0 && !is.na(ci_sd)) ci_sd / mean_ci else 0

  has_change <- !is.na(ci_change)

  # Germination phase triggers (age 0-6)
  if (age_weeks >= 0 && age_weeks <= 6) {
    if (pct_at_or_above_2 >= 70) {
      return("germination_complete")
    }
    if (pct_above_2 > 10) {
      return("germination_started")
    }
  }

  # Harvest ready (45+ weeks) - takes priority over all CI-change rules below
  if (age_weeks >= 45) {
    return("harvest_ready")
  }

  # Stress detection (any phase after Germination): uniform, sharp decline
  if (age_weeks > 6 && has_change && ci_change < -1.5 && ci_cv < 0.25) {
    return("stress_detected_whole_field")
  }

  # Strong recovery (any phase after Germination)
  if (age_weeks > 6 && has_change && ci_change > 1.5) {
    return("strong_recovery")
  }

  # Growth on track (Tillering/Grand Growth, 4-39 weeks)
  if (age_weeks >= 4 && age_weeks < 39 && has_change && ci_change > 0.2) {
    return("growth_on_track")
  }

  # Maturation progressing (39-45 weeks, high CI)
  if (age_weeks >= 39 && age_weeks < 45 && mean_ci > 3.5) {
    return("maturation_progressing")
  }

  NA_character_
}
|
|
|
|
#' Load the most recent earlier field-analysis CSV (up to three weeks back)
#'
#' Tries week-1, week-2 and week-3 in turn (week numbers below 1 wrap by adding
#' 52) and returns the first CSV that exists and can be read.
#'
#' @param project_dir Project name used as the file-name prefix.
#' @param current_week Week number of the current analysis.
#' @param reports_dir Reports root; CSVs live in `kpis/field_analysis` below it.
#' @return List with `data` (the CSV contents or NULL), `weeks_lookback` (how
#'   many weeks back the file was found, or NA) and `found` (TRUE/FALSE).
load_previous_week_csv <- function(project_dir, current_week, reports_dir) {
  history_dir <- file.path(reports_dir, "kpis", "field_analysis")

  for (weeks_back in c(1, 2, 3)) {
    candidate_week <- current_week - weeks_back
    if (candidate_week < 1) {
      candidate_week <- candidate_week + 52
    }

    candidate_name <- paste0(
      project_dir, "_field_analysis_week", sprintf("%02d", candidate_week), ".csv"
    )
    candidate_path <- file.path(history_dir, candidate_name)
    if (!file.exists(candidate_path)) {
      next
    }

    # A file that exists but cannot be parsed is reported and skipped
    loaded <- tryCatch(
      read_csv(candidate_path, show_col_types = FALSE),
      error = function(e) {
        message(paste("Warning: Could not load", basename(candidate_path), ":", e$message))
        NULL
      }
    )
    if (!is.null(loaded)) {
      return(list(data = loaded, weeks_lookback = weeks_back, found = TRUE))
    }
  }

  message("No previous field analysis CSV found. Phase tracking will be age-based only.")
  list(data = NULL, weeks_lookback = NA, found = FALSE)
}
|
|
|
|
# When TRUE, every field is aged from UNIFORM_PLANTING_DATE and the
# harvesting data is not consulted for planting dates.
USE_UNIFORM_AGE <- TRUE
UNIFORM_PLANTING_DATE <- as.Date("2025-01-01")

#' Build the field -> planting date lookup
#'
#' With USE_UNIFORM_AGE enabled an empty lookup is returned. Otherwise the most
#' recent `season_start` per `field` is taken from `harvesting_data`.
#'
#' @param harvesting_data Data frame with `field` and `season_start` columns
#'   (only read when USE_UNIFORM_AGE is FALSE).
#' @return Data frame with `field_id` and `planting_date`, or NULL when no
#'   harvesting data is available or extraction fails.
extract_planting_dates <- function(harvesting_data) {
  if (USE_UNIFORM_AGE) {
    message(paste("Using uniform planting date for all fields:", UNIFORM_PLANTING_DATE))
    empty_lookup <- data.frame(
      field_id = character(),
      planting_date = as.Date(character()),
      stringsAsFactors = FALSE
    )
    return(empty_lookup)
  }

  no_data <- is.null(harvesting_data) || nrow(harvesting_data) == 0
  if (no_data) {
    message("Warning: No harvesting data available.")
    return(NULL)
  }

  tryCatch({
    # Newest season first, then keep the first (= newest) row of each field
    newest_first <- dplyr::arrange(harvesting_data, field, dplyr::desc(season_start))
    one_per_field <- dplyr::distinct(newest_first, field, .keep_all = TRUE)
    date_cols <- dplyr::select(one_per_field, field, season_start)
    renamed <- dplyr::rename(date_cols, field_id = field, planting_date = season_start)
    with_dates <- dplyr::filter(renamed, !is.na(planting_date))
    planting_dates <- as.data.frame(with_dates)

    message(paste("Extracted planting dates for", nrow(planting_dates), "fields (most recent season)"))
    planting_dates
  }, error = function(e) {
    message(paste("Error extracting planting dates:", e$message))
    NULL
  })
}
|
|
|
|
# ============================================================================
|
|
# NOTE: Cloud coverage is now calculated inline in analyze_single_field()
|
|
# ============================================================================
|
|
# Cloud coverage logic (per-field, from same CI extraction):
|
|
# - Extract ALL pixels from field polygon (including NAs from clouds/missing data)
|
|
# - Count: num_data = non-NA pixels, num_total = total pixels in field
|
|
# - Calculate: pct_clear = (num_data / num_total) * 100
|
|
# - Categorize: >=99.5% = "Clear view", >0% = "Partial coverage", 0% = "No image available"
|
|
#
|
|
# This ensures LOGICAL CONSISTENCY:
|
|
# - If CI_mean has value → at least 1 pixel has data → pct_clear > 0 ✓
|
|
# - If pct_clear = 0 → no data → CI_mean = NA ✓
|
|
# - Eliminates double-extraction inefficiency
|
|
|
|
# ============================================================================
|
|
# PARALLEL FIELD ANALYSIS FUNCTION
|
|
# ============================================================================
|
|
|
|
#' Analyze single field (for parallel processing)
#'
#' Processes ONE field: loads the CI tiles the field intersects, extracts the
#' CI pixels inside the field polygon and derives CI statistics, cloud
#' coverage, crop age, growth phase, status trigger and weeks in phase.
#' Any error is caught and reported as a two-column row (Field_id, error).
#'
#' @param field_idx Row index of the field in `field_boundaries_sf`
#' @param field_boundaries_sf All field boundaries (sf object) with a `field`
#'   column and, optionally, a `sub_area` column
#' @param tile_grid Data frame with tile extents (id, xmin, xmax, ymin, ymax)
#' @param week_num Current week number
#' @param year Current year
#' @param mosaic_dir Directory with weekly tiles
#' @param previous_week_csv Previous week's per-field data, or NULL
#' @param planting_dates Planting dates lookup (field_id, planting_date), or NULL
#' @param report_date Report date used for the age calculation
#' @param harvest_imminence_data Optional data frame with `field_id` and
#'   `imminent_prob` columns
#'
#' @return Single-row data frame with field analysis. Early exits (invalid
#'   geometry, no tiles, no CI pixels, error) return a reduced set of columns
#'   plus an `error` column.
analyze_single_field <- function(field_idx, field_boundaries_sf, tile_grid, week_num, year,
                                 mosaic_dir, previous_week_csv = NULL, planting_dates = NULL,
                                 report_date = Sys.Date(), harvest_imminence_data = NULL) {
  tryCatch({
    # --- Field identity -----------------------------------------------------
    field_id <- field_boundaries_sf$field[field_idx]
    farm_section <- if ("sub_area" %in% names(field_boundaries_sf)) {
      field_boundaries_sf$sub_area[field_idx]
    } else {
      NA_character_
    }

    # --- Geometry validation ------------------------------------------------
    field_sf <- field_boundaries_sf[field_idx, ]
    if (sf::st_is_empty(field_sf) || any(is.na(sf::st_geometry(field_sf)))) {
      return(data.frame(
        Field_id = field_id,
        Farm_section = farm_section,
        CI_mean = NA_real_,
        error = "Empty or invalid geometry"
      ))
    }

    # Area: st_area() is divided by 10,000 (m2 -> ha); 1 acre = 0.404686 ha
    field_area_ha <- as.numeric(sf::st_area(field_sf)) / 10000
    field_area_acres <- field_area_ha / 0.404686

    # --- Current week CI ----------------------------------------------------
    tile_ids <- get_tile_ids_for_field(field_sf, tile_grid)
    current_ci <- load_tiles_for_field(field_sf, tile_ids, week_num, year, mosaic_dir)

    if (is.null(current_ci)) {
      return(data.frame(
        Field_id = field_id,
        Farm_section = farm_section,
        Hectares = field_area_ha,
        Acreage = field_area_acres,
        CI_mean = NA_real_,
        error = "No tile data available"
      ))
    }

    field_vect <- terra::vect(sf::as_Spatial(field_sf))
    # NOTE(review): this relabels the field CRS with the raster's CRS without
    # reprojecting; it presumably relies on both already sharing a CRS - confirm.
    terra::crs(field_vect) <- terra::crs(current_ci)

    # Keep ALL pixels (including NAs) so cloud cover comes from the same extraction
    all_extracted <- terra::extract(current_ci, field_vect)[, 2]
    current_ci_vals <- all_extracted[!is.na(all_extracted)]

    # --- Cloud coverage: share of field pixels that carry data --------------
    num_total <- length(all_extracted)
    num_data <- sum(!is.na(all_extracted))
    pct_clear <- if (num_total > 0) round((num_data / num_total) * 100, 1) else 0

    cloud_cat <- if (num_data == 0) {
      "No image available"
    } else if (pct_clear >= 99.5) {
      "Clear view"
    } else {
      "Partial coverage"
    }
    cloud_pct <- 100 - pct_clear

    if (length(current_ci_vals) == 0) {
      return(data.frame(
        Field_id = field_id,
        Farm_section = farm_section,
        Hectares = field_area_ha,
        Acreage = field_area_acres,
        CI_mean = NA_real_,
        Cloud_pct = cloud_pct,
        Cloud_category = cloud_cat,
        error = "No CI values extracted"
      ))
    }

    # --- Current CI statistics ----------------------------------------------
    mean_ci_current <- mean(current_ci_vals)
    ci_std <- sd(current_ci_vals)
    cv_current <- ci_std / mean_ci_current
    range_str <- sprintf("%.1f-%.1f", min(current_ci_vals), max(current_ci_vals))

    # --- Weekly CI change ---------------------------------------------------
    # Roll over the year boundary: the week before week 1 is the last ISO week
    # of the previous year (28 December always lies in that week).
    prev_week <- week_num - 1
    prev_year <- year
    if (prev_week < 1) {
      prev_year <- year - 1
      last_week_anchor <- as.Date(sprintf("%d-12-28", as.integer(prev_year)))
      prev_week <- as.numeric(format(last_week_anchor, "%V"))
    }

    # Best effort: missing or unreadable previous-week data just means "no change value"
    weekly_ci_change <- tryCatch({
      change <- NA_real_
      previous_ci <- load_tiles_for_field(field_sf, tile_ids, prev_week, prev_year, mosaic_dir)
      if (!is.null(previous_ci)) {
        previous_ci_vals <- terra::extract(previous_ci, field_vect)[, 2]
        previous_ci_vals <- previous_ci_vals[!is.na(previous_ci_vals)]
        if (length(previous_ci_vals) > 0) {
          change <- mean_ci_current - mean(previous_ci_vals)
        }
      }
      change
    }, error = function(e) {
      NA_real_
    })

    weekly_ci_change_str <- if (is.na(weekly_ci_change)) {
      sprintf("%.1f ± %.2f", mean_ci_current, ci_std)
    } else {
      sprintf("%.1f ± %.2f (Δ %.2f)", mean_ci_current, ci_std, weekly_ci_change)
    }

    # --- Crop age -----------------------------------------------------------
    age_weeks <- NA_real_
    if (!is.null(planting_dates) && nrow(planting_dates) > 0) {
      planting_row <- which(planting_dates$field_id == field_id)
      if (length(planting_row) > 0) {
        planting_date <- planting_dates$planting_date[planting_row[1]]
        if (!is.na(planting_date)) {
          age_weeks <- as.numeric(difftime(report_date, planting_date, units = "weeks"))
        }
      }
    }
    # The uniform planting date overrides any per-field date
    if (USE_UNIFORM_AGE) {
      age_weeks <- as.numeric(difftime(report_date, UNIFORM_PLANTING_DATE, units = "weeks"))
    }

    # --- Germination progress (only reported for age 0-6 weeks) -------------
    pct_ci_ge_2 <- sum(current_ci_vals >= 2) / length(current_ci_vals) * 100
    germination_progress_str <- NA_character_
    if (!is.na(age_weeks) && age_weeks >= 0 && age_weeks <= 6) {
      germination_progress_str <- sprintf("%.0f%% at CI >= 2", pct_ci_ge_2)
    }

    # --- Phase: harvest-imminence prediction wins over the age-based phase ---
    phase <- "Unknown"
    imminent_prob_val <- NA_real_
    if (!is.null(harvest_imminence_data) && nrow(harvest_imminence_data) > 0) {
      imminent_row <- which(harvest_imminence_data$field_id == field_id)
      if (length(imminent_row) > 0) {
        imminent_prob_val <- harvest_imminence_data$imminent_prob[imminent_row[1]]
        if (!is.na(imminent_prob_val) && imminent_prob_val > 0.5) {
          phase <- "Harvest Imminent"
        }
      }
    }
    if (phase == "Unknown") {
      phase <- get_phase_by_age(age_weeks)
    }

    status_trigger <- get_status_trigger(current_ci_vals, weekly_ci_change, age_weeks)

    # --- Weeks in current phase ---------------------------------------------
    # Accept the intended column name and the name data.frame() would have
    # produced with check.names = TRUE, so older CSVs remain readable.
    nmr_weeks_in_phase <- 1
    if (!is.null(previous_week_csv) && nrow(previous_week_csv) > 0) {
      prev_row <- which(previous_week_csv$Field_id == field_id)
      phase_col <- intersect(
        c("Phase (age based)", "Phase..age.based."),
        names(previous_week_csv)
      )
      if (length(prev_row) > 0 && length(phase_col) > 0) {
        prev_phase <- previous_week_csv[[phase_col[1]]][prev_row[1]]
        same_phase <- !is.na(prev_phase) && !is.na(phase) && prev_phase == phase
        if (same_phase && "Weeks_in_phase" %in% names(previous_week_csv)) {
          prev_weeks <- as.numeric(previous_week_csv[["Weeks_in_phase"]][prev_row[1]])
          nmr_weeks_in_phase <- if (is.na(prev_weeks)) 1 else prev_weeks + 1
        }
      }
    }

    # --- Result row ---------------------------------------------------------
    # check.names = FALSE keeps the "Phase (age based)" column name intact;
    # the default would rewrite it to "Phase..age.based.".
    data.frame(
      Field_id = field_id,
      Farm_section = farm_section,
      Hectares = field_area_ha,
      Acreage = field_area_acres,
      CI_mean = mean_ci_current,
      CI_std = ci_std,
      CI_range = range_str,
      CI_change_weekly = weekly_ci_change_str,
      CI_change_value = weekly_ci_change,
      CV = cv_current,
      Age_week = age_weeks,
      `Phase (age based)` = phase,
      Germination_progress = germination_progress_str,
      Status_trigger = status_trigger,
      Weeks_in_phase = nmr_weeks_in_phase,
      Imminent_prob = imminent_prob_val,
      Cloud_pct = cloud_pct,
      Cloud_category = cloud_cat,
      stringsAsFactors = FALSE,
      check.names = FALSE
    )
  }, error = function(e) {
    message(paste("Error analyzing field", field_idx, ":", e$message))
    # Report the real field ID when it can be read; fall back to the row index
    failed_id <- tryCatch(
      as.character(field_boundaries_sf$field[field_idx]),
      error = function(e2) NA_character_
    )
    if (length(failed_id) != 1 || is.na(failed_id)) {
      failed_id <- as.character(field_idx)
    }
    data.frame(
      Field_id = failed_id,
      error = e$message,
      stringsAsFactors = FALSE
    )
  })
}
|
|
|
|
# ============================================================================
|
|
# SUMMARY GENERATION
|
|
# ============================================================================
|
|
|
|
#' Summarise the per-field table by phase, status trigger and cloud category
#'
#' Phase and trigger rows report acreage; cloud rows report both field counts
#' and acreage. The phase column is accepted under its intended name
#' ("Phase (age based)") and under the name data.frame() produces when it
#' sanitises column names ("Phase..age.based."), so the phase breakdown does
#' not silently collapse to zero. Missing columns are treated as all-NA.
#'
#' @param field_df Per-field analysis data frame (one row per field).
#' @return Data frame with columns `Category` and `Acreage` (26 rows). The
#'   cloud field counts are also attached as attributes `cloud_fields_clear`,
#'   `cloud_fields_partial`, `cloud_fields_no_image` and `cloud_fields_total`.
generate_field_analysis_summary <- function(field_df) {
  message("Generating summary statistics...")

  n_fields <- nrow(field_df)

  # Fetch the first of `candidates` present in field_df, else an all-NA vector
  column_or_na <- function(candidates, fill) {
    present <- intersect(candidates, names(field_df))
    if (length(present) > 0) field_df[[present[1]]] else rep(fill, n_fields)
  }

  acreage <- column_or_na("Acreage", NA_real_)
  phase <- column_or_na(c("Phase (age based)", "Phase..age.based."), NA_character_)
  trigger <- column_or_na("Status_trigger", NA_character_)
  cloud <- column_or_na("Cloud_category", NA_character_)

  # %in% is NA-safe: rows with an NA label simply do not match
  acres_where <- function(labels, target) {
    round(sum(acreage[labels %in% target], na.rm = TRUE), 2)
  }
  fields_where <- function(labels, target) {
    sum(labels %in% target)
  }

  total_acreage <- sum(acreage, na.rm = TRUE)
  no_trigger_acreage <- sum(acreage[is.na(trigger)], na.rm = TRUE)

  clear_fields <- fields_where(cloud, "Clear view")
  partial_fields <- fields_where(cloud, "Partial coverage")
  no_image_fields <- fields_where(cloud, "No image available")
  total_fields <- n_fields

  # NOTE(review): fields in phase "Harvest Imminent" or with trigger
  # "maturation_progressing" are not counted in any row below - confirm
  # whether the report should list them.
  summary_df <- data.frame(
    Category = c(
      "=== PHASE DISTRIBUTION ===",
      "Germination",
      "Tillering",
      "Grand Growth",
      "Maturation",
      "Unknown Phase",
      "",
      "=== STATUS TRIGGERS ===",
      "Harvest Ready",
      "Stress Detected",
      "Strong Recovery",
      "Growth On Track",
      "Germination Complete",
      "Germination Started",
      "No Trigger",
      "",
      "=== CLOUD COVERAGE ===",
      "Clear View (fields)",
      "Partial Coverage (fields)",
      "No Image Available (fields)",
      "Clear View (acres)",
      "Partial Coverage (acres)",
      "No Image Available (acres)",
      "",
      "Total Fields",
      "Total Acreage"
    ),
    Acreage = c(
      NA,
      acres_where(phase, "Germination"),
      acres_where(phase, "Tillering"),
      acres_where(phase, "Grand Growth"),
      acres_where(phase, "Maturation"),
      acres_where(phase, "Unknown"),
      NA,
      NA,
      acres_where(trigger, "harvest_ready"),
      acres_where(trigger, "stress_detected_whole_field"),
      acres_where(trigger, "strong_recovery"),
      acres_where(trigger, "growth_on_track"),
      acres_where(trigger, "germination_complete"),
      acres_where(trigger, "germination_started"),
      round(no_trigger_acreage, 2),
      NA,
      NA,
      clear_fields,
      partial_fields,
      no_image_fields,
      acres_where(cloud, "Clear view"),
      acres_where(cloud, "Partial coverage"),
      acres_where(cloud, "No image available"),
      NA,
      total_fields,
      round(total_acreage, 2)
    ),
    stringsAsFactors = FALSE
  )

  # Field counts are also exposed as attributes
  attr(summary_df, "cloud_fields_clear") <- clear_fields
  attr(summary_df, "cloud_fields_partial") <- partial_fields
  attr(summary_df, "cloud_fields_no_image") <- no_image_fields
  attr(summary_df, "cloud_fields_total") <- total_fields

  summary_df
}
|
|
|
|
# ============================================================================
|
|
# EXPORT FUNCTIONS
|
|
# ============================================================================
|
|
|
|
#' Write the weekly field analysis to Excel, RDS and CSV
#'
#' The Excel workbook ("Field Data" + "Summary" sheets) and the CSV go to
#' `<reports_dir>/kpis/field_analysis`; the RDS (field table, summary and
#' week/export-date metadata) goes to `<reports_dir>/kpis`.
#'
#' @param field_df Per-field analysis data frame.
#' @param summary_df Summary data frame.
#' @param project_dir Project name used as the file-name prefix.
#' @param current_week Week number used in the file names.
#' @param reports_dir Reports root directory.
#' @return List with the written paths: `excel`, `rds`, `csv`.
export_field_analysis_excel <- function(field_df, summary_df, project_dir, current_week, reports_dir) {
  message("Exporting per-field analysis to Excel and RDS...")

  week_tag <- sprintf("%02d", current_week)
  kpi_dir <- file.path(reports_dir, "kpis")
  field_analysis_dir <- file.path(kpi_dir, "field_analysis")

  if (!dir.exists(field_analysis_dir)) {
    dir.create(field_analysis_dir, recursive = TRUE)
  }

  # 1) Excel workbook with two sheets
  excel_path <- normalizePath(
    file.path(
      field_analysis_dir,
      paste0(project_dir, "_field_analysis_week", week_tag, ".xlsx")
    ),
    winslash = "\\",
    mustWork = FALSE
  )
  write_xlsx(list("Field Data" = field_df, "Summary" = summary_df), excel_path)
  message(paste("✓ Field analysis Excel exported to:", excel_path))

  # 2) RDS bundle
  rds_path <- file.path(
    kpi_dir,
    paste0(project_dir, "_kpi_summary_tables_week", week_tag, ".rds")
  )
  saveRDS(
    list(
      field_analysis = field_df,
      field_analysis_summary = summary_df,
      metadata = list(
        week = current_week,
        export_date = Sys.Date()
      )
    ),
    rds_path
  )
  message(paste("✓ Field analysis RDS exported to:", rds_path))

  # 3) CSV copy, read back by later runs for field history tracking
  csv_path <- file.path(
    field_analysis_dir,
    paste0(project_dir, "_field_analysis_week", week_tag, ".csv")
  )
  write_csv(field_df, csv_path)
  message(paste("✓ Field analysis CSV exported to:", csv_path))

  return(list(excel = excel_path, rds = rds_path, csv = csv_path))
}
|
|
|
|
# ============================================================================
|
|
# MAIN
|
|
# ============================================================================
|
|
|
|
#' Script entry point: run the weekly per-field analysis and export results
#'
#' Resolves `end_date` and `project_dir` from the command line (falling back to
#' `end_date_str` / `project_dir` in the global environment, then to today /
#' "angata"), sources the project configuration, builds the tile grid, analyses
#' every field (in parallel when no plan has been configured by the caller) and
#' writes the Excel/RDS/CSV outputs.
#'
#' Relies on objects created by the sourced project files: `weekly_tile_max`,
#' `data_dir`, `reports_dir`, `harvesting_data` and `load_field_boundaries()`.
main <- function() {
  args <- commandArgs(trailingOnly = TRUE)

  end_date <- if (length(args) >= 1 && !is.na(args[1])) {
    as.Date(args[1])
  } else if (exists("end_date_str", envir = .GlobalEnv)) {
    as.Date(get("end_date_str", envir = .GlobalEnv))
  } else {
    Sys.Date()
  }

  project_dir <- if (length(args) >= 2 && !is.na(args[2])) {
    as.character(args[2])
  } else if (exists("project_dir", envir = .GlobalEnv)) {
    get("project_dir", envir = .GlobalEnv)
  } else {
    "angata"
  }

  # IMPORTANT: Assign project_dir BEFORE sourcing parameters_project.R
  # so that initialize_project() can access it via exists("project_dir")
  assign("project_dir", project_dir, envir = .GlobalEnv)

  # Load utilities and configuration (in this order - crop_messaging_utils before parameters)
  source(here("r_app", "crop_messaging_utils.R"))
  source(here("r_app", "parameters_project.R"))

  message("=== FIELD ANALYSIS WEEKLY (TILE-AWARE, PARALLEL) ===")
  message(paste("Date:", end_date))
  message(paste("Project:", project_dir))

  # Calculate week and year
  # NOTE(review): %V is the ISO week but %Y is the calendar year; around New
  # Year they can disagree (%G is the ISO year). Left as-is because the tile
  # files are presumably named the same way upstream - confirm.
  current_week <- as.numeric(format(end_date, "%V"))
  year <- as.numeric(format(end_date, "%Y"))

  message(paste("Week:", current_week, "/ Year:", year))

  # Build tile grid from available tiles
  message("Building tile grid from available weekly tiles...")
  tile_grid <- build_tile_grid(weekly_tile_max, current_week, year)
  message(paste(" Found", nrow(tile_grid), "tiles for analysis"))

  # Load field boundaries
  tryCatch({
    boundaries_result <- load_field_boundaries(data_dir)

    # load_field_boundaries returns a list with field_boundaries_sf and field_boundaries
    if (is.list(boundaries_result) && "field_boundaries_sf" %in% names(boundaries_result)) {
      field_boundaries_sf <- boundaries_result$field_boundaries_sf
    } else {
      field_boundaries_sf <- boundaries_result
    }

    if (!is.data.frame(field_boundaries_sf) && !inherits(field_boundaries_sf, "sf")) {
      stop("Field boundaries is not a valid sf object or data frame")
    }

    if (nrow(field_boundaries_sf) == 0) {
      stop("Field boundaries loaded but contains 0 rows")
    }

    message(paste(" Loaded", nrow(field_boundaries_sf), "fields from boundaries"))
  }, error = function(e) {
    stop("ERROR loading field boundaries: ", e$message,
         "\nCheck that pivot.geojson exists in ", data_dir)
  })

  # Load previous week data for phase tracking
  previous_week_result <- load_previous_week_csv(project_dir, current_week, reports_dir)
  previous_week_csv <- if (previous_week_result$found) previous_week_result$data else NULL

  # Load planting dates
  planting_dates <- extract_planting_dates(harvesting_data)

  # === PARALLEL PROCESSING SETUP ===
  message("Setting up parallel processing...")

  # The first class of plan() is "FutureStrategy", so test with inherits()
  # rather than comparing class(...)[1] to "sequential".
  active_plan <- future::plan()
  if (inherits(active_plan, "sequential")) {
    # Default to multisession; keep at least one worker even when the core
    # count is unknown (NA) or the machine has a single core.
    detected_cores <- parallel::detectCores()
    num_workers <- if (is.na(detected_cores)) 1 else max(1, detected_cores - 1)
    message(paste(" Using", num_workers, "workers for parallel processing"))
    previous_plan <- future::plan(future::multisession, workers = num_workers)
    # Restore the caller's plan (and release the workers) when main() exits
    on.exit(future::plan(previous_plan), add = TRUE)
  } else {
    plan_name <- setdiff(class(active_plan), c("FutureStrategy", "tweaked"))[1]
    message(paste(" Using existing plan:", plan_name))
  }

  # === PARALLEL FIELD ANALYSIS ===
  message("Analyzing fields in parallel...")

  field_analysis_list <- furrr::future_map(
    seq_len(nrow(field_boundaries_sf)),
    function(idx) {
      analyze_single_field(
        field_idx = idx,
        field_boundaries_sf = field_boundaries_sf,
        tile_grid = tile_grid,
        week_num = current_week,
        year = year,
        mosaic_dir = weekly_tile_max,
        previous_week_csv = previous_week_csv,
        planting_dates = planting_dates,
        report_date = end_date,
        harvest_imminence_data = NULL # Optional: add if available
      )
    },
    .progress = TRUE,
    .options = furrr::furrr_options(seed = TRUE)
  )

  # Bind list of data frames into single data frame
  field_analysis_df <- dplyr::bind_rows(field_analysis_list)

  if (nrow(field_analysis_df) == 0) {
    stop("No fields analyzed successfully!")
  }

  message(paste("✓ Analyzed", nrow(field_analysis_df), "fields"))

  # Generate summary
  summary_statistics_df <- generate_field_analysis_summary(field_analysis_df)

  # Export results
  export_paths <- export_field_analysis_excel(
    field_analysis_df,
    summary_statistics_df,
    project_dir,
    current_week,
    reports_dir
  )

  # Print summary
  cat("\n=== FIELD ANALYSIS SUMMARY ===\n")
  cat("Fields analyzed:", nrow(field_analysis_df), "\n")
  cat("Excel export:", export_paths$excel, "\n")
  cat("RDS export:", export_paths$rds, "\n")
  cat("CSV export:", export_paths$csv, "\n\n")

  cat("--- Per-field results (first 10) ---\n")
  # Select only columns that exist to avoid print errors
  available_cols <- c("Field_id", "Acreage", "Age_week", "CI_mean", "Status_trigger", "Cloud_category")
  available_cols <- available_cols[available_cols %in% names(field_analysis_df)]
  if (length(available_cols) > 0) {
    print(head(field_analysis_df[, available_cols], 10))
  } else {
    print(head(field_analysis_df, 10))
  }

  cat("\n--- Summary statistics ---\n")
  print(summary_statistics_df)

  message("\n✓ Field analysis complete!")
}

# Run only when executed as a script (not when sourced from another function)
if (sys.nframe() == 0) {
  main()
}
|