- Fixed CI calculation: changed from NDVI (NIR-Red)/(NIR+Red) to correct NIR/Green-1 formula in: * process_single_tile() function * create_ci_band() utility function * Updated create_mask_and_crop() documentation - Renamed numbered shell scripts for clarity (matching R script numbering): * 01_run_planet_download -> 10_planet_download.sh * 02_run_ci_extraction -> 20_ci_extraction.sh * 03_run_growth_model -> 30_growth_model.sh * 04_run_mosaic_creation -> 40_mosaic_creation.sh * 09_run_calculate_kpis -> 80_calculate_kpis.sh * 10_run_kpi_report -> 90_kpi_report.sh - Archived obsolete shell scripts to old_sh/: * build_mosaic.sh, build_report.sh, interpolate_growth_model.sh * 05_run_dashboard_report.sh, 06_run_crop_messaging.sh * 11_run_yield_prediction.sh/ps1 * runcane.sh, runpython.sh, smartcane.sh, update_RDS.sh - Deleted test/debug files and temporary outputs: * analyze_*.R, benchmark_gpu_vs_cpu.py, convert_angata_harvest.py * debug_mosaic.R, examine_kpi_results.R, generate_sar_report.R * inspect_8band_structure.R, inspect_tif_bands.R * old_working_utils.R, predict_harvest_operational.R * run_kpi_calculation.R, run_report.R, simple_sar_test.R * data_validation_tool/, harvest_ci_pattern_analysis.png, kpi_debug.out - Enhanced harvest prediction: Added threshold tuning (0.40-0.45) and field type handling - Enhanced mosaic creation: Improved tile detection and routing logic
1016 lines
38 KiB
R
1016 lines
38 KiB
R
# CI_EXTRACTION_UTILS.R
# =====================
# Utility functions for the SmartCane CI (Chlorophyll Index) extraction workflow.
# These functions support date handling, raster processing, and data extraction.
# Includes parallel tile processing using furrr for memory efficiency.
#
# Parallel Processing: Tile-based extraction uses furrr::future_map to process
# multiple tiles simultaneously (typically 2-4 tiles in parallel depending on CPU cores)
|
|
|
#' Log a message, degrading gracefully when no logger is available
#'
#' Delegates to `log_message()` when that function is defined in the session;
#' otherwise falls back to base R conditions: ERROR/WARNING levels become
#' `warning()`, everything else becomes `message()`.
#'
#' @param message The message to log
#' @param level The log level (default: "INFO")
#' @return NULL (used for side effects)
#'
safe_log <- function(message, level = "INFO") {
  if (!exists("log_message")) {
    # No project logger in scope: map severity onto base conditions
    is_severe <- level %in% c("ERROR", "WARNING")
    if (is_severe) {
      warning(message)
    } else {
      message(message)
    }
  } else {
    log_message(message, level)
  }
}
|
|
|
|
#' Generate a sequence of dates for processing
#'
#' Builds an inclusive window of `offset` days ending at `end_date` and
#' returns the window endpoints, the ISO week/year of the window start, and
#' the formatted day strings used downstream for filename filtering.
#'
#' @param end_date The end date for the sequence (Date object)
#' @param offset Number of days to look back from end_date
#' @return A list containing week number, year, and a sequence of dates for filtering
#'
date_list <- function(end_date, offset) {
  # Coerce end_date to Date when a string was supplied; fail loudly otherwise
  if (!lubridate::is.Date(end_date)) {
    end_date <- as.Date(end_date)
    if (is.na(end_date)) {
      stop("Invalid end_date provided. Expected a Date object or a string convertible to Date.")
    }
  }

  offset <- as.numeric(offset)
  if (is.na(offset) || offset < 1) {
    stop("Invalid offset provided. Expected a positive number.")
  }

  # The window includes end_date itself, so look back offset - 1 days
  lookback_days <- offset - 1
  start_date <- end_date - lubridate::days(lookback_days)

  # One entry per calendar day, formatted for consistent filtering
  days_filter <- format(seq(from = start_date, to = end_date, by = "day"), "%Y-%m-%d")

  # Log the date range
  safe_log(paste("Date range generated from", start_date, "to", end_date))

  list(
    "week" = lubridate::week(start_date),
    "year" = lubridate::year(start_date),
    "days_filter" = days_filter,
    "start_date" = start_date,
    "end_date" = end_date
  )
}
|
|
|
|
#' Detect band count and structure (4-band vs 8-band with optional UDM)
#'
#' Classifies a loaded raster by its layer count:
#' 4  = RGB + NIR; 5 = RGB + NIR + alpha (alpha ignored);
#' 8  = PlanetScope SuperDove bands; 9 = same plus a trailing UDM mask band.
#' Any other count is an error.
#'
#' @param loaded_raster Loaded raster object
#' @return List with structure info: $type (4b or 8b), $has_udm (logical), $band_names
#'
detect_raster_structure <- function(loaded_raster) {
  n_bands <- terra::nlyr(loaded_raster)

  if (n_bands == 4 || n_bands == 5) {
    # 4-band optimized: RGB + NIR (cloud-masked server-side by Planet).
    # A 5th alpha band (sometimes added by GDAL merge) is carried in the
    # names but otherwise ignored - we always read the first 4 bands.
    band_labels <- c("Red", "Green", "Blue", "NIR")
    if (n_bands == 5) {
      band_labels <- c(band_labels, "Alpha")
    }
    return(list(
      type = "4b",
      has_udm = FALSE,
      band_names = band_labels,
      red_idx = 1, green_idx = 2, blue_idx = 3, nir_idx = 4, udm_idx = NA
    ))
  }

  if (n_bands == 8 || n_bands == 9) {
    # PlanetScope 8-band layout:
    # 1=Coastal Blue, 2=Blue, 3=Green I, 4=Green, 5=Yellow, 6=Red, 7=Red Edge, 8=NIR
    # The 9-band variant appends UDM1 (Usable Data Mask) as the final band.
    with_udm <- n_bands == 9
    band_labels <- c("CoastalBlue", "Blue", "GreenI", "Green",
                     "Yellow", "Red", "RedEdge", "NIR")
    if (with_udm) {
      band_labels <- c(band_labels, "UDM1")
    }
    return(list(
      type = "8b",
      has_udm = with_udm,
      band_names = band_labels,
      red_idx = 6, green_idx = 4, blue_idx = 2, nir_idx = 8,
      udm_idx = if (with_udm) 9 else NA
    ))
  }

  stop(paste("Unexpected number of bands:", n_bands,
             "Expected 4-band, 4-band+alpha, 8-band, or 9-band data"))
}
|
|
|
|
#' Apply UDM-based masking for 8-band data with a UDM layer
#'
#' Keeps ONLY clear-sky pixels (UDM == 0) in every spectral band; pixels
#' flagged as shadow, cloud, snow or water are all set to NA. Note this is
#' stricter than the cloud-only (UDM == 2) masking used inside
#' create_mask_and_crop(); the previous comment here incorrectly claimed
#' only clouds were masked.
#'
#' @param loaded_raster Raster with UDM band (assumed to be the last layer)
#' @param udm_idx Index of the UDM band (NA = no masking performed)
#' @return Raster with all non-clear pixels set to NA
#'
apply_udm_masking <- function(loaded_raster, udm_idx) {
  # No UDM band available: return the raster unchanged
  if (is.na(udm_idx)) {
    return(loaded_raster)
  }

  # UDM codes: 0 = clear sky, 1 = shadow, 2 = cloud, 3 = snow, 4 = water
  udm_band <- loaded_raster[[udm_idx]]

  # Mask of pixels to KEEP: clear-sky only (UDM == 0)
  clear_mask <- udm_band == 0

  # Apply mask to all bands except the UDM band itself (the last layer).
  # seq_len() is safe if the raster ever has a single layer (empty loop),
  # unlike the previous 1:(n - 1) form which would yield c(1, 0).
  for (i in seq_len(terra::nlyr(loaded_raster) - 1)) {
    loaded_raster[[i]][!clear_mask] <- NA
  }

  safe_log(paste("Applied UDM cloud masking to raster (masking non-clear pixels)"))
  return(loaded_raster)
}
|
|
|
|
#' Create a Chlorophyll Index (CI) band from satellite imagery and crop to field boundaries
#'
#' Supports both 4-band and 8-band (with optional UDM) PlanetScope data.
#' For 9-band data the UDM layer is used to mask cloud pixels (UDM == 2 only;
#' clear sky and shadows are kept). The CI band is computed as
#' CI = NIR / Green - 1 and appended to the Red/Green/Blue/NIR bands before
#' cropping to the field boundaries and writing a GeoTIFF plus companion VRT.
#'
#' @param file Path to the satellite image file
#' @param field_boundaries Field boundaries vector object (terra SpatVector or sf)
#' @param merged_final_dir Directory to save the processed raster
#' @param daily_vrt_dir Directory to save the companion VRT file. Defaults to
#'   the global `daily_vrt`, which this function previously read implicitly;
#'   the explicit parameter keeps existing callers working while making the
#'   dependency visible.
#' @return Processed raster object with CI band, or NULL on failure
#'
create_mask_and_crop <- function(file, field_boundaries, merged_final_dir,
                                 daily_vrt_dir = daily_vrt) {
  # Validate inputs
  if (!file.exists(file)) {
    stop(paste("File not found:", file))
  }

  if (is.null(field_boundaries)) {
    stop("Field boundaries are required but were not provided")
  }

  # Note: No conversion needed - sf::st_bbox() works with both sf and terra objects
  # field_boundaries is used only for spatial reference (bounding box)

  # Establish file names for output
  basename_no_ext <- tools::file_path_sans_ext(basename(file))
  new_file <- here::here(merged_final_dir, paste0(basename_no_ext, ".tif"))
  vrt_file <- here::here(daily_vrt_dir, paste0(basename_no_ext, ".vrt"))

  # Process with error handling; any failure logs and returns NULL
  tryCatch({
    # Log processing start
    safe_log(paste("Processing", basename(file)))

    # Load and prepare raster
    loaded_raster <- terra::rast(file)

    # Validate raster has necessary bands
    if (terra::nlyr(loaded_raster) < 4) {
      stop("Raster must have at least 4 bands (Red, Green, Blue, NIR)")
    }

    # Detect raster structure (4b vs 8b with optional UDM)
    structure_info <- detect_raster_structure(loaded_raster)
    safe_log(paste("Detected", structure_info$type, "data",
                   if (structure_info$has_udm) "with UDM cloud masking" else "without cloud masking"))

    # Extract the bands we need FIRST so masking and CI math touch only
    # the data we will actually keep
    red_band <- loaded_raster[[structure_info$red_idx]]
    green_band <- loaded_raster[[structure_info$green_idx]]
    blue_band <- loaded_raster[[structure_info$blue_idx]]
    nir_band <- loaded_raster[[structure_info$nir_idx]]

    # Now apply cloud masking to these selected bands if UDM exists
    if (structure_info$has_udm) {
      udm_band <- loaded_raster[[structure_info$udm_idx]]
      # Create mask where UDM != 2 (mask only clouds, keep clear sky and shadows)
      cloud_mask <- udm_band != 2

      red_band[!cloud_mask] <- NA
      green_band[!cloud_mask] <- NA
      blue_band[!cloud_mask] <- NA
      nir_band[!cloud_mask] <- NA

      safe_log("Applied UDM cloud masking to selected bands (masking UDM=2 clouds only)")
    }

    # Name the bands
    names(red_band) <- "Red"
    names(green_band) <- "Green"
    names(blue_band) <- "Blue"
    names(nir_band) <- "NIR"

    # Calculate Chlorophyll Index: CI = NIR / Green - 1.
    # More sensitive to green vegetation than the NDVI-style
    # (NIR - Red) / (NIR + Red) formulation.
    CI <- nir_band / green_band - 1
    names(CI) <- "CI"

    # Create output raster with essential bands: Red, Green, Blue, NIR, CI
    output_raster <- c(red_band, green_band, blue_band, nir_band, CI)
    names(output_raster) <- c("Red", "Green", "Blue", "NIR", "CI")

    # Ensure CRS compatibility before cropping; failures here are logged
    # as warnings and the crop is attempted anyway
    tryCatch({
      raster_crs <- terra::crs(output_raster, proj = TRUE)
      raster_crs_char <- as.character(raster_crs)

      # Handle boundaries CRS - works for both terra and sf objects
      if (inherits(field_boundaries, "sf")) {
        boundaries_crs <- sf::st_crs(field_boundaries)
        boundaries_crs_char <- if (!is.na(boundaries_crs)) as.character(boundaries_crs$wkt) else ""
      } else {
        boundaries_crs <- terra::crs(field_boundaries, proj = TRUE)
        boundaries_crs_char <- as.character(boundaries_crs)
      }

      if (length(raster_crs_char) > 0 && length(boundaries_crs_char) > 0 &&
          nchar(raster_crs_char) > 0 && nchar(boundaries_crs_char) > 0) {
        if (raster_crs_char != boundaries_crs_char) {
          # Transform field boundaries to match raster CRS only if it's a terra object
          if (inherits(field_boundaries, "SpatVector")) {
            field_boundaries <- terra::project(field_boundaries, raster_crs_char)
            safe_log("Transformed field boundaries CRS to match raster CRS")
          } else {
            safe_log("Field boundaries is sf object - CRS transformation skipped")
          }
        }
      } else {
        # If CRS is missing, try to assign a default WGS84 CRS
        if (length(raster_crs_char) == 0 || nchar(raster_crs_char) == 0) {
          terra::crs(output_raster) <- "EPSG:4326"
          safe_log("Assigned default WGS84 CRS to raster")
        }
        if (length(boundaries_crs_char) == 0 || nchar(boundaries_crs_char) == 0) {
          if (inherits(field_boundaries, "SpatVector")) {
            terra::crs(field_boundaries) <- "EPSG:4326"
          } else {
            sf::st_crs(field_boundaries) <- 4326
          }
          safe_log("Assigned default WGS84 CRS to field boundaries")
        }
      }
    }, error = function(e) {
      safe_log(paste("CRS handling warning:", e$message), "WARNING")
    })

    output_raster <- tryCatch({
      # terra::crop can work with both terra and sf objects, but if it fails with sf, try conversion
      terra::crop(output_raster, field_boundaries, mask = TRUE)
    }, error = function(e) {
      # If crop fails (common with certain sf geometries), fall back to a
      # rough bbox crop (no mask) for sf boundaries
      if (inherits(field_boundaries, "sf")) {
        safe_log(paste("Crop with sf failed, attempting alternative approach:", e$message), "WARNING")
        bbox <- sf::st_bbox(field_boundaries)
        output_raster_cropped <- terra::crop(output_raster,
                                             terra::ext(bbox[1], bbox[3], bbox[2], bbox[4]))
        return(output_raster_cropped)
      } else {
        stop(e)
      }
    })

    # Note: Do NOT replace zeros with NA here - Red/Green/Blue/NIR reflectance can be near zero
    # Only CI can go negative (if NIR < Green), but that's valid vegetation index behavior

    # Write output files
    terra::writeRaster(output_raster, new_file, overwrite = TRUE)
    terra::vrt(new_file, vrt_file, overwrite = TRUE)

    # Check if the result has enough valid pixels
    valid_pixels <- terra::global(output_raster$CI, "notNA", na.rm=TRUE)

    # Log completion
    safe_log(paste("Completed processing", basename(file),
                   "- Valid pixels:", valid_pixels[1,]))

    return(output_raster)

  }, error = function(e) {
    err_msg <- paste("Error processing", basename(file), "-", e$message)
    safe_log(err_msg, "ERROR")
    return(NULL)
  }, finally = {
    # Clean up memory
    gc()
  })
}
|
|
|
|
#' Process a batch of satellite images and create VRT files
#'
#' Runs create_mask_and_crop() on each input file, keeps the VRT for files
#' with enough valid pixels, and deletes the VRT for those without.
#'
#' @param files Vector of file paths to process
#' @param field_boundaries Field boundaries vector object for cropping
#' @param merged_final_dir Directory to save processed rasters
#' @param daily_vrt_dir Directory to save VRT files
#' @param min_valid_pixels Minimum number of valid pixels for a raster to be kept (default: 100)
#' @return List of valid VRT files created
#'
process_satellite_images <- function(files, field_boundaries, merged_final_dir, daily_vrt_dir, min_valid_pixels = 100) {
  vrt_list <- list()

  safe_log(paste("Starting batch processing of", length(files), "files"))

  for (file in files) {
    cropped <- create_mask_and_crop(file, field_boundaries, merged_final_dir)

    # Failure is already logged inside create_mask_and_crop
    if (is.null(cropped)) {
      next
    }

    # NOTE(review): global() returns one row per layer; [1, ] inspects only
    # the first layer's notNA count as a coverage proxy - confirm intended.
    valid_data <- terra::global(cropped, "notNA")
    stem <- tools::file_path_sans_ext(basename(file))
    vrt_file <- here::here(daily_vrt_dir, paste0(stem, ".vrt"))

    if (valid_data[1,] > min_valid_pixels) {
      vrt_list[[vrt_file]] <- vrt_file
    } else {
      # Drop VRTs backed by too little valid data
      if (file.exists(vrt_file)) {
        file.remove(vrt_file)
      }
      safe_log(paste("Skipping", basename(file), "- insufficient valid data"), "WARNING")
    }

    # Release raster memory between files
    rm(cropped)
    gc()
  }

  safe_log(paste("Completed processing", length(vrt_list), "raster files"))

  return(vrt_list)
}
|
|
|
|
#' Find satellite image files filtered by date
#'
#' Lists all .tif files in a folder and keeps those whose path contains one
#' of the requested date strings, in the order the dates are given.
#'
#' @param tif_folder Directory containing satellite imagery files
#' @param dates_filter Character vector of dates in YYYY-MM-DD format
#' @return Vector of file paths matching the date filter
#'
find_satellite_images <- function(tif_folder, dates_filter) {
  # Find all raster files
  raster_files <- list.files(tif_folder, full.names = TRUE, pattern = "\\.tif$")

  if (length(raster_files) == 0) {
    stop("No raster files found in directory: ", tif_folder)
  }

  # For each requested date, collect the files whose path mentions it;
  # flatten to a plain character vector (empty matches contribute nothing)
  per_date <- lapply(dates_filter, function(d) raster_files[grepl(pattern = d, x = raster_files)])
  filtered_files <- as.character(unlist(per_date, use.names = FALSE))

  # Remove files that do not exist
  existing_files <- filtered_files[file.exists(filtered_files)]

  # Check if the list of existing files is empty
  if (length(existing_files) == 0) {
    stop("No files found matching the date filter: ", paste(dates_filter, collapse = ", "))
  }

  return(existing_files)
}
|
|
|
|
#' Extract date from file path
#'
#' Finds the first YYYY-MM-DD substring in the path; warns and returns NA
#' when no such substring exists.
#'
#' @param file_path Path to the file (a single path string)
#' @return Extracted date in YYYY-MM-DD format, or NA if none found
#'
date_extract <- function(file_path) {
  # Base-R equivalent of stringr::str_extract for a single string
  hit <- regexpr("\\d{4}-\\d{2}-\\d{2}", file_path)
  date <- if (hit[1] == -1) NA_character_ else regmatches(file_path, hit)

  if (is.na(date)) {
    warning(paste("Could not extract date from file path: ", file_path))
  }

  return(date)
}
|
|
|
|
#' Extract CI values from a raster for each field or subfield
#'
#' Loads a processed raster (must contain a "CI" band), crops it to the
#' bounding box of the supplied field boundaries, computes the mean CI per
#' feature via terra::extract(), and saves the result as an RDS file whose
#' value column is named after the date parsed from the file path.
#'
#' @param file Path to the raster file
#' @param field_geojson Field boundaries as SF object (coerced with st_as_sf if not)
#' @param quadrants Boolean indicating whether to extract by quadrants
#'   (here it only selects the output-file suffix; the geometry itself is
#'   whatever is passed in field_geojson)
#' @param save_dir Directory to save the extracted values
#' @return Path to the saved RDS file, or NULL if extraction failed
#'
extract_rasters_daily <- function(file, field_geojson, quadrants = TRUE, save_dir) {
  # Validate inputs
  if (!file.exists(file)) {
    stop(paste("File not found: ", file))
  }

  # Coerce non-sf inputs (e.g. a terra SpatVector) to sf
  if (!inherits(field_geojson, "sf") && !inherits(field_geojson, "sfc")) {
    field_geojson <- sf::st_as_sf(field_geojson)
  }

  # Extract date from file path (expects a YYYY-MM-DD token in the path)
  date <- date_extract(file)
  if (is.na(date)) {
    stop(paste("Could not extract date from file path:", file))
  }

  # Log extraction start
  safe_log(paste("Extracting CI values for", date, "- Using quadrants:", quadrants))

  # Process with error handling; failures log and return NULL
  tryCatch({
    # Load raster
    x <- terra::rast(file)

    # Check if CI band exists
    if (!"CI" %in% names(x)) {
      stop("CI band not found in raster")
    }

    # Get raster info for logging (dimensions and memory scale)
    raster_cells <- terra::ncell(x$CI)
    raster_size_mb <- (raster_cells * 8) / (1024 * 1024) # approximate MB for double precision
    safe_log(paste(" Raster size:", format(raster_cells, big.mark=","), "cells (~",
                   round(raster_size_mb, 1), "MB)"))

    # Crop raster to field boundaries extent BEFORE extraction
    # This reduces memory usage by working with a smaller spatial subset
    field_bbox <- sf::st_bbox(field_geojson)
    x_cropped <- terra::crop(x, terra::ext(field_bbox), snap = "out")

    cropped_cells <- terra::ncell(x_cropped$CI)
    cropped_mb <- (cropped_cells * 8) / (1024 * 1024)
    safe_log(paste(" After crop:", format(cropped_cells, big.mark=","),
                   "cells (~", round(cropped_mb, 1), "MB)"))

    # Extract statistics using terra::extract (memory-efficient, works with sf directly)
    # terra::extract returns a data.frame with ID (row numbers) and extracted values;
    # column 2 holds the per-feature mean CI
    extracted_vals <- terra::extract(x_cropped$CI, field_geojson, fun = "mean", na.rm = TRUE)

    # Build result matching expected format: the boundary attributes plus a
    # mean-CI column renamed to the date via tidy-eval ("{date}" := ...).
    # NOTE(review): `mutate` is unqualified here - assumes dplyr is attached.
    pivot_stats <- field_geojson %>%
      sf::st_drop_geometry() %>%
      mutate(mean_CI = round(extracted_vals[, 2], 2)) %>%
      dplyr::rename("{date}" := mean_CI)

    # Determine save path; suffix records the extraction granularity
    save_suffix <- if (quadrants) {"quadrant"} else {"whole_field"}
    save_path <- here::here(save_dir, paste0("extracted_", date, "_", save_suffix, ".rds"))

    # Save extracted data
    saveRDS(pivot_stats, save_path)

    # Log success
    safe_log(paste("Successfully extracted and saved CI values for", date))

    return(save_path)

  }, error = function(e) {
    err_msg <- paste("Error extracting CI values for", date, "-", e$message)
    safe_log(err_msg, "ERROR")
    return(NULL)
  })
}
|
|
|
|
#' Combine daily CI values into a single dataset
#'
#' Reads every extracted_*.rds file from the daily directory, row-binds
#' them, groups by sub_field, and optionally persists the result.
#'
#' @param daily_CI_vals_dir Directory containing daily CI values
#' @param output_file Path to save the combined dataset (NULL = don't save)
#' @return Combined dataset as a tibble, grouped by sub_field
#'
combine_ci_values <- function(daily_CI_vals_dir, output_file = NULL) {
  # Collect every per-day extraction file
  files <- list.files(path = daily_CI_vals_dir, pattern = "^extracted_.*\\.rds$", full.names = TRUE)

  if (length(files) == 0) {
    stop("No extracted CI values found in directory:", daily_CI_vals_dir)
  }

  safe_log(paste("Combining", length(files), "CI value files"))

  # Read all daily files, stack their rows, then group for downstream use
  daily_tables <- purrr::map(files, readRDS)
  stacked <- purrr::list_rbind(daily_tables)
  combined_data <- dplyr::group_by(stacked, sub_field)

  # Persist only when a destination was requested
  if (!is.null(output_file)) {
    saveRDS(combined_data, output_file)
    safe_log(paste("Combined CI values saved to", output_file))
  }

  return(combined_data)
}
|
|
|
|
#' Update existing CI data with new values
#'
#' If the target RDS file does not exist, it is created from new_data.
#' Otherwise the existing data is loaded, row-bound after new_data, exact
#' duplicate rows are dropped, and the result is grouped by sub_field and
#' written back.
#'
#' @param new_data New CI data to be added
#' @param existing_data_file Path to the existing data file
#' @return Updated combined dataset (grouped by sub_field)
#'
update_ci_data <- function(new_data, existing_data_file) {
  if (!file.exists(existing_data_file)) {
    # File doesn't exist - create it with new data
    safe_log(paste("Creating new CI data file:", existing_data_file))

    # Ensure directory exists
    dir.create(dirname(existing_data_file), recursive = TRUE, showWarnings = FALSE)

    # Save new data
    saveRDS(new_data, existing_data_file)
    safe_log(paste("New CI data file created:", existing_data_file))

    return(new_data)
  }

  # File exists - load existing data and combine
  existing_data <- readRDS(existing_data_file)

  # bind_rows() puts new_data first, so distinct() keeps the new copy of any
  # EXACT duplicate row. NOTE(review): rows that differ in any value are both
  # retained - this does not overwrite stale values for the same sub_field;
  # confirm that is the intended semantics.
  combined_data <- dplyr::bind_rows(new_data, existing_data) %>%
    dplyr::distinct() %>%
    dplyr::group_by(sub_field)

  # Save updated data
  saveRDS(combined_data, existing_data_file)
  safe_log(paste("Updated CI data saved to", existing_data_file))

  return(combined_data)
}
|
|
|
|
#' Process and combine CI values from raster files
#'
#' Orchestrates field-level CI extraction over the processed rasters in
#' merged_final_dir. On the first run (no combined_CI_data.rds) every raster
#' is extracted at whole-field granularity and combined into a new file; on
#' subsequent runs only rasters within the current date window are extracted
#' (at quadrant granularity) and appended via update_ci_data().
#'
#' @param dates List of dates from date_list()
#' @param field_boundaries Field boundaries as vector object (currently unused here)
#' @param merged_final_dir Directory with processed raster files
#' @param field_boundaries_sf Field boundaries as SF object
#' @param daily_CI_vals_dir Directory to save daily CI values
#' @param cumulative_CI_vals_dir Directory to save cumulative CI values
#' @return NULL (used for side effects)
#'
process_ci_values <- function(dates, field_boundaries, merged_final_dir,
                              field_boundaries_sf, daily_CI_vals_dir,
                              cumulative_CI_vals_dir) {
  # Find processed raster files
  raster_files <- list.files(merged_final_dir, full.names = TRUE, pattern = "\\.tif$")

  # Define path for combined CI data
  combined_ci_path <- here::here(cumulative_CI_vals_dir, "combined_CI_data.rds")

  # Check if the combined CI data file exists
  if (!file.exists(combined_ci_path)) {
    # First run: process ALL available rasters, not just the current window
    safe_log("combined_CI_data.rds does not exist. Creating new file with all available data.")
    safe_log(paste("Processing", length(raster_files), "raster files"))

    # Extract data from all raster files with error handling.
    # NOTE(review): quadrants = FALSE here vs TRUE in the incremental branch
    # below - presumably intentional (historic backfill at whole-field level);
    # confirm.
    tryCatch({
      purrr::walk(
        raster_files,
        extract_rasters_daily,
        field_geojson = field_boundaries_sf,
        quadrants = FALSE,
        save_dir = daily_CI_vals_dir
      )
      safe_log("Extraction complete for all raster files")
    }, error = function(e) {
      safe_log(paste("Error during extraction walk:", e$message), "ERROR")
    })

    # Combine all extracted data into the cumulative file
    tryCatch({
      pivot_stats <- combine_ci_values(daily_CI_vals_dir, combined_ci_path)
      safe_log("All CI values extracted from historic images and saved.")
    }, error = function(e) {
      safe_log(paste("Error combining CI values:", e$message), "ERROR")
      stop(e$message)
    })

  } else {
    # Incremental run: extract only the current window and append
    safe_log("combined_CI_data.rds exists, adding the latest image data.")

    # Keep only rasters whose path contains a date in the current window
    filtered_files <- purrr::map(dates$days_filter, ~ raster_files[grepl(pattern = .x, x = raster_files)]) %>%
      purrr::compact() %>%
      purrr::flatten_chr()

    safe_log(paste("Processing", length(filtered_files), "new raster files"))

    # Extract data for the new files with error handling
    tryCatch({
      purrr::walk(
        filtered_files,
        extract_rasters_daily,
        field_geojson = field_boundaries_sf,
        quadrants = TRUE,
        save_dir = daily_CI_vals_dir
      )
      safe_log("Extraction complete for new files")
    }, error = function(e) {
      safe_log(paste("Error during extraction walk:", e$message), "ERROR")
    })

    # Filter extracted values files by the current date range
    extracted_values <- list.files(daily_CI_vals_dir, full.names = TRUE)
    extracted_values <- purrr::map(dates$days_filter, ~ extracted_values[grepl(pattern = .x, x = extracted_values)]) %>%
      purrr::compact() %>%
      purrr::flatten_chr()

    safe_log(paste("Found", length(extracted_values), "extracted value files to combine"))

    # Stack the new per-day tables and group for the update
    new_pivot_stats <- extracted_values %>%
      purrr::map(readRDS) %>%
      purrr::list_rbind() %>%
      dplyr::group_by(sub_field)

    # Merge into the cumulative file (exact duplicates dropped)
    update_ci_data(new_data = new_pivot_stats, existing_data_file =combined_ci_path)
    safe_log("CI values from latest images added to combined_CI_data.rds")
  }
}
|
|
#' Process CI values from pre-split tiles (Script 01 output)
#'
#' This function processes CI values from tiles instead of full-extent rasters.
#' Tiles are created by Script 01 and stored in daily_tiles_split/[GRID_SIZE]/[DATE]/ folders.
#' For each field, it aggregates CI statistics from all tiles that intersect that field.
#' Output follows the same grid structure: merged_final_tif/[GRID_SIZE]/[DATE]/
#'
#' NOTE: Processes dates SEQUENTIALLY but tiles WITHIN EACH DATE in parallel (furrr)
#' This avoids worker process communication issues while still getting good speedup.
#'
#' @param dates List of dates from date_list()
#' @param tile_folder Path to the tile folder (daily_tiles_split/[GRID_SIZE])
#' @param field_boundaries Field boundaries as vector object (currently unused here)
#' @param field_boundaries_sf Field boundaries as SF object
#' @param daily_CI_vals_dir Directory to save daily CI values
#' @param cumulative_CI_vals_dir Directory to save cumulative CI values
#' @param merged_final_dir Base directory to save processed tiles with CI band
#' @param grid_size Grid size label (e.g., "5x5", "10x10") for output path structure
#' @return NULL (used for side effects)
#'
process_ci_values_from_tiles <- function(dates, tile_folder, field_boundaries,
                                         field_boundaries_sf, daily_CI_vals_dir,
                                         cumulative_CI_vals_dir, merged_final_dir,
                                         grid_size = NA) {

  # Define path for combined CI data
  combined_ci_path <- here::here(cumulative_CI_vals_dir, "combined_CI_data.rds")

  # Discover all dates with tiles (each date is a subdirectory of tile_folder).
  # NOTE(review): the exclusion below hardcodes the 5x5 grid filename even
  # though grid_size is a parameter - confirm other grid sizes don't leave
  # similar non-date entries behind.
  tile_dates <- list.dirs(tile_folder, full.names = FALSE, recursive = FALSE)
  tile_dates <- tile_dates[tile_dates != "master_grid_5x5.geojson"] # Remove non-date entries
  tile_dates <- sort(tile_dates)

  # Filter to dates in current processing range
  dates_to_process <- tile_dates[tile_dates %in% dates$days_filter]

  if (length(dates_to_process) == 0) {
    safe_log("No tile dates found in processing date range", "WARNING")
    return(invisible(NULL))
  }

  safe_log(paste("Found", length(dates_to_process), "date(s) with tiles"))

  # Check if the combined CI data file exists
  if (!file.exists(combined_ci_path)) {
    # First run: note this branch iterates over ALL tile_dates (full backfill),
    # not just dates_to_process - matching the log message below.
    safe_log("combined_CI_data.rds does not exist. Creating new file with all available tile data.")

    # Process all tile dates SEQUENTIALLY but with parallel tile processing
    # Tiles within each date are processed in parallel via extract_ci_from_tiles()
    all_pivot_stats <- list()

    for (i in seq_along(tile_dates)) {
      date <- tile_dates[i]

      # SKIP: Check if this date already has processed output tiles
      if (!is.na(grid_size)) {
        output_date_folder <- file.path(merged_final_dir, grid_size, date)
      } else {
        output_date_folder <- file.path(merged_final_dir, date)
      }

      if (dir.exists(output_date_folder)) {
        existing_tiles <- list.files(output_date_folder, pattern = "\\.tif$")
        if (length(existing_tiles) > 0) {
          safe_log(paste("[", i, "/", length(tile_dates), "] SKIP:", date, "- already has", length(existing_tiles), "tiles"))
          next
        }
      }

      safe_log(paste("[", i, "/", length(tile_dates), "] Processing tiles for date:", date))

      date_tile_dir <- file.path(tile_folder, date)
      tile_files <- list.files(date_tile_dir, pattern = "\\.tif$", full.names = TRUE)

      if (length(tile_files) == 0) {
        safe_log(paste(" No tile files found for", date), "WARNING")
        next
      }

      safe_log(paste(" Found", length(tile_files), "tiles - processing in parallel"))

      # Process all tiles for this date and aggregate to fields
      # Tiles are processed in parallel via furrr::future_map() inside extract_ci_from_tiles()
      date_stats <- extract_ci_from_tiles(
        tile_files = tile_files,
        date = date,
        field_boundaries_sf = field_boundaries_sf,
        daily_CI_vals_dir = daily_CI_vals_dir,
        merged_final_tif_dir = merged_final_dir,
        grid_size = grid_size
      )

      if (!is.null(date_stats)) {
        all_pivot_stats[[date]] <- date_stats
      }
    }

    # Combine all dates
    if (length(all_pivot_stats) > 0) {
      # Use bind_rows() to handle varying column names across dates gracefully
      combined_stats <- dplyr::bind_rows(all_pivot_stats, .id = NULL)
      rownames(combined_stats) <- NULL

      # Save combined data
      saveRDS(combined_stats, combined_ci_path)
      safe_log("All tile CI values extracted and combined_CI_data.rds created")
    } else {
      safe_log("No tile data was processed", "WARNING")
    }

  } else {
    # Incremental run: only dates inside the current window are processed
    safe_log("combined_CI_data.rds exists, adding new tile data.")

    # NOTE: this check is redundant - the same condition already returned
    # earlier in this function - kept for safety.
    if (length(dates_to_process) == 0) {
      safe_log("No new dates to process", "WARNING")
      return(invisible(NULL))
    }

    safe_log(paste("Processing", length(dates_to_process), "new dates..."))

    new_pivot_stats_list <- list()

    for (i in seq_along(dates_to_process)) {
      date <- dates_to_process[i]

      # SKIP: Check if this date already has processed output tiles
      if (!is.na(grid_size)) {
        output_date_folder <- file.path(merged_final_dir, grid_size, date)
      } else {
        output_date_folder <- file.path(merged_final_dir, date)
      }

      if (dir.exists(output_date_folder)) {
        existing_tiles <- list.files(output_date_folder, pattern = "\\.tif$")
        if (length(existing_tiles) > 0) {
          safe_log(paste("[", i, "/", length(dates_to_process), "] SKIP:", date, "- already has", length(existing_tiles), "tiles"))
          next
        }
      }

      safe_log(paste("[", i, "/", length(dates_to_process), "] Processing tiles for date:", date))

      date_tile_dir <- file.path(tile_folder, date)
      tile_files <- list.files(date_tile_dir, pattern = "\\.tif$", full.names = TRUE)

      if (length(tile_files) == 0) {
        safe_log(paste(" No tile files found for", date), "WARNING")
        next
      }

      safe_log(paste(" Found", length(tile_files), "tiles - processing in parallel"))

      # Extract CI from tiles for this date
      date_stats <- extract_ci_from_tiles(
        tile_files = tile_files,
        date = date,
        field_boundaries_sf = field_boundaries_sf,
        daily_CI_vals_dir = daily_CI_vals_dir,
        merged_final_tif_dir = merged_final_dir,
        grid_size = grid_size
      )

      if (!is.null(date_stats)) {
        new_pivot_stats_list[[date]] <- date_stats
      }
    }

    # Combine new data
    if (length(new_pivot_stats_list) > 0) {
      # Use bind_rows() to handle varying column names across dates gracefully
      new_pivot_stats <- dplyr::bind_rows(new_pivot_stats_list, .id = NULL)
      rownames(new_pivot_stats) <- NULL

      # Update combined file (exact duplicates dropped in update_ci_data)
      update_ci_data(new_pivot_stats, combined_ci_path)
      safe_log("Tile CI values from new dates added to combined_CI_data.rds")
    } else {
      safe_log("No new tile dates had valid data", "WARNING")
    }
  }
}
|
|
|
|
#' Process a single tile file, extract CI, save processed tile, and extract statistics
#'
#' Helper function for parallel processing of tiles. For each tile:
#' 1. Loads tile
#' 2. Creates/extracts CI band (CI = NIR / Green - 1)
#' 3. Creates output raster with Red, Green, Blue, NIR, CI bands
#' 4. Saves to merged_final_tif_dir/[GRID_SIZE]/[DATE]/ mirroring daily_tiles_split structure
#' 5. Extracts field-level CI statistics
#' Returns statistics aggregated to field level.
#'
#' @param tile_file Path to a single tile TIF file
#' @param field_boundaries_sf Field boundaries as SF object
#' @param date Character string of the date (YYYY-MM-DD format)
#' @param merged_final_tif_dir Base directory to save processed tiles with CI band
#' @param grid_size Grid size label (e.g., "5x5", "10x10") for output path structure
#' @return Data frame with field CI statistics for this tile, or NULL if processing failed
#'
process_single_tile <- function(tile_file, field_boundaries_sf, date, merged_final_tif_dir, grid_size = NA) {
  tryCatch({
    # Compute the filename once; it is reused both for logging and for the
    # output path (the original computed basename(tile_file) twice).
    tile_filename <- basename(tile_file)
    safe_log(paste("  [TILE] Loading:", tile_filename))

    # Load tile
    tile_rast <- terra::rast(tile_file)

    # Determine if this is 4-band or 8-band data
    raster_info <- detect_raster_structure(tile_rast)

    # Extract the bands we need
    red_band <- tile_rast[[raster_info$red_idx]]
    green_band <- tile_rast[[raster_info$green_idx]]
    blue_band <- tile_rast[[raster_info$blue_idx]]
    nir_band <- tile_rast[[raster_info$nir_idx]]

    # Apply cloud masking if UDM exists.
    # Mask only cells flagged as cloud (UDM == 2); everything else is kept.
    if (raster_info$has_udm) {
      udm_band <- tile_rast[[raster_info$udm_idx]]
      cloud_cells <- udm_band == 2

      red_band[cloud_cells] <- NA
      green_band[cloud_cells] <- NA
      blue_band[cloud_cells] <- NA
      nir_band[cloud_cells] <- NA
    }

    # Name the bands
    names(red_band) <- "Red"
    names(green_band) <- "Green"
    names(blue_band) <- "Blue"
    names(nir_band) <- "NIR"

    # Create CI band inline: NIR/Green - 1
    ci_band <- nir_band / green_band - 1
    names(ci_band) <- "CI"

    # Create output raster with Red, Green, Blue, NIR, CI
    output_raster <- c(red_band, green_band, blue_band, nir_band, ci_band)
    names(output_raster) <- c("Red", "Green", "Blue", "NIR", "CI")

    # Save processed tile to merged_final_tif_dir/[GRID_SIZE]/[DATE]/ with same filename
    # This mirrors the input structure: daily_tiles_split/[GRID_SIZE]/[DATE]/
    if (!is.na(grid_size)) {
      date_dir <- file.path(merged_final_tif_dir, grid_size, date)
    } else {
      date_dir <- file.path(merged_final_tif_dir, date)
    }

    if (!dir.exists(date_dir)) {
      dir.create(date_dir, recursive = TRUE, showWarnings = FALSE)
    }

    # Use same filename as source tile (e.g., 2026-01-02_01.tif)
    output_file <- file.path(date_dir, tile_filename)

    # Write processed tile
    terra::writeRaster(output_raster, output_file, overwrite = TRUE)
    safe_log(paste("  [SAVED TIFF] Output:", file.path(date, basename(output_file)), "(5 bands: Red, Green, Blue, NIR, CI)"))

    # Extract statistics per field from CI band.
    # Crop to the field bounding box first so terra::extract only touches
    # the relevant portion of the tile.
    field_bbox <- sf::st_bbox(field_boundaries_sf)
    ci_cropped <- terra::crop(ci_band, terra::ext(field_bbox), snap = "out")
    extracted_vals <- terra::extract(ci_cropped, field_boundaries_sf, fun = "mean", na.rm = TRUE)

    # Build statistics data frame for this tile
    tile_stats <- field_boundaries_sf %>%
      sf::st_drop_geometry() %>%
      mutate(mean_CI = round(extracted_vals[, 2], 2)) %>%
      mutate(tile_file = tile_filename)

    return(tile_stats)

  }, error = function(e) {
    # Failed tiles are logged and skipped; caller filters out NULL results.
    safe_log(paste("  Error processing tile", basename(tile_file), "-", e$message), "WARNING")
    return(NULL)
  })
}
|
|
|
|
#' Extract CI values from multiple tiles and aggregate to fields
#'
#' Given a set of tile files for a single date, this function:
#' 1. Loads each tile IN PARALLEL using furrr
#' 2. Creates/extracts CI band
#' 3. Saves processed tile (Red, Green, Blue, NIR, CI) to merged_final_tif_dir/[GRID_SIZE]/[DATE]/
#' 4. Calculates field statistics from CI band
#' 5. Aggregates field statistics across tiles
#' 6. Saves individual date file (matching legacy workflow)
#'
#' Parallel processing: Uses furrr::future_map with 3 multisession workers
#' (Windows-compatible); falls back to sequential lapply() if the parallel
#' run fails. The previous future plan is restored on exit.
#'
#' @param tile_files Character vector of full paths to tile TIF files
#' @param date Character string of the date (YYYY-MM-DD format)
#' @param field_boundaries_sf Field boundaries as SF object
#' @param daily_CI_vals_dir Directory to save individual date RDS files
#' @param merged_final_tif_dir Base directory to save processed tiles with CI band
#' @param grid_size Grid size label (e.g., "5x5", "10x10") for output path structure
#' @return Data frame with field CI statistics for the date, or NULL if no
#'   tile produced valid data
#'
extract_ci_from_tiles <- function(tile_files, date, field_boundaries_sf, daily_CI_vals_dir = NULL, merged_final_tif_dir = NULL, grid_size = NA) {

  if (!inherits(field_boundaries_sf, "sf")) {
    field_boundaries_sf <- sf::st_as_sf(field_boundaries_sf)
  }

  safe_log(paste("  Processing", length(tile_files), "tiles for date", date, "(3-tile parallel batch)"))

  # Windows-compatible parallelization: process tiles with 3 multisession
  # workers - stable and efficient on Windows.
  # future::plan() returns the previous plan; restore it on exit so this
  # function does not leak a global future plan to its callers.
  old_plan <- future::plan(future::multisession, workers = 3)
  on.exit(future::plan(old_plan), add = TRUE)

  # Process tiles in parallel; fall back to sequential if parallel fails
  stats_list <- tryCatch({
    furrr::future_map(
      tile_files,
      ~ process_single_tile(.x, field_boundaries_sf, date, merged_final_tif_dir, grid_size = grid_size),
      .progress = FALSE,
      .options = furrr::furrr_options(seed = TRUE)
    )
  }, error = function(e) {
    safe_log(paste("Parallel processing failed:", e$message, "- falling back to sequential"), "WARNING")
    lapply(
      tile_files,
      function(tile_file) {
        process_single_tile(tile_file, field_boundaries_sf, date, merged_final_tif_dir, grid_size = grid_size)
      }
    )
  })

  # Filter out NULL results (failed tiles), keeping tile names aligned.
  # vapply is type-stable and the mask is computed only once.
  ok <- !vapply(stats_list, is.null, logical(1))
  all_stats <- stats_list[ok]
  names(all_stats) <- basename(tile_files)[ok]

  if (length(all_stats) == 0) {
    return(NULL)
  }

  # Combine all tiles and aggregate to field level.
  # dplyr::bind_rows() handles column name inconsistencies gracefully.
  combined_tiles <- dplyr::bind_rows(all_stats)
  rownames(combined_tiles) <- NULL

  # Aggregate: for each field, compute mean CI across all tiles
  aggregated <- combined_tiles %>%
    group_by(across(-c(mean_CI, tile_file))) %>%
    summarise(!!date := round(mean(mean_CI, na.rm = TRUE), 2), .groups = "drop")

  # Save individual date file (matching legacy workflow format:
  # extracted_YYYY-MM-DD_quadrant.rds). Build the filename once so the log
  # message matches the file actually written (the original paste() inserted
  # stray spaces into the logged filename).
  if (!is.null(daily_CI_vals_dir)) {
    save_file <- paste0("extracted_", date, "_quadrant.rds")
    save_path <- file.path(daily_CI_vals_dir, save_file)
    saveRDS(aggregated, save_path)
    safe_log(paste("[RDS SAVED] Date:", date, "-> File:", save_file))
  }

  return(aggregated)
}
|