SmartCane/python_app/harvest_date_pred_utils.py
Timon 458b8247be Cleanup: Fix CI formula, reorganize shell scripts and test files
- Fixed CI calculation: changed from NDVI (NIR-Red)/(NIR+Red) to correct NIR/Green-1 formula in:
  * process_single_tile() function
  * create_ci_band() utility function
  * Updated create_mask_and_crop() documentation

- Renamed numbered shell scripts for clarity (matching R script numbering):
  * 01_run_planet_download -> 10_planet_download.sh
  * 02_run_ci_extraction -> 20_ci_extraction.sh
  * 03_run_growth_model -> 30_growth_model.sh
  * 04_run_mosaic_creation -> 40_mosaic_creation.sh
  * 09_run_calculate_kpis -> 80_calculate_kpis.sh
  * 10_run_kpi_report -> 90_kpi_report.sh

- Archived obsolete shell scripts to old_sh/:
  * build_mosaic.sh, build_report.sh, interpolate_growth_model.sh
  * 05_run_dashboard_report.sh, 06_run_crop_messaging.sh
  * 11_run_yield_prediction.sh/ps1
  * runcane.sh, runpython.sh, smartcane.sh, update_RDS.sh

- Deleted test/debug files and temporary outputs:
  * analyze_*.R, benchmark_gpu_vs_cpu.py, convert_angata_harvest.py
  * debug_mosaic.R, examine_kpi_results.R, generate_sar_report.R
  * inspect_8band_structure.R, inspect_tif_bands.R
  * old_working_utils.R, predict_harvest_operational.R
  * run_kpi_calculation.R, run_report.R, simple_sar_test.R
  * data_validation_tool/, harvest_ci_pattern_analysis.png, kpi_debug.out

- Enhanced harvest prediction: Added threshold tuning (0.40-0.45) and field type handling

- Enhanced mosaic creation: Improved tile detection and routing logic
2026-01-14 16:58:51 +01:00

496 lines
19 KiB
Python

"""
Self-contained utility module for two-step harvest date prediction and Excel export.
Includes model architecture, feature engineering, and core prediction logic.
"""
import sys
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import pickle
import yaml
from pathlib import Path
from typing import Tuple, Dict, Any, List
from sklearn.preprocessing import StandardScaler
# ============================================================================
# TORCH MODELS (from src/models.py, inlined for self-containment)
# ============================================================================
class HarvestDetectionLSTM(nn.Module):
    """Unidirectional LSTM for harvest detection with dual outputs.

    Emits two per-timestep probabilities via independent sigmoid heads:
    "harvest imminent" and "harvest detected".
    """

    def __init__(self, input_size: int, hidden_size: int = 128,
                 num_layers: int = 1, dropout: float = 0.5):
        super(HarvestDetectionLSTM, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            # PyTorch only applies recurrent dropout between stacked layers.
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=False,
            batch_first=True,
        )
        # Registration order (imminent first) matches existing checkpoints.
        self.imminent_head = self._build_head(hidden_size, dropout)
        self.detected_head = self._build_head(hidden_size, dropout)

    @staticmethod
    def _build_head(hidden_size: int, dropout: float) -> nn.Sequential:
        """Small MLP producing one sigmoid probability per timestep."""
        return nn.Sequential(
            nn.Linear(hidden_size, 16),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(16, 1),
            nn.Sigmoid(),
        )

    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return (imminent, detected) probabilities, each shaped (batch, seq)."""
        sequence_out, _ = self.lstm(x)
        batch, steps, width = sequence_out.shape
        flat = sequence_out.reshape(-1, width)
        imminent = self.imminent_head(flat).reshape(batch, steps)
        detected = self.detected_head(flat).reshape(batch, steps)
        return imminent, detected
class HarvestDetectionGRU(nn.Module):
    """Unidirectional GRU for harvest detection with dual outputs.

    Same contract as the LSTM variant: two per-timestep sigmoid heads for
    "harvest imminent" and "harvest detected".
    """

    def __init__(self, input_size: int, hidden_size: int = 128,
                 num_layers: int = 1, dropout: float = 0.5):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.gru = nn.GRU(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            # Recurrent dropout is meaningful only for stacked layers.
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=False,
            batch_first=True,
        )
        self.imminent_head = nn.Sequential(
            nn.Linear(hidden_size, 16),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(16, 1),
            nn.Sigmoid(),
        )
        self.detected_head = nn.Sequential(
            nn.Linear(hidden_size, 16),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(16, 1),
            nn.Sigmoid(),
        )

    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return (imminent, detected) probability tensors of shape (batch, seq)."""
        hidden_states, _ = self.gru(x)  # (batch, seq, hidden)
        # nn.Linear broadcasts over leading dims, so no manual flatten/reshape
        # is needed; squeeze drops the trailing singleton feature dim.
        imminent = self.imminent_head(hidden_states).squeeze(-1)
        detected = self.detected_head(hidden_states).squeeze(-1)
        return imminent, detected
def create_model(model_type: str, input_size: int, hidden_size: int = 128,
                 num_layers: int = 1, dropout: float = 0.5, device = None) -> nn.Module:
    """Create a model from registry.

    Args:
        model_type: Registry key, either 'LSTM' or 'GRU'.
        input_size: Number of input features per timestep.
        hidden_size: Recurrent hidden width.
        num_layers: Number of stacked recurrent layers.
        dropout: Dropout rate for heads (and between stacked layers).
        device: Optional torch device to move the model to.

    Returns:
        The instantiated model (moved to ``device`` when given).

    Raises:
        ValueError: if ``model_type`` is not in the registry.
    """
    registry = {'LSTM': HarvestDetectionLSTM, 'GRU': HarvestDetectionGRU}
    model_cls = registry.get(model_type)
    if model_cls is None:
        raise ValueError(f"Unknown model type: {model_type}")
    model = model_cls(
        input_size=input_size,
        hidden_size=hidden_size,
        num_layers=num_layers,
        dropout=dropout,
    )
    if device:
        model = model.to(device)
    # Summarize the architecture for the run log.
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"Model: {model_type}")
    print(f" Input size: {input_size}")
    print(f" Hidden size: {hidden_size}")
    print(f" Num layers: {num_layers}")
    print(f" Dropout: {dropout}")
    print(f" Total parameters: {total_params:,}")
    print(f" Trainable parameters: {trainable_params:,}")
    print(f" Device: {device}")
    return model
# ============================================================================
# FEATURE ENGINEERING (from src/feature_engineering.py, simplified for inline)
# ============================================================================
def compute_ci_features(ci_series: pd.Series, doy_series: pd.Series = None) -> pd.DataFrame:
    """Compute all CI-based features (state, velocity, acceleration, min/max/range/std/CV).

    Args:
        ci_series: Chlorophyll-index time series, one value per observation.
        doy_series: Optional day-of-year-like series; when given, a
            'DOY_normalized' column (value / 450) is appended.

    Returns:
        DataFrame indexed like ``ci_series``; column order is CI_raw, the
        moving averages, then velocity/acceleration/min/max/range/std/CV per
        7/14/21-observation window. Warm-up NaNs (diff, single-sample std)
        are replaced with 0.
    """
    windows = (7, 14, 21)
    # Build each rolling object and moving average once; the original code
    # recomputed the same rolling mean up to four times per window.
    rolling = {w: ci_series.rolling(window=w, min_periods=1) for w in windows}
    ma = {w: rolling[w].mean() for w in windows}
    velocity = {w: ma[w].diff() for w in windows}  # unit-spacing gradient of MA

    features = pd.DataFrame(index=ci_series.index)
    # State (raw value and moving averages)
    features['CI_raw'] = ci_series
    for w in windows:
        features[f'{w}d_MA'] = ma[w]
    # Velocity (gradient of MA)
    for w in windows:
        features[f'{w}d_velocity'] = velocity[w]
    # Acceleration (gradient of velocity)
    for w in windows:
        features[f'{w}d_acceleration'] = velocity[w].diff()
    # Min, Max, Range
    for w in windows:
        features[f'{w}d_min'] = rolling[w].min()
        features[f'{w}d_max'] = rolling[w].max()
        features[f'{w}d_range'] = features[f'{w}d_max'] - features[f'{w}d_min']
    # Std and coefficient of variation (epsilon guards divide-by-zero)
    for w in windows:
        features[f'{w}d_std'] = rolling[w].std()
        features[f'{w}d_CV'] = features[f'{w}d_std'] / (ma[w] + 1e-6)
    # DOY normalized (450 covers a growing season that spans a year boundary)
    if doy_series is not None:
        features['DOY_normalized'] = doy_series / 450.0
    # Downstream scalers/models expect dense numeric input.
    return features.fillna(0)
def extract_features(data_df: pd.DataFrame, feature_names: List[str], ci_column: str = 'FitData') -> np.ndarray:
    """Extract and return the requested features as a numpy array.

    Args:
        data_df: Per-field observations containing ``ci_column``.
        feature_names: Feature columns to select, in order.
        ci_column: Column holding the CI values to featurize.

    Returns:
        ndarray of shape (n_rows, n_available_features).

    Raises:
        ValueError: when none of ``feature_names`` can be computed.
    """
    ci_values = data_df[ci_column].astype(float)
    # Only synthesize a day-of-year series when the caller asked for it.
    doy_values = None
    if 'DOY_normalized' in feature_names:
        doy_values = pd.Series(range(len(data_df)), index=data_df.index) % 365
    feature_table = compute_ci_features(ci_values, doy_values)
    available = [name for name in feature_names if name in feature_table.columns]
    if not available:
        raise ValueError(f"No valid features found. Requested: {feature_names}")
    return feature_table[available].values
# ============================================================================
# MAIN UTILITY FUNCTIONS
# ============================================================================
def load_model_and_config(model_dir: Path):
    """Load model, config, and scalers from a given directory.

    Searches ``model_dir`` and then the current working directory, under both
    the standard file names and the Model-307-specific names; the first
    location where all three files exist wins.

    Returns:
        Tuple (model, config, scalers); the model is in eval mode on the
        best available device (CUDA when present).

    Raises:
        FileNotFoundError: when no candidate location holds all three files.
    """
    cwd = Path.cwd()
    # (config, weights, scalers) name conventions, in priority order.
    name_sets = [
        ("config.json", "model.pt", "scalers.pkl"),                   # standard
        ("model_config.json", "model_307.pt", "model_scalers.pkl"),   # Model 307
    ]
    candidates = [tuple(base / name for name in names)
                  for base in (model_dir, cwd)
                  for names in name_sets]
    found = next((trio for trio in candidates if all(p.exists() for p in trio)), None)
    if found is None:
        missing = [str(p) for trio in candidates for p in trio if not p.exists()]
        raise FileNotFoundError(
            f"Missing model files. Checked multiple locations. Missing: {missing}"
        )
    config_file, model_file, scalers_file = found
    print(f"Found model files in: {config_file.parent}")
    # YAML is a superset of JSON, so safe_load also parses the .json config.
    with open(config_file) as f:
        config = yaml.safe_load(f)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = create_model(
        model_type=config['model']['type'],
        input_size=len(config['features']),
        hidden_size=config['model']['hidden_size'],
        num_layers=config['model']['num_layers'],
        dropout=config['model']['dropout'],
        device=device,
    )
    print(f"Loading weights from: {model_file}")
    # NOTE: weights_only=False unpickles arbitrary objects — only load
    # checkpoints from trusted sources.
    model.load_state_dict(torch.load(model_file, map_location=device, weights_only=False))
    model.eval()
    with open(scalers_file, 'rb') as f:
        scalers = pickle.load(f)
    return model, config, scalers
def load_harvest_data(data_file: Path) -> pd.DataFrame:
    """Read the harvest-data CSV at ``data_file`` into a DataFrame.

    Logs the source path and row count for pipeline traceability.
    """
    print(f"Loading data from: {data_file}")
    frame = pd.read_csv(data_file)
    print(f"Loaded {len(frame)} rows")
    return frame
def run_phase1_growing_window(field_data, model, config, scalers, ci_column, device,
                              threshold=0.45, consecutive_days=2):
    """
    Phase 1: Growing window detection with threshold crossing.
    Expand window day-by-day, check last timestep's detected_prob.
    When N consecutive days have prob > threshold, harvest detected.

    Args:
        field_data: Single-field observations, sorted by 'Date', holding the
            ``ci_column`` values featurized by extract_features().
        model: Torch model returning (imminent, detected) per-timestep probs.
        config: Dict whose config['features'] lists feature names in scaler order.
        scalers: Per-feature sklearn scalers aligned with config['features'].
        ci_column: Name of the CI column to featurize.
        device: Torch device for inference.
        threshold (float): Probability threshold (default 0.45, tuned for Model 307)
        consecutive_days (int): Required consecutive days above threshold
            (default 2, reduced from 3 for robustness)

    Returns list of (harvest_date, harvest_idx) tuples, indices into field_data.
    """
    harvest_dates = []
    current_pos = 0
    while current_pos < len(field_data):
        consecutive_above_threshold = 0
        # window_end is an ABSOLUTE end position in field_data (exclusive).
        for window_end in range(current_pos + 1, len(field_data) + 1):
            window_data = field_data.iloc[current_pos:window_end].copy().reset_index(drop=True)
            try:
                features = extract_features(window_data, config['features'], ci_column=ci_column)
                # Apply scalers (best effort: a scaler that fails is skipped)
                for fi, scaler in enumerate(scalers):
                    try:
                        features[:, fi] = scaler.transform(features[:, fi].reshape(-1, 1)).flatten()
                    except Exception:
                        pass
                # Run model on the current window
                with torch.no_grad():
                    x_tensor = torch.tensor(features, dtype=torch.float32).unsqueeze(0).to(device)
                    imminent_probs, detected_probs = model(x_tensor)
                    detected_probs = detected_probs.squeeze(0).cpu().numpy()
                # Check LAST timestep only
                last_prob = detected_probs[-1]
                if last_prob > threshold:
                    consecutive_above_threshold += 1
                else:
                    consecutive_above_threshold = 0
                # Harvest detected: N consecutive days above threshold
                if consecutive_above_threshold >= consecutive_days:
                    # BUGFIX: window_end is already absolute, so the first
                    # above-threshold day is window_end - consecutive_days.
                    # The previous formula added current_pos again, which
                    # overshot the index for every harvest after the first
                    # (the resulting IndexError was silently swallowed below).
                    harvest_idx = window_end - consecutive_days
                    harvest_dates.append((field_data.iloc[harvest_idx]['Date'], harvest_idx))
                    # Resume scanning the day after the detected harvest
                    current_pos = harvest_idx + 1
                    break
            except Exception:
                # Feature extraction / inference can fail on tiny windows; skip.
                continue
        else:
            # Inner loop exhausted the series without a detection: stop.
            break
    return harvest_dates
def run_phase2_refinement(field_data, phase1_harvests, model, config, scalers, ci_column, device):
    """
    Phase 2: Refinement with ±40 day window.
    For each Phase 1 harvest, extract a window spanning from 40 days before
    the season start to 40 days after the Phase 1 date, run the model once,
    and place the refined harvest at the argmax of the detected probabilities.
    Falls back to the Phase 1 estimate when the window is empty or inference fails.

    Returns list of (harvest_date, harvest_idx) tuples.
    """
    refined_harvests = []
    field_data = field_data.sort_values('Date').reset_index(drop=True)
    for i, (phase1_harvest_date, phase1_idx) in enumerate(phase1_harvests):
        try:
            # Season start: beginning of data for the first harvest, otherwise
            # the observation right after the previous Phase 1 harvest.
            if i == 0:
                season_start_date = field_data.iloc[0]['Date']
            else:
                prev_harvest_idx = phase1_harvests[i-1][1]
                season_start_idx = prev_harvest_idx + 1
                if season_start_idx >= len(field_data):
                    break
                season_start_date = field_data.iloc[season_start_idx]['Date']
            # Window bounds: [season start - 40d, phase-1 harvest + 40d]
            window_start_date = season_start_date - pd.Timedelta(days=40)
            window_end_date = phase1_harvest_date + pd.Timedelta(days=40)
            # BUGFIX: dates are sorted, so searchsorted gives the first index
            # >= start and one past the last index <= end. The previous
            # idxmax-based end bound returned the FIRST date <= end — i.e.
            # nearly always row 0 — collapsing the refinement window to a
            # single observation.
            window_start_idx = int(field_data['Date'].searchsorted(window_start_date, side='left'))
            window_end_idx = int(field_data['Date'].searchsorted(window_end_date, side='right'))
            if window_end_idx <= window_start_idx:
                refined_harvests.append((phase1_harvest_date, phase1_idx))
                continue
            window_data = field_data.iloc[window_start_idx:window_end_idx].copy().reset_index(drop=True)
            # Extract features for full window
            features = extract_features(window_data, config['features'], ci_column=ci_column)
            # Apply scalers (best effort: a scaler that fails is skipped)
            for fi, scaler in enumerate(scalers):
                try:
                    features[:, fi] = scaler.transform(features[:, fi].reshape(-1, 1)).flatten()
                except Exception:
                    pass
            # Run model once on full window
            with torch.no_grad():
                x_tensor = torch.tensor(features, dtype=torch.float32).unsqueeze(0).to(device)
                imminent_probs, detected_probs = model(x_tensor)
                detected_probs = detected_probs.squeeze(0).cpu().numpy()
            # Find refined harvest (argmax in window)
            refined_idx_in_window = int(np.argmax(detected_probs))
            refined_idx_global = window_start_idx + refined_idx_in_window
            refined_harvest_date = field_data.iloc[refined_idx_global]['Date']
            refined_harvests.append((refined_harvest_date, refined_idx_global))
        except Exception:
            # Keep the Phase 1 estimate when refinement fails for any reason.
            refined_harvests.append((phase1_harvest_date, phase1_idx))
    return refined_harvests
def run_two_step_refinement(df: pd.DataFrame, model, config, scalers, device=None,
                            phase1_threshold=0.45, phase1_consecutive=2):
    """
    Two-step harvest detection for each field:
    1. Phase 1: Growing window with threshold confirmation
    2. Phase 2: ±40 day refinement with argmax

    Args:
        df: Long-format data with 'field', 'Date' and the CI column named by
            config['data']['ci_column'].
        model / config / scalers: As returned by load_model_and_config().
        device: Torch device; auto-selected (CUDA when available) if None.
        phase1_threshold (float): Probability threshold for Phase 1 (default 0.45, tuned for Model 307)
        phase1_consecutive (int): Consecutive days required (default 2, reduced from 3 for robustness)

    Returns list of dicts with field, season, season_start_date,
    season_end_date and phase2_harvest_date.
    """
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    results = []
    ci_column = config['data']['ci_column']
    # Group by field and count total fields for progress
    field_groups = list(df.groupby('field'))
    total_fields = len(field_groups)
    harvests_found = 0
    print(f" Processing {total_fields} fields...")
    print(f" Phase 1 parameters: threshold={phase1_threshold}, consecutive_days={phase1_consecutive}")
    for idx, (field, field_data) in enumerate(field_groups, 1):
        # Simple progress indicator
        pct = int((idx / total_fields) * 100)
        bar_length = 40
        filled = int((idx / total_fields) * bar_length)
        # BUGFIX: the fill/empty glyphs were empty string literals, so the
        # bar always rendered as "[]"; restore visible block characters.
        bar = "█" * filled + "░" * (bar_length - filled)
        print(f" [{bar}] {pct:3d}% ({idx}/{total_fields} fields)", end='\r')
        field_data = field_data.sort_values('Date').reset_index(drop=True)
        # Phase 1: Growing window detection
        phase1_harvests = run_phase1_growing_window(field_data, model, config, scalers, ci_column, device,
                                                    threshold=phase1_threshold, consecutive_days=phase1_consecutive)
        if not phase1_harvests:
            continue
        # Phase 2: Refinement
        phase2_harvests = run_phase2_refinement(field_data, phase1_harvests, model, config, scalers, ci_column, device)
        # Store one result row per detected season
        for i, (harvest_date, harvest_idx) in enumerate(phase2_harvests):
            if i == 0:
                season_start_date = field_data.iloc[0]['Date']
            else:
                prev_harvest_idx = phase2_harvests[i-1][1]
                season_start_idx = prev_harvest_idx + 1
                if season_start_idx >= len(field_data):
                    break
                season_start_date = field_data.iloc[season_start_idx]['Date']
            season_end_date = harvest_date
            result = {
                'field': field,
                'season': i + 1,
                'season_start_date': season_start_date,
                'season_end_date': season_end_date,
                'phase2_harvest_date': harvest_date,
            }
            results.append(result)
            harvests_found += 1
    print()  # New line after the carriage-return progress bar
    print(f" ✓ Complete: Found {harvests_found} harvest events across {total_fields} fields")
    return results
def build_production_harvest_table(refined_results: List[Dict]) -> pd.DataFrame:
    """
    Build a DataFrame from refined results with columns for production pipeline.
    One row per field/season with season start and end dates (formatted as YYYY-MM-DD).

    Args:
        refined_results: Dicts from run_two_step_refinement() with keys
            'field', 'season', 'season_start_date', 'season_end_date' and
            'phase2_harvest_date'.

    Returns:
        DataFrame with those columns; empty but with the SAME schema when
        ``refined_results`` is empty.
    """
    date_columns = ('season_start_date', 'season_end_date', 'phase2_harvest_date')
    if not refined_results:
        print("WARNING: No refined results to build table")
        # FIX: previously the empty-case schema omitted 'phase2_harvest_date',
        # so downstream consumers saw different columns depending on whether
        # any harvest was found. Keep the schema identical in both cases.
        return pd.DataFrame(columns=['field', 'season', *date_columns])
    table = pd.DataFrame(refined_results)
    # Normalize every date column to ISO 'YYYY-MM-DD' strings.
    for column in date_columns:
        table[column] = pd.to_datetime(table[column]).dt.strftime('%Y-%m-%d')
    print(f"Built production table with {len(table)} field/season combinations")
    return table