SmartCane/r_app/20_ci_extraction_utils.R

1190 lines
44 KiB
R

# CI_EXTRACTION_UTILS.R
# =====================
# Utility functions for the SmartCane CI (Chlorophill Index) extraction workflow.
# These functions support date handling, raster processing, and data extraction.
# Includes parallel tile processing using furrr for memory efficiency.
#
# Parallel Processing: Tile-based extraction uses furrr::future_map to process
# multiple tiles simultaneously (typically 2-4 tiles in parallel depending on CPU cores)
#
# Per-Field Functions (Script 20):
# - calc_ci_from_raster(): Calculate CI from 4-band raster (Chlorophyll Index formula: NIR/Green - 1)
# - extract_ci_by_subfield(): Extract per-sub_field CI statistics from raster
#' Generate a sequence of dates for processing
#'
#' @param end_date The end date for the sequence (Date object)
#' @param offset Number of days to look back from end_date
#' @return A list containing week number, year, and a sequence of dates for filtering
#'
date_list <- function(end_date, offset) {
# Input validation
if (!lubridate::is.Date(end_date)) {
end_date <- as.Date(end_date)
if (is.na(end_date)) {
stop("Invalid end_date provided. Expected a Date object or a string convertible to Date.")
}
}
offset <- as.numeric(offset)
if (is.na(offset) || offset < 1) {
stop("Invalid offset provided. Expected a positive number.")
}
# Calculate date range
offset <- offset - 1 # Adjust offset to include end_date
start_date <- end_date - lubridate::days(offset)
# Extract week and year information
week <- lubridate::week(start_date)
year <- lubridate::year(start_date)
# Generate sequence of dates
days_filter <- seq(from = start_date, to = end_date, by = "day")
days_filter <- format(days_filter, "%Y-%m-%d") # Format for consistent filtering
# Log the date range
safe_log(paste("Date range generated from", start_date, "to", end_date))
return(list(
"week" = week,
"year" = year,
"days_filter" = days_filter,
"start_date" = start_date,
"end_date" = end_date
))
}
#' Detect band count and structure (4-band vs 8-band with optional UDM)
#'
#' @param loaded_raster Loaded raster object
#' @return List with structure info: $type (4b or 8b), $has_udm (logical), $band_names
#'
detect_raster_structure <- function(loaded_raster) {
n_bands <- terra::nlyr(loaded_raster)
# Determine raster type and structure
if (n_bands == 4) {
# 4-band optimized: RGB + NIR (cloud-masked server-side by Planet)
return(list(
type = "4b",
has_udm = FALSE,
band_names = c("Red", "Green", "Blue", "NIR"),
red_idx = 1, green_idx = 2, blue_idx = 3, nir_idx = 4, udm_idx = NA
))
} else if (n_bands == 5) {
# 4-band + alpha channel (may be added by GDAL merge process)
# Alpha channel is ignored, we use first 4 bands
return(list(
type = "4b",
has_udm = FALSE,
band_names = c("Red", "Green", "Blue", "NIR", "Alpha"),
red_idx = 1, green_idx = 2, blue_idx = 3, nir_idx = 4, udm_idx = NA
))
} else if (n_bands %in% c(8, 9)) {
# PlanetScope 8-band structure:
# 1=Coastal Blue, 2=Blue, 3=Green I, 4=Green, 5=Yellow, 6=Red, 7=Red Edge, 8=NIR
# 9-band: includes UDM1 (Usable Data Mask) as final band
has_udm <- n_bands == 9
return(list(
type = "8b",
has_udm = has_udm,
band_names = if (has_udm) {
c("CoastalBlue", "Blue", "GreenI", "Green", "Yellow", "Red", "RedEdge", "NIR", "UDM1")
} else {
c("CoastalBlue", "Blue", "GreenI", "Green", "Yellow", "Red", "RedEdge", "NIR")
},
red_idx = 6, green_idx = 4, blue_idx = 2, nir_idx = 8, udm_idx = if (has_udm) 9 else NA
))
} else {
stop(paste("Unexpected number of bands:", n_bands,
"Expected 4-band, 4-band+alpha, 8-band, or 9-band data"))
}
}
#' Apply cloud masking for 8-band data with UDM layer
#'
#' @param loaded_raster Raster with UDM band
#' @param udm_idx Index of the UDM band
#' @return Raster with cloud-masked pixels set to NA
#'
apply_udm_masking <- function(loaded_raster, udm_idx) {
if (is.na(udm_idx)) {
return(loaded_raster)
}
# Extract UDM band (0 = clear sky, 1 = shadow, 2 = cloud, 3 = snow, 4 = water)
# We only mask pixels where UDM = 2 (clouds)
udm_band <- loaded_raster[[udm_idx]]
# Create mask where UDM == 0 (clear/valid pixels only)
cloud_mask <- udm_band == 0
# Apply mask to all bands except UDM itself
for (i in 1:(terra::nlyr(loaded_raster) - 1)) {
loaded_raster[[i]][!cloud_mask] <- NA
}
safe_log(paste("Applied UDM cloud masking to raster (masking non-clear pixels)"))
return(loaded_raster)
}
#' Create a Chlorophill Index (CI) mask from satellite imagery and crop to field boundaries
#'
#' Supports both 4-band and 8-band (with optional UDM) Planet Scope data.
#' For 8-band data, automatically applies cloud masking using the UDM layer if present.
#'
#' @param file Path to the satellite image file
#' @param field_boundaries Field boundaries vector object
#' @param merged_final_dir Directory to save the processed raster
#' @return Processed raster object with CI band
#'
create_mask_and_crop <- function(file, field_boundaries, merged_final_dir) {
# Validate inputs
if (!file.exists(file)) {
stop(paste("File not found:", file))
}
if (is.null(field_boundaries)) {
stop("Field boundaries are required but were not provided")
}
# Note: No conversion needed - sf::st_bbox() works with both sf and terra objects
# field_boundaries is used only for spatial reference (bounding box)
# Establish file names for output
basename_no_ext <- tools::file_path_sans_ext(basename(file))
new_file <- here::here(merged_final_dir, paste0(basename_no_ext, ".tif"))
vrt_file <- here::here(daily_vrt, paste0(basename_no_ext, ".vrt"))
# Process with error handling
tryCatch({
# Log processing start
safe_log(paste("Processing", basename(file)))
# Load and prepare raster
loaded_raster <- terra::rast(file)
# Validate raster has necessary bands
if (terra::nlyr(loaded_raster) < 4) {
stop("Raster must have at least 4 bands (Red, Green, Blue, NIR)")
}
# Detect raster structure (4b vs 8b with optional UDM)
structure_info <- detect_raster_structure(loaded_raster)
safe_log(paste("Detected", structure_info$type, "data",
if (structure_info$has_udm) "with UDM cloud masking" else "without cloud masking"))
# Extract the bands we need FIRST
# This ensures we're working with just the necessary data
red_band <- loaded_raster[[structure_info$red_idx]]
green_band <- loaded_raster[[structure_info$green_idx]]
blue_band <- loaded_raster[[structure_info$blue_idx]]
nir_band <- loaded_raster[[structure_info$nir_idx]]
# Now apply cloud masking to these selected bands if UDM exists
if (structure_info$has_udm) {
udm_band <- loaded_raster[[structure_info$udm_idx]]
# Create mask where UDM != 2 (mask only clouds, keep clear sky and shadows)
cloud_mask <- udm_band != 2
red_band[!cloud_mask] <- NA
green_band[!cloud_mask] <- NA
blue_band[!cloud_mask] <- NA
nir_band[!cloud_mask] <- NA
safe_log("Applied UDM cloud masking to selected bands (masking UDM=2 clouds only)")
}
# Name the bands
names(red_band) <- "Red"
names(green_band) <- "Green"
names(blue_band) <- "Blue"
names(nir_band) <- "NIR"
# Calculate Canopy Index from Green and NIR
# *** CRITICAL: Use CHLOROPHYLL INDEX formula ONLY ***
# CORRECT: CI = NIR / Green - 1 (ranges ~1-7, sensitive to active chlorophyll)
# WRONG: Do NOT use (NIR-Red)/(NIR+Red) - that is NDVI, ranges -1 to 1, different scale
CI <- nir_band / green_band - 1
names(CI) <- "CI"
# Create output raster with essential bands: Red, Green, Blue, NIR, CI
output_raster <- c(red_band, green_band, blue_band, nir_band, CI)
names(output_raster) <- c("Red", "Green", "Blue", "NIR", "CI")
# Ensure CRS compatibility before cropping
tryCatch({
raster_crs <- terra::crs(output_raster, proj = TRUE)
raster_crs_char <- as.character(raster_crs)
# Handle boundaries CRS - works for both terra and sf objects
if (inherits(field_boundaries, "sf")) {
boundaries_crs <- sf::st_crs(field_boundaries)
boundaries_crs_char <- if (!is.na(boundaries_crs)) as.character(boundaries_crs$wkt) else ""
} else {
boundaries_crs <- terra::crs(field_boundaries, proj = TRUE)
boundaries_crs_char <- as.character(boundaries_crs)
}
if (length(raster_crs_char) > 0 && length(boundaries_crs_char) > 0 &&
nchar(raster_crs_char) > 0 && nchar(boundaries_crs_char) > 0) {
if (raster_crs_char != boundaries_crs_char) {
# Transform field boundaries to match raster CRS only if it's a terra object
if (inherits(field_boundaries, "SpatVector")) {
field_boundaries <- terra::project(field_boundaries, raster_crs_char)
safe_log("Transformed field boundaries CRS to match raster CRS")
} else {
safe_log("Field boundaries is sf object - CRS transformation skipped")
}
}
} else {
# If CRS is missing, try to assign a default WGS84 CRS
if (length(raster_crs_char) == 0 || nchar(raster_crs_char) == 0) {
terra::crs(output_raster) <- "EPSG:4326"
safe_log("Assigned default WGS84 CRS to raster")
}
if (length(boundaries_crs_char) == 0 || nchar(boundaries_crs_char) == 0) {
if (inherits(field_boundaries, "SpatVector")) {
terra::crs(field_boundaries) <- "EPSG:4326"
} else {
sf::st_crs(field_boundaries) <- 4326
}
safe_log("Assigned default WGS84 CRS to field boundaries")
}
}
}, error = function(e) {
safe_log(paste("CRS handling warning:", e$message), "WARNING")
})
output_raster <- tryCatch({
# terra::crop can work with both terra and sf objects, but if it fails with sf, try conversion
terra::crop(output_raster, field_boundaries, mask = TRUE)
}, error = function(e) {
# If crop fails (common with certain sf geometries), convert sf to terra first
if (inherits(field_boundaries, "sf")) {
safe_log(paste("Crop with sf failed, attempting alternative approach:", e$message), "WARNING")
# Use terra mask operation instead of crop for sf objects
# First, get the bbox from sf object and use it for rough crop
bbox <- sf::st_bbox(field_boundaries)
output_raster_cropped <- terra::crop(output_raster,
terra::ext(bbox[1], bbox[3], bbox[2], bbox[4]))
return(output_raster_cropped)
} else {
stop(e)
}
})
# Note: Do NOT replace zeros with NA here - Red/Green/Blue/NIR reflectance can be near zero
# Only CI can go negative (if NIR < Green), but that's valid vegetation index behavior
# output_raster[output_raster == 0] <- NA # REMOVED - this was causing data loss
# Write output files
terra::writeRaster(output_raster, new_file, overwrite = TRUE)
# Check if the result has enough valid pixels
valid_pixels <- terra::global(output_raster$CI, "notNA", na.rm=TRUE)
# Log completion
safe_log(paste("Completed processing", basename(file),
"- Valid pixels:", valid_pixels[1,]))
return(output_raster)
}, error = function(e) {
err_msg <- paste("Error processing", basename(file), "-", e$message)
safe_log(err_msg, "ERROR")
return(NULL)
}, finally = {
# Clean up memory
gc()
})
}
#' Process a batch of satellite images and create VRT files
#'
#' @param files Vector of file paths to process
#' @param field_boundaries Field boundaries vector object for cropping
#' @param merged_final_dir Directory to save processed rasters
#' @param daily_vrt_dir Directory to save VRT files
#' @param min_valid_pixels Minimum number of valid pixels for a raster to be kept (default: 100)
#' @return List of valid VRT files created
#'
process_satellite_images <- function(files, field_boundaries, merged_final_dir, daily_vrt_dir, min_valid_pixels = 100) {
vrt_list <- list()
safe_log(paste("Starting batch processing of", length(files), "files"))
# Process each file
for (file in files) {
# Process each raster file
v_crop <- create_mask_and_crop(file, field_boundaries, merged_final_dir)
# Skip if processing failed
if (is.null(v_crop)) {
next
}
# Check if the raster has enough valid data
valid_data <- terra::global(v_crop, "notNA")
vrt_file <- here::here(daily_vrt_dir, paste0(tools::file_path_sans_ext(basename(file)), ".vrt"))
if (valid_data[1,] > min_valid_pixels) {
vrt_list[[vrt_file]] <- vrt_file
} else {
# Remove VRT files with insufficient data
if (file.exists(vrt_file)) {
file.remove(vrt_file)
}
safe_log(paste("Skipping", basename(file), "- insufficient valid data"), "WARNING")
}
# Clean up memory
rm(v_crop)
gc()
}
safe_log(paste("Completed processing", length(vrt_list), "raster files"))
return(vrt_list)
}
#' Find satellite image files filtered by date
#'
#' @param tif_folder Directory containing satellite imagery files
#' @param dates_filter Character vector of dates in YYYY-MM-DD format
#' @return Vector of file paths matching the date filter
#'
find_satellite_images <- function(tif_folder, dates_filter) {
# Find all raster files
raster_files <- list.files(tif_folder, full.names = TRUE, pattern = "\\.tif$")
if (length(raster_files) == 0) {
stop("No raster files found in directory: ", tif_folder)
}
# Filter files by dates
filtered_files <- purrr::map(dates_filter, ~ raster_files[grepl(pattern = .x, x = raster_files)]) %>%
purrr::compact() %>%
purrr::flatten_chr()
# Remove files that do not exist
existing_files <- filtered_files[file.exists(filtered_files)]
# Check if the list of existing files is empty
if (length(existing_files) == 0) {
stop("No files found matching the date filter: ", paste(dates_filter, collapse = ", "))
}
return(existing_files)
}
#' Extract date from file path
#'
#' @param file_path Path to the file
#' @return Extracted date in YYYY-MM-DD format
#'
date_extract <- function(file_path) {
date <- stringr::str_extract(file_path, "\\d{4}-\\d{2}-\\d{2}")
if (is.na(date)) {
warning(paste("Could not extract date from file path: ", file_path))
}
return(date)
}
#' Extract CI values from a raster for each field or subfield
#'
#' @param file Path to the raster file
#' @param field_geojson Field boundaries as SF object
#' @param quadrants Boolean indicating whether to extract by quadrants
#' @param save_dir Directory to save the extracted values
#' @return Path to the saved RDS file
#'
extract_rasters_daily <- function(file, field_geojson, quadrants = TRUE, save_dir) {
# Validate inputs
if (!file.exists(file)) {
stop(paste("File not found: ", file))
}
if (!inherits(field_geojson, "sf") && !inherits(field_geojson, "sfc")) {
field_geojson <- sf::st_as_sf(field_geojson)
}
# Extract date from file path
date <- date_extract(file)
if (is.na(date)) {
stop(paste("Could not extract date from file path:", file))
}
# Log extraction start
safe_log(paste("Extracting CI values for", date, "- Using quadrants:", quadrants))
# Process with error handling
tryCatch({
# Load raster
x <- terra::rast(file)
# Check if CI band exists
if (!"CI" %in% names(x)) {
stop("CI band not found in raster")
}
# Get raster info for logging (dimensions and memory scale)
raster_cells <- terra::ncell(x$CI)
raster_size_mb <- (raster_cells * 8) / (1024 * 1024) # approximate MB for double precision
safe_log(paste(" Raster size:", format(raster_cells, big.mark=","), "cells (~",
round(raster_size_mb, 1), "MB)"))
# Crop raster to field boundaries extent BEFORE extraction
# This reduces memory usage by working with a smaller spatial subset
field_bbox <- sf::st_bbox(field_geojson)
x_cropped <- terra::crop(x, terra::ext(field_bbox), snap = "out")
cropped_cells <- terra::ncell(x_cropped$CI)
cropped_mb <- (cropped_cells * 8) / (1024 * 1024)
safe_log(paste(" After crop:", format(cropped_cells, big.mark=","),
"cells (~", round(cropped_mb, 1), "MB)"))
# Extract statistics using terra::extract (memory-efficient, works with sf directly)
# terra::extract returns a data.frame with ID (row numbers) and extracted values
extracted_vals <- terra::extract(x_cropped$CI, field_geojson, fun = "mean", na.rm = TRUE)
# Build result matching expected format (field, sub_field, date columns)
pivot_stats <- field_geojson %>%
sf::st_drop_geometry() %>%
mutate(mean_CI = round(extracted_vals[, 2], 2)) %>%
dplyr::rename("{date}" := mean_CI)
# Determine save path
save_suffix <- if (quadrants) {"quadrant"} else {"whole_field"}
save_path <- here::here(save_dir, paste0("extracted_", date, "_", save_suffix, ".rds"))
# Save extracted data
saveRDS(pivot_stats, save_path)
# Log success
safe_log(paste("Successfully extracted and saved CI values for", date))
return(save_path)
}, error = function(e) {
err_msg <- paste("Error extracting CI values for", date, "-", e$message)
safe_log(err_msg, "ERROR")
return(NULL)
})
}
#' Combine daily CI values into a single dataset
#'
#' @param daily_CI_vals_dir Directory containing daily CI values
#' @param output_file Path to save the combined dataset
#' @return Combined dataset as a tibble
#'
combine_ci_values <- function(daily_CI_vals_dir, output_file = NULL) {
# List all RDS files in the daily CI values directory
files <- list.files(path = daily_CI_vals_dir, pattern = "^extracted_.*\\.rds$", full.names = TRUE)
if (length(files) == 0) {
stop("No extracted CI values found in directory:", daily_CI_vals_dir)
}
# Log process start
safe_log(paste("Combining", length(files), "CI value files"))
# Load and combine all files
combined_data <- files %>%
purrr::map(readRDS) %>%
purrr::list_rbind() %>%
dplyr::group_by(sub_field)
# Save if output file is specified
if (!is.null(output_file)) {
saveRDS(combined_data, output_file)
safe_log(paste("Combined CI values saved to", output_file))
}
return(combined_data)
}
#' Update existing CI data with new values
#'
#' @param new_data New CI data to be added
#' @param existing_data_file Path to the existing data file
#' @return Updated combined dataset
#'
update_ci_data <- function(new_data, existing_data_file) {
if (!file.exists(existing_data_file)) {
# File doesn't exist - create it with new data
safe_log(paste("Creating new CI data file:", existing_data_file))
# Ensure directory exists
dir.create(dirname(existing_data_file), recursive = TRUE, showWarnings = FALSE)
# Save new data
saveRDS(new_data, existing_data_file)
safe_log(paste("New CI data file created:", existing_data_file))
return(new_data)
}
# File exists - load existing data and combine
existing_data <- readRDS(existing_data_file)
# Combine data, handling duplicates by keeping the newer values
combined_data <- dplyr::bind_rows(new_data, existing_data) %>%
dplyr::distinct() %>%
dplyr::group_by(sub_field)
# Save updated data
saveRDS(combined_data, existing_data_file)
safe_log(paste("Updated CI data saved to", existing_data_file))
return(combined_data)
}
#' Process and combine CI values from raster files
#'
#' @param dates List of dates from date_list()
#' @param field_boundaries Field boundaries as vector object
#' @param merged_final_dir Directory with processed raster files
#' @param field_boundaries_sf Field boundaries as SF object
#' @param daily_CI_vals_dir Directory to save daily CI values
#' @param cumulative_CI_vals_dir Directory to save cumulative CI values
#' @return NULL (used for side effects)
#'
process_ci_values <- function(dates, field_boundaries, merged_final_dir,
field_boundaries_sf, daily_CI_vals_dir,
cumulative_CI_vals_dir) {
# Find processed raster files
raster_files <- list.files(merged_final_dir, full.names = TRUE, pattern = "\\.tif$")
# Define path for combined CI data
combined_ci_path <- here::here(cumulative_CI_vals_dir, "combined_CI_data.rds")
# Check if the combined CI data file exists
if (!file.exists(combined_ci_path)) {
# Process all available data if file doesn't exist
safe_log("combined_CI_data.rds does not exist. Creating new file with all available data.")
safe_log(paste("Processing", length(raster_files), "raster files"))
# Extract data from all raster files with error handling
tryCatch({
purrr::walk(
raster_files,
extract_rasters_daily,
field_geojson = field_boundaries_sf,
quadrants = FALSE,
save_dir = daily_CI_vals_dir
)
safe_log("Extraction complete for all raster files")
}, error = function(e) {
safe_log(paste("Error during extraction walk:", e$message), "ERROR")
})
# Combine all extracted data
tryCatch({
pivot_stats <- combine_ci_values(daily_CI_vals_dir, combined_ci_path)
safe_log("All CI values extracted from historic images and saved.")
}, error = function(e) {
safe_log(paste("Error combining CI values:", e$message), "ERROR")
stop(e$message)
})
} else {
# Process only the latest data and add to existing file
safe_log("combined_CI_data.rds exists, adding the latest image data.")
# Filter files by dates
filtered_files <- purrr::map(dates$days_filter, ~ raster_files[grepl(pattern = .x, x = raster_files)]) %>%
purrr::compact() %>%
purrr::flatten_chr()
safe_log(paste("Processing", length(filtered_files), "new raster files"))
# Extract data for the new files with error handling
tryCatch({
purrr::walk(
filtered_files,
extract_rasters_daily,
field_geojson = field_boundaries_sf,
quadrants = TRUE,
save_dir = daily_CI_vals_dir
)
safe_log("Extraction complete for new files")
}, error = function(e) {
safe_log(paste("Error during extraction walk:", e$message), "ERROR")
})
# Filter extracted values files by the current date range
extracted_values <- list.files(daily_CI_vals_dir, full.names = TRUE)
extracted_values <- purrr::map(dates$days_filter, ~ extracted_values[grepl(pattern = .x, x = extracted_values)]) %>%
purrr::compact() %>%
purrr::flatten_chr()
safe_log(paste("Found", length(extracted_values), "extracted value files to combine"))
# Combine new values
new_pivot_stats <- extracted_values %>%
purrr::map(readRDS) %>%
purrr::list_rbind() %>%
dplyr::group_by(sub_field)
# Update the combined data file
update_ci_data(new_data = new_pivot_stats, existing_data_file =combined_ci_path)
safe_log("CI values from latest images added to combined_CI_data.rds")
}
}
#' Process CI values from pre-split tiles (Script 01 output)
#'
#' This function processes CI values from tiles instead of full-extent rasters.
#' Tiles are created by Script 01 and stored in daily_tiles_split/[GRID_SIZE]/[DATE]/ folders.
#' For each field, it aggregates CI statistics from all tiles that intersect that field.
#' Output follows the same grid structure: merged_final_tif/[GRID_SIZE]/[DATE]/
#'
#' NOTE: Processes dates SEQUENTIALLY but tiles WITHIN EACH DATE in parallel (furrr)
#' This avoids worker process communication issues while still getting good speedup.
#'
#' @param dates List of dates from date_list()
#' @param tile_folder Path to the tile folder (daily_tiles_split/[GRID_SIZE])
#' @param field_boundaries Field boundaries as vector object
#' @param field_boundaries_sf Field boundaries as SF object
#' @param daily_CI_vals_dir Directory to save daily CI values
#' @param cumulative_CI_vals_dir Directory to save cumulative CI values
#' @param merged_final_dir Base directory to save processed tiles with CI band
#' @param grid_size Grid size label (e.g., "5x5", "10x10") for output path structure
#' @return NULL (used for side effects)
#'
process_ci_values_from_tiles <- function(dates, tile_folder, field_boundaries,
field_boundaries_sf, daily_CI_vals_dir,
cumulative_CI_vals_dir, merged_final_dir,
grid_size = NA) {
# Define path for combined CI data
combined_ci_path <- here::here(cumulative_CI_vals_dir, "combined_CI_data.rds")
# Discover all dates with tiles
tile_dates <- list.dirs(tile_folder, full.names = FALSE, recursive = FALSE)
tile_dates <- tile_dates[tile_dates != "master_grid_5x5.geojson"] # Remove non-date entries
tile_dates <- sort(tile_dates)
# Filter to dates in current processing range
dates_to_process <- tile_dates[tile_dates %in% dates$days_filter]
if (length(dates_to_process) == 0) {
safe_log("No tile dates found in processing date range", "WARNING")
return(invisible(NULL))
}
safe_log(paste("Found", length(dates_to_process), "date(s) with tiles"))
# Check if the combined CI data file exists
if (!file.exists(combined_ci_path)) {
safe_log("combined_CI_data.rds does not exist. Creating new file with all available tile data.")
# Process all tile dates SEQUENTIALLY but with parallel tile processing
# Tiles within each date are processed in parallel via extract_ci_from_tiles()
all_pivot_stats <- list()
for (i in seq_along(tile_dates)) {
date <- tile_dates[i]
# SKIP: Check if this date already has processed output tiles
if (!is.na(grid_size)) {
output_date_folder <- file.path(merged_final_dir, grid_size, date)
} else {
output_date_folder <- file.path(merged_final_dir, date)
}
if (dir.exists(output_date_folder)) {
existing_tiles <- list.files(output_date_folder, pattern = "\\.tif$")
if (length(existing_tiles) > 0) {
safe_log(paste("[", i, "/", length(tile_dates), "] SKIP:", date, "- already has", length(existing_tiles), "tiles"))
next
}
}
safe_log(paste("[", i, "/", length(tile_dates), "] Processing tiles for date:", date))
date_tile_dir <- file.path(tile_folder, date)
tile_files <- list.files(date_tile_dir, pattern = "\\.tif$", full.names = TRUE)
if (length(tile_files) == 0) {
safe_log(paste(" No tile files found for", date), "WARNING")
next
}
safe_log(paste(" Found", length(tile_files), "tiles - processing in parallel"))
# Process all tiles for this date and aggregate to fields
# Tiles are processed in parallel via furrr::future_map() inside extract_ci_from_tiles()
date_stats <- extract_ci_from_tiles(
tile_files = tile_files,
date = date,
field_boundaries_sf = field_boundaries_sf,
daily_CI_vals_dir = daily_CI_vals_dir,
grid_size = grid_size
)
if (!is.null(date_stats)) {
all_pivot_stats[[date]] <- date_stats
}
}
# Combine all dates
if (length(all_pivot_stats) > 0) {
# Use bind_rows() to handle varying column names across dates gracefully
combined_stats <- dplyr::bind_rows(all_pivot_stats, .id = NULL)
rownames(combined_stats) <- NULL
# Save combined data
saveRDS(combined_stats, combined_ci_path)
safe_log("All tile CI values extracted and combined_CI_data.rds created")
} else {
safe_log("No tile data was processed", "WARNING")
}
} else {
# Process only new dates SEQUENTIALLY but with parallel tile processing
safe_log("combined_CI_data.rds exists, adding new tile data.")
if (length(dates_to_process) == 0) {
safe_log("No new dates to process", "WARNING")
return(invisible(NULL))
}
safe_log(paste("Processing", length(dates_to_process), "new dates..."))
new_pivot_stats_list <- list()
for (i in seq_along(dates_to_process)) {
date <- dates_to_process[i]
# SKIP: Check if this date already has processed output tiles
if (!is.na(grid_size)) {
output_date_folder <- file.path(merged_final_dir, grid_size, date)
} else {
output_date_folder <- file.path(merged_final_dir, date)
}
if (dir.exists(output_date_folder)) {
existing_tiles <- list.files(output_date_folder, pattern = "\\.tif$")
if (length(existing_tiles) > 0) {
safe_log(paste("[", i, "/", length(dates_to_process), "] SKIP:", date, "- already has", length(existing_tiles), "tiles"))
next
}
}
safe_log(paste("[", i, "/", length(dates_to_process), "] Processing tiles for date:", date))
date_tile_dir <- file.path(tile_folder, date)
tile_files <- list.files(date_tile_dir, pattern = "\\.tif$", full.names = TRUE)
if (length(tile_files) == 0) {
safe_log(paste(" No tile files found for", date), "WARNING")
next
}
safe_log(paste(" Found", length(tile_files), "tiles - processing in parallel"))
# Extract CI from tiles for this date
date_stats <- extract_ci_from_tiles(
tile_files = tile_files,
date = date,
field_boundaries_sf = field_boundaries_sf,
daily_CI_vals_dir = daily_CI_vals_dir,
grid_size = grid_size
)
if (!is.null(date_stats)) {
new_pivot_stats_list[[date]] <- date_stats
}
}
# Combine new data
if (length(new_pivot_stats_list) > 0) {
# Use bind_rows() to handle varying column names across dates gracefully
new_pivot_stats <- dplyr::bind_rows(new_pivot_stats_list, .id = NULL)
rownames(new_pivot_stats) <- NULL
# Update combined file
update_ci_data(new_pivot_stats, combined_ci_path)
safe_log("Tile CI values from new dates added to combined_CI_data.rds")
} else {
safe_log("No new tile dates had valid data", "WARNING")
}
}
}
#' Process a single tile file, extract CI, save processed tile, and extract statistics
#'
#' Helper function for parallel processing of tiles. For each tile:
#' 1. Loads tile
#' 2. Creates/extracts CI band
#' 3. Creates output raster with Red, Green, Blue, NIR, CI bands
#' 4. Saves to merged_final_tif_dir/[GRID_SIZE]/[DATE]/ mirroring daily_tiles_split structure
#' 5. Extracts field-level CI statistics
#' Returns statistics aggregated to field level.
#'
#' @param tile_file Path to a single tile TIF file
#' @param field_boundaries_sf Field boundaries as SF object
#' @param date Character string of the date (YYYY-MM-DD format)
#' @param merged_final_tif_dir Base directory to save processed tiles with CI band
#' @param grid_size Grid size label (e.g., "5x5", "10x10") for output path structure
#' @return Data frame with field CI statistics for this tile, or NULL if processing failed
#'
process_single_tile <- function(tile_file, field_boundaries_sf, date, grid_size = NA) {
tryCatch({
tile_filename <- basename(tile_file)
safe_log(paste(" [TILE] Loading:", tile_filename))
# Load tile
tile_rast <- terra::rast(tile_file)
# Determine if this is 4-band or 8-band data
raster_info <- detect_raster_structure(tile_rast)
# Extract the bands we need
red_band <- tile_rast[[raster_info$red_idx]]
green_band <- tile_rast[[raster_info$green_idx]]
blue_band <- tile_rast[[raster_info$blue_idx]]
nir_band <- tile_rast[[raster_info$nir_idx]]
# Apply cloud masking if UDM exists
if (raster_info$has_udm) {
udm_band <- tile_rast[[raster_info$udm_idx]]
cloud_mask <- udm_band != 2 # Mask only UDM=2 (clouds)
red_band[!cloud_mask] <- NA
green_band[!cloud_mask] <- NA
blue_band[!cloud_mask] <- NA
nir_band[!cloud_mask] <- NA
}
# Name the bands
names(red_band) <- "Red"
names(green_band) <- "Green"
names(blue_band) <- "Blue"
names(nir_band) <- "NIR"
# Create CI band inline: NIR/Green - 1
ci_band <- nir_band / green_band - 1
names(ci_band) <- "CI"
# Create output raster with Red, Green, Blue, NIR, CI
output_raster <- c(red_band, green_band, blue_band, nir_band, ci_band)
names(output_raster) <- c("Red", "Green", "Blue", "NIR", "CI")
# NOTE: Do NOT save processed tile - it's an intermediate only
# The purpose is to calculate field-level CI statistics, not to create permanent tile files
# This prevents bloat in merged_final_tif/ directory (would unnecessarily duplicate
# daily_tiles_split data with an extra CI band added)
# Extract statistics per field from CI band
field_bbox <- sf::st_bbox(field_boundaries_sf)
ci_cropped <- terra::crop(ci_band, terra::ext(field_bbox), snap = "out")
extracted_vals <- terra::extract(ci_cropped, field_boundaries_sf, fun = "mean", na.rm = TRUE)
# Build statistics data frame for this tile
tile_stats <- field_boundaries_sf %>%
sf::st_drop_geometry() %>%
mutate(mean_CI = round(extracted_vals[, 2], 2)) %>%
mutate(tile_file = basename(tile_file))
return(tile_stats)
}, error = function(e) {
safe_log(paste(" Error processing tile", basename(tile_file), "-", e$message), "WARNING")
return(NULL)
})
}
#' Extract CI values from multiple tiles and aggregate to fields
#'
#' Given a set of tile files for a single date, this function:
#' 1. Loads each tile IN PARALLEL using furrr
#' 2. Creates/extracts CI band
#' 3. Saves processed tile (Red, Green, Blue, NIR, CI) to merged_final_tif_dir/[GRID_SIZE]/[DATE]/
#' 4. Calculates field statistics from CI band
#' 5. Aggregates field statistics across tiles
#' 6. Saves individual date file (matching legacy workflow)
#'
#' Parallel processing: Uses future_map to process 2-4 tiles simultaneously depending on available cores.
#'
#' @param tile_files Character vector of full paths to tile TIF files
#' @param date Character string of the date (YYYY-MM-DD format)
#' @param field_boundaries_sf Field boundaries as SF object
#' @param daily_CI_vals_dir Directory to save individual date RDS files
#' @param merged_final_tif_dir Base directory to save processed tiles with CI band
#' @param grid_size Grid size label (e.g., "5x5", "10x10") for output path structure
#' @return Data frame with field CI statistics for the date
#'
extract_ci_from_tiles <- function(tile_files, date, field_boundaries_sf, daily_CI_vals_dir = NULL, grid_size = NA) {
if (!inherits(field_boundaries_sf, "sf")) {
field_boundaries_sf <- sf::st_as_sf(field_boundaries_sf)
}
safe_log(paste(" Processing", length(tile_files), "tiles for date", date, "(3-tile parallel batch)"))
# Windows-compatible parallelization: Process tiles in small batches
# Use future_map with 3 workers - stable and efficient on Windows
# Set up minimal future plan (3 workers max)
future::plan(future::multisession, workers = 3)
# Process tiles using furrr with 2 workers
# Use retry logic for worker stability
stats_list <- tryCatch({
furrr::future_map(
tile_files,
~ process_single_tile(.x, field_boundaries_sf, date, grid_size = grid_size),
.progress = FALSE,
.options = furrr::furrr_options(seed = TRUE)
)
}, error = function(e) {
safe_log(paste("Parallel processing failed:", e$message, "- falling back to sequential"), "WARNING")
# Fallback to sequential if parallel fails
lapply(
tile_files,
function(tile_file) {
process_single_tile(tile_file, field_boundaries_sf, date, grid_size = grid_size)
}
)
})
# Extract names and filter out NULL results (failed tiles)
tile_names <- basename(tile_files)
all_stats <- stats_list[!sapply(stats_list, is.null)]
names(all_stats) <- tile_names[!sapply(stats_list, is.null)]
if (length(all_stats) == 0) {
return(NULL)
}
# Combine all tiles and aggregate to field level
# Use dplyr::bind_rows() to handle column name inconsistencies gracefully
combined_tiles <- dplyr::bind_rows(all_stats)
rownames(combined_tiles) <- NULL
# Aggregate: For each field, compute mean CI across all tiles
aggregated <- combined_tiles %>%
group_by(across(-c(mean_CI, tile_file))) %>%
summarise(!!date := round(mean(mean_CI, na.rm = TRUE), 2), .groups = "drop")
# Save individual date file (matching legacy workflow format: extracted_YYYY-MM-DD_quadrant.rds)
if (!is.null(daily_CI_vals_dir)) {
save_path <- file.path(daily_CI_vals_dir, paste0("extracted_", date, "_quadrant.rds"))
saveRDS(aggregated, save_path)
safe_log(paste("[RDS SAVED] Date:", date, "-> File: extracted_", date, "_quadrant.rds"))
}
return(aggregated)
}
# =============================================================================
# Script 20 (Per-Field) Specific Functions
# =============================================================================
#' Calculate Canopy Index (CI) from 4-band raster
#'
#' *** CRITICAL FORMULA: CI = NIR / Green - 1 ***
#' This is the CHLOROPHYLL INDEX formula (ranges ~1-7 for vegetation).
#' NOT NDVI! Do NOT use (NIR-Red)/(NIR+Red) - that produces -1 to 1 range.
#'
#' Expects band order: Red (band 1), Green (band 2), Blue (band 3), NIR (band 4)
#'
#' @param raster_obj Loaded raster object with at least 4 bands
#' @return Raster object containing CI values (Chlorophyll Index, ranges ~1-7)
#'
calc_ci_from_raster <- function(raster_obj) {
# Expected band order: Red (band 1), Green (band 2), Blue (band 3), NIR (band 4)
if (terra::nlyr(raster_obj) < 4) {
stop("Raster has fewer than 4 bands. Cannot calculate CI.")
}
green <- terra::subset(raster_obj, 2) # Green band (required for proper CI calculation)
nir <- terra::subset(raster_obj, 4) # NIR
# *** CHLOROPHYLL INDEX = NIR / Green - 1 ***
# This formula is sensitive to active chlorophyll content and ranges ~1-7
# DO NOT use (NIR-Red)/(NIR+Red) - that is NDVI (Normalized Difference Vegetation Index)
# NDVI ranges -1 to 1 and is different from Chlorophyll Index
ci <- nir / green - 1
return(ci)
}
#' Extract CI statistics by sub_field from a CI raster
#'
#' For a given field, masks the CI raster to each sub_field polygon
#' and calculates statistics (mean, median, sd, min, max, count).
#'
#' @param ci_raster Raster object containing CI values
#' @param field_boundaries_sf SF object containing field/sub_field polygons
#' @param field_name Character string of the field name to process
#' @return Data frame with field, sub_field, and CI statistics; NULL if field not found
#'
extract_ci_by_subfield <- function(ci_raster, field_boundaries_sf, field_name) {
# NOTE: Per-field TIFFs are already cropped to field boundaries by Script 10
# No need to mask again - just extract all valid pixels from the raster
# Extract ALL CI values (no masking needed for pre-cropped per-field TIFFs)
ci_values <- terra::values(ci_raster, na.rm = TRUE)
if (length(ci_values) > 0) {
result_row <- data.frame(
field = field_name,
sub_field = field_name, # Use field_name as sub_field since TIFF is already field-specific
ci_mean = mean(ci_values, na.rm = TRUE),
ci_median = median(ci_values, na.rm = TRUE),
ci_sd = sd(ci_values, na.rm = TRUE),
ci_min = min(ci_values, na.rm = TRUE),
ci_max = max(ci_values, na.rm = TRUE),
ci_count = length(ci_values),
stringsAsFactors = FALSE
)
} else {
result_row <- data.frame(
field = field_name,
sub_field = field_name,
ci_mean = NA_real_,
ci_median = NA_real_,
ci_sd = NA_real_,
ci_min = NA_real_,
ci_max = NA_real_,
ci_count = 0,
stringsAsFactors = FALSE
)
}
return(result_row)
}
#' Extract RDS from existing CI TIFF (Migration/Regeneration Mode)
#'
#' This function extracts CI statistics from an already-calculated CI TIFF
#' without needing to recalculate from raw 4-band imagery.
#' Used during migration when field_tiles_CI/ exists but daily_vals/{FIELD}/ is missing.
#'
#' @param ci_tiff_path Path to the 5-band TIFF containing CI as the 5th band
#' @param output_rds_path Path where to save the output RDS file
#' @param field_boundaries_sf SF object with field/sub_field polygons
#' @param field_name Name of the field to extract
#' @return The RDS data frame (invisibly) and saves to disk
#'
extract_rds_from_ci_tiff <- function(ci_tiff_path, output_rds_path, field_boundaries_sf, field_name) {
tryCatch({
# Load the 5-band TIFF
raster_5band <- terra::rast(ci_tiff_path)
# Extract CI (5th band)
# Assuming structure: [1]=R, [2]=G, [3]=B, [4]=NIR, [5]=CI
ci_raster <- raster_5band[[5]]
# Extract CI statistics by sub_field
ci_stats <- extract_ci_by_subfield(ci_raster, field_boundaries_sf, field_name)
# Save RDS
if (!is.null(ci_stats) && nrow(ci_stats) > 0) {
saveRDS(ci_stats, output_rds_path)
return(invisible(ci_stats))
} else {
safe_log(sprintf("No CI statistics extracted from %s", ci_tiff_path), "WARNING")
return(invisible(NULL))
}
}, error = function(e) {
safe_log(sprintf("Error extracting RDS from CI TIFF: %s", e$message), "ERROR")
return(invisible(NULL))
})
}
#' Regenerate ALL missing RDS files from existing CI TIFFs (Comprehensive Migration Mode)
#'
#' This function processes ALL dates in field_tiles_CI/ and extracts RDS for any missing daily_vals/
#' files. No date window filtering - processes the entire historical archive.
#'
#' Used for one-time migration of old projects where field_tiles_CI/ is populated but daily_vals/
#' RDS files are missing or incomplete.
#'
#' @param field_tiles_ci_dir Path to field_tiles_CI/ (input: pre-calculated CI TIFFs)
#' @param daily_vals_dir Path to daily_vals/ (output: RDS statistics files)
#' @param field_boundaries_sf SF object with field/sub_field polygons
#' @param fields Vector of field names to process
#'
#' @return List with summary stats: list(total_processed=N, total_skipped=M, total_errors=K)
#'
regenerate_all_missing_rds <- function(field_tiles_ci_dir, daily_vals_dir, field_boundaries_sf, fields) {
safe_log("\n========================================")
safe_log("MIGRATION MODE: REGENERATING ALL MISSING RDS")
safe_log("Processing ALL dates in field_tiles_CI/")
safe_log("========================================")
total_processed <- 0
total_skipped <- 0
total_errors <- 0
# Iterate through each field
for (field in fields) {
field_ci_path <- file.path(field_tiles_ci_dir, field)
field_daily_vals_path <- file.path(daily_vals_dir, field)
# Skip if field directory doesn't exist
if (!dir.exists(field_ci_path)) {
safe_log(sprintf(" Field %s: field_tiles_CI not found (skipping)", field))
continue
}
# Create output directory for RDS
dir.create(field_daily_vals_path, showWarnings = FALSE, recursive = TRUE)
# Find ALL CI TIFFs for this field (no date filtering)
ci_tiff_files <- list.files(
path = field_ci_path,
pattern = "^\\d{4}-\\d{2}-\\d{2}\\.tif$",
full.names = TRUE
)
if (length(ci_tiff_files) == 0) {
safe_log(sprintf(" Field %s: No CI TIFFs found (skipping)", field))
next
}
safe_log(sprintf(" Field %s: Found %d CI TIFFs to process", field, length(ci_tiff_files)))
# Process each CI TIFF
for (ci_tiff in ci_tiff_files) {
date_str <- tools::file_path_sans_ext(basename(ci_tiff))
output_rds <- file.path(field_daily_vals_path, sprintf("%s.rds", date_str))
# Skip if RDS already exists
if (file.exists(output_rds)) {
total_skipped <- total_skipped + 1
next
}
# Extract RDS from CI TIFF
tryCatch({
extract_rds_from_ci_tiff(ci_tiff, output_rds, field_boundaries_sf, field)
safe_log(sprintf(" %s: ✓ RDS extracted", date_str))
total_processed <- total_processed + 1
}, error = function(e) {
safe_log(sprintf(" %s: ✗ Error - %s", date_str, e$message), "ERROR")
total_errors <<- total_errors + 1
})
}
}
safe_log(sprintf("\nMigration complete: processed %d, skipped %d, errors %d",
total_processed, total_skipped, total_errors))
return(list(
total_processed = total_processed,
total_skipped = total_skipped,
total_errors = total_errors
))
}