# SmartCane/r_app/30_growth_model_utils.R
#
# 397 lines, 15 KiB, R

# filepath: c:\Users\timon\Resilience BV\4020 SCane ESA DEMO - Documenten\General\4020 SCDEMO Team\4020 TechnicalData\WP3\smartcane\r_app\growth_model_utils.R
#
# GROWTH_MODEL_UTILS.R
# ===================
# Utility functions for growth model interpolation and manipulation.
# These functions support the creation of continuous growth models from point measurements.
#
# PERFORMANCE OPTIMIZATION:
# - Parallel file I/O: Reads 450k+ RDS files using furrr::future_map_dfr()
# - Parallel field interpolation: Processes fields in parallel (1 core per ~100 fields)
# - Dynamic CPU detection: Allocates workers based on available cores
# - Windows compatible: Uses furrr with plan(multisession) for cross-platform support
#' Load and prepare the combined CI data (Per-Field Architecture)
#' OPTIMIZE: Filters by date during load (skip unnecessary date ranges)
#' PARALLELIZE: Reads 450k+ RDS files in parallel using furrr::future_map_dfr()
#'
#' @param daily_vals_dir Directory containing per-field daily RDS files (Data/extracted_ci/daily_vals)
#' @param harvesting_data Optional: Dataframe with season dates. If provided, only loads files within season ranges (major speedup)
#' @return Long-format dataframe with CI values by date and field
#'
load_combined_ci_data <- function(daily_vals_dir, harvesting_data = NULL) {
  # Per-field architecture: daily_vals_dir = Data/extracted_ci/daily_vals
  # Structure: daily_vals/{FIELD_NAME}/{YYYY-MM-DD}.rds
  if (!dir.exists(daily_vals_dir)) {
    stop(paste("Daily values directory not found:", daily_vals_dir))
  }
  safe_log(paste("Loading per-field CI data from:", daily_vals_dir))

  # OPTIMIZATION: if harvest data is provided, restrict the date range up front
  # so files outside any season are never read
  date_filter_min <- NULL
  date_filter_max <- NULL
  if (!is.null(harvesting_data) && nrow(harvesting_data) > 0) {
    date_filter_min <- min(harvesting_data$season_start, na.rm = TRUE)
    date_filter_max <- max(harvesting_data$season_end, na.rm = TRUE)
    safe_log(sprintf("Pre-filtering by harvest season dates: %s to %s",
                     format(date_filter_min, "%Y-%m-%d"),
                     format(date_filter_max, "%Y-%m-%d")))
  }

  # Find all daily RDS files recursively; only the per-field YYYY-MM-DD.rds format
  all_daily_files <- list.files(
    path = daily_vals_dir,
    pattern = "^\\d{4}-\\d{2}-\\d{2}\\.rds$", # Only YYYY-MM-DD.rds format
    full.names = TRUE,
    recursive = TRUE
  )
  # Keep only files inside a field subdirectory; exclude legacy root-level
  # files such as "extracted_2024-02-29_whole_field.rds"
  all_daily_files <- all_daily_files[basename(dirname(all_daily_files)) != "daily_vals"]
  if (length(all_daily_files) == 0) {
    stop(paste("No per-field daily RDS files found in:", daily_vals_dir))
  }
  safe_log(sprintf("Found %d per-field daily RDS files (filtered from legacy format)",
                   length(all_daily_files)))

  # OPTIMIZATION: filter by filename date BEFORE parallel loading
  # (can save 60-80% of I/O on large datasets)
  if (!is.null(date_filter_min) && !is.null(date_filter_max)) {
    file_dates <- as.Date(tools::file_path_sans_ext(basename(all_daily_files)),
                          format = "%Y-%m-%d")
    in_range <- !is.na(file_dates) &
      file_dates >= date_filter_min &
      file_dates <= date_filter_max
    all_daily_files <- all_daily_files[in_range]
    safe_log(sprintf("Filtered to %d files within harvest season date range",
                     length(all_daily_files)))
  }

  # Adaptive worker count: scale with file count to avoid parallel overhead on
  # small projects. FIX: guard against detectCores() returning 1 (or NA), which
  # previously produced an invalid worker count of 0.
  n_files <- length(all_daily_files)
  max_workers <- max(1L, parallel::detectCores() - 1L, na.rm = TRUE)
  n_cores_io <- if (n_files < 200) {
    1L
  } else if (n_files < 600) {
    min(max_workers, 2L)
  } else if (n_files < 1500) {
    min(max_workers, 4L)
  } else {
    min(max_workers, 8L)
  }
  safe_log(sprintf("Using %d parallel workers for file I/O (%d files)", n_cores_io, n_files))

  # Read one daily RDS file; returns an empty frame on any problem so a single
  # corrupt or malformed file cannot abort the whole load
  read_one_file <- function(file) {
    date_str <- tools::file_path_sans_ext(basename(file))
    if (nchar(date_str) != 10 || !grepl("^\\d{4}-\\d{2}-\\d{2}$", date_str)) return(data.frame())
    parsed_date <- as.Date(date_str, format = "%Y-%m-%d")
    if (is.na(parsed_date)) return(data.frame())
    tryCatch({
      rds_data <- readRDS(file)
      if (is.null(rds_data) || nrow(rds_data) == 0) return(data.frame())
      rds_data %>% dplyr::mutate(Date = parsed_date)
    }, error = function(e) data.frame())
  }

  if (n_cores_io > 1) {
    future::plan(strategy = future::multisession, workers = n_cores_io)
    # FIX: reset the plan even if the parallel read errors; previously an error
    # inside future_map_dfr() left a multisession plan active for the session
    on.exit(future::plan(future::sequential), add = TRUE)
    combined_long <- furrr::future_map_dfr(
      all_daily_files, read_one_file,
      .progress = TRUE,
      .options = furrr::furrr_options(seed = TRUE)
    )
  } else {
    combined_long <- purrr::map_dfr(all_daily_files, read_one_file)
  }

  if (nrow(combined_long) == 0) {
    safe_log("Warning: No valid CI data loaded from daily files", "WARNING")
    return(data.frame())
  }

  # OPTIMIZATION: data.table for fast filtering (10-20x faster than dplyr on
  # large datasets). Use ci_mean as the main CI value, renamed to `value`.
  DT <- data.table::as.data.table(combined_long)
  DT <- DT[, .(field, sub_field, ci_mean, Date)]
  DT[, value := as.numeric(ci_mean)]
  DT[, ci_mean := NULL]
  # Keep rows where Date is valid, field/sub_field exist, and value is finite
  DT <- DT[!is.na(Date) & !is.na(sub_field) & !is.na(field) & is.finite(value)]
  # Convert back to tibble for compatibility with the rest of the pipeline
  pivot_stats_long <- dplyr::as_tibble(DT)

  safe_log(sprintf("Loaded %d CI data points from %d daily files",
                   nrow(pivot_stats_long), length(all_daily_files)))
  return(pivot_stats_long)
}
#' Extract and interpolate CI data for a specific field and season
#'
#' @param field_name Name of the field or sub-field
#' @param harvesting_data Dataframe with harvesting information
#' @param field_CI_data Dataframe with CI measurements
#' @param season Year of the growing season
#' @param verbose Logical: whether to log warnings/info (default TRUE). Set to FALSE during progress bar iteration.
#' @return Dataframe with interpolated daily CI values
#'
extract_CI_data <- function(field_name, harvesting_data, field_CI_data, season, verbose = TRUE) {
  # Harvest record for this field/season supplies the season window
  filtered_harvesting_data <- harvesting_data %>%
    dplyr::filter(year == season, sub_field == field_name)
  if (nrow(filtered_harvesting_data) == 0) {
    if (verbose) safe_log(paste("No harvesting data found for field:", field_name, "in season:", season), "WARNING")
    return(data.frame())
  }

  # CI measurements for this field
  filtered_field_CI_data <- field_CI_data %>%
    dplyr::filter(sub_field == field_name)
  if (nrow(filtered_field_CI_data) == 0) {
    if (verbose) safe_log(paste("No CI data found for field:", field_name, "in season:", season), "WARNING")
    return(data.frame())
  }

  # Scalar season bounds from the first matching record.
  # FIX: the season filter below now uses these scalars; it previously used the
  # full columns filtered_harvesting_data$season_start/end, which silently
  # recycle when more than one harvest row matches the field/season.
  season_start <- filtered_harvesting_data$season_start[1]
  season_end <- filtered_harvesting_data$season_end[1]
  ci_date_range <- paste(format(min(filtered_field_CI_data$Date), "%Y-%m-%d"),
                         "to",
                         format(max(filtered_field_CI_data$Date), "%Y-%m-%d"))

  tryCatch({
    # Linear interpolation of CI over the observed date range (daily steps)
    ApproxFun <- stats::approxfun(x = filtered_field_CI_data$Date, y = filtered_field_CI_data$value)
    Dates <- seq.Date(min(filtered_field_CI_data$Date), max(filtered_field_CI_data$Date), by = 1)
    LinearFit <- ApproxFun(Dates)

    # Join the interpolated curve onto the raw observations and clip to the
    # season window (exclusive on both ends, as in the original pipeline)
    CI <- data.frame(Date = Dates, FitData = LinearFit) %>%
      dplyr::left_join(filtered_field_CI_data, by = "Date") %>%
      dplyr::filter(Date > season_start & Date < season_end)

    if (nrow(CI) == 0) {
      if (verbose) {
        safe_log(paste0("No CI data within season dates for field: ", field_name,
                        " (Season: ", season, ", dates: ",
                        format(season_start, "%Y-%m-%d"), " to ",
                        format(season_end, "%Y-%m-%d"),
                        "). Available CI data range: ", ci_date_range),
                 "WARNING")
      }
      return(data.frame())
    }

    # Add identification columns; DAH is a 1-based day index within the window
    # (presumably "days after harvest" — confirm with pipeline docs)
    CI <- CI %>%
      dplyr::mutate(
        DAH = seq_len(dplyr::n()),
        model = paste0("Data", season, " : ", field_name),
        season = season,
        subField = field_name
      )
    return(CI)
  }, error = function(e) {
    # Return empty dataframe on error (tracked separately by the caller)
    if (verbose) {
      safe_log(paste0("Error interpolating CI data for field ", field_name,
                      " in season ", season,
                      " (", format(season_start, "%Y-%m-%d"), " to ",
                      format(season_end, "%Y-%m-%d"),
                      "): ", e$message), "ERROR")
    }
    return(data.frame())
  })
}
#' Generate interpolated CI data for all fields and seasons
#' PARALLELIZE: Processes fields in parallel using furrr::future_map_df()
#'
#' @param years Vector of years to process
#' @param harvesting_data Dataframe with harvesting information
#' @param ci_data Long-format dataframe with CI measurements
#' @return Dataframe with interpolated daily CI values for all fields/seasons
#'
generate_interpolated_ci_data <- function(years, harvesting_data, ci_data) {
  safe_log("Starting CI data interpolation for all fields")

  # Failure tracking; updated from inside the per-year closure via <<-
  failed_fields <- list()
  total_fields <- 0
  successful_fields <- 0

  # Pre-compute total valid fields across all years to decide core count once
  total_valid_fields <- sum(vapply(years, function(yr) {
    sfs <- harvesting_data %>%
      dplyr::filter(year == yr, !is.na(season_start)) %>%
      dplyr::pull(sub_field)
    sum(sfs %in% unique(ci_data$sub_field))
  }, numeric(1)))

  # Adaptive worker count: scale with field count, avoid parallel overhead for
  # small projects. FIX: guard against detectCores() returning 1 (or NA),
  # which previously produced an invalid worker count of 0.
  max_workers <- max(1L, parallel::detectCores() - 1L, na.rm = TRUE)
  n_cores_interp <- if (total_valid_fields <= 1) {
    1L
  } else if (total_valid_fields <= 10) {
    min(max_workers, 2L)
  } else if (total_valid_fields <= 50) {
    min(max_workers, 4L)
  } else {
    min(max_workers, 8L)
  }
  safe_log(sprintf("Interpolating %d fields across %d year(s) using %d worker(s)",
                   total_valid_fields, length(years), n_cores_interp))

  # Set up the parallel plan once before the year loop (avoids per-year
  # startup cost). FIX: tear it down via on.exit so an error mid-run cannot
  # leave a multisession plan active for the rest of the session.
  if (n_cores_interp > 1) {
    future::plan(strategy = future::multisession, workers = n_cores_interp)
    on.exit(future::plan(future::sequential), add = TRUE)
  }

  # Process each year
  result <- purrr::map_df(years, function(yr) {
    # Fields harvested in this year with valid season start dates
    sub_fields <- harvesting_data %>%
      dplyr::filter(year == yr, !is.na(season_start)) %>%
      dplyr::pull(sub_field)
    if (length(sub_fields) == 0) return(data.frame())

    # Only keep fields that actually have CI measurements
    valid_sub_fields <- sub_fields %>%
      purrr::keep(~ any(ci_data$sub_field == .x))
    if (length(valid_sub_fields) == 0) return(data.frame())

    total_fields <<- total_fields + length(valid_sub_fields)
    safe_log(sprintf("Year %d: Processing %d fields", yr, length(valid_sub_fields)))

    # Parallel if workers > 1, otherwise plain map (no overhead).
    # verbose = FALSE in the parallel branch: workers cannot interleave log
    # output cleanly with the progress bar.
    if (n_cores_interp > 1) {
      result_list <- furrr::future_map(
        valid_sub_fields,
        .progress = TRUE,
        .options = furrr::furrr_options(seed = TRUE),
        function(field) {
          extract_CI_data(field,
                          harvesting_data = harvesting_data,
                          field_CI_data = ci_data,
                          season = yr,
                          verbose = FALSE)
        }
      )
    } else {
      result_list <- purrr::map(valid_sub_fields, function(field) {
        extract_CI_data(field,
                        harvesting_data = harvesting_data,
                        field_CI_data = ci_data,
                        season = yr,
                        verbose = TRUE)
      })
    }

    # Track per-field success/failure for the end-of-run summary
    for (i in seq_along(result_list)) {
      if (nrow(result_list[[i]]) > 0) {
        successful_fields <<- successful_fields + 1
      } else {
        failed_fields[[length(failed_fields) + 1]] <<- list(
          field = valid_sub_fields[i],
          season = yr,
          reason = "Unable to generate interpolated data"
        )
      }
    }

    # Combine the non-empty per-field results for this year
    non_empty <- result_list[vapply(result_list, nrow, integer(1)) > 0]
    if (length(non_empty) > 0) purrr::list_rbind(non_empty) else data.frame()
  })

  # Print summary
  safe_log("\n=== Interpolation Summary ===")
  safe_log(sprintf("Successfully interpolated: %d/%d fields", successful_fields, total_fields))
  if (length(failed_fields) > 0) {
    safe_log(sprintf("Failed to interpolate: %d fields", length(failed_fields)))
    for (failure in failed_fields) {
      safe_log(sprintf(" - Field %s (Season %d): %s",
                       failure$field, failure$season, failure$reason), "WARNING")
    }
  }
  safe_log(sprintf("Total interpolated data points: %d", nrow(result)))
  return(result)
}
#' Calculate growth metrics for interpolated CI data
#'
#' @param interpolated_data Dataframe with interpolated CI values
#' @return Dataframe with added growth metrics (CI_per_day and cumulative_CI)
#'
calculate_growth_metrics <- function(interpolated_data) {
  # Guard: nothing to compute on empty input; return it unchanged
  if (nrow(interpolated_data) == 0) {
    safe_log("No data provided to calculate growth metrics", "WARNING")
    return(interpolated_data)
  }
  # Per-model daily delta and running total of the fitted CI curve.
  # CI_per_day is NA on each model's first row (no previous day to diff).
  result <- interpolated_data %>%
    dplyr::group_by(model) %>%
    dplyr::mutate(
      CI_per_day = FitData - dplyr::lag(FitData),
      cumulative_CI = cumsum(FitData)
    ) %>%
    dplyr::ungroup() # FIX: don't leak a grouped tibble to callers
  return(result)
}
#' Save interpolated growth model data
#'
#' @param data Dataframe with interpolated growth data
#' @param output_dir Directory to save the output
#' @param file_name Filename for the output (default: "All_pivots_Cumulative_CI_quadrant_year_v2.rds")
#' @return Path to the saved file
#'
save_growth_model <- function(data, output_dir, file_name = "All_pivots_Cumulative_CI_quadrant_year_v2.rds") {
  # Validate: exactly one non-NA, non-empty path string.
  # FIX: the previous check let "" , NA_character_, and multi-element vectors
  # through even though the error message promises a non-empty character string.
  if (is.null(output_dir) || !is.character(output_dir) ||
      length(output_dir) != 1 || is.na(output_dir) || !nzchar(output_dir)) {
    stop("output_dir must be a non-empty character string")
  }
  # Normalize path separators for Windows compatibility
  output_dir <- normalizePath(output_dir, winslash = "/", mustWork = FALSE)
  # Create output directory if it doesn't exist
  dir.create(output_dir, recursive = TRUE, showWarnings = FALSE)
  # file.path is more robust than here::here for absolute paths
  file_path <- file.path(output_dir, file_name)
  # Save the data and report where it went
  saveRDS(data, file_path)
  safe_log(paste("Interpolated CI data saved to:", file_path))
  return(file_path)
}