""" Self-contained utility module for two-step harvest date prediction and Excel export. Includes model architecture, feature engineering, and core prediction logic. """ import sys import pandas as pd import numpy as np import torch import torch.nn as nn import pickle import yaml from pathlib import Path from typing import Tuple, Dict, Any, List from sklearn.preprocessing import StandardScaler # ============================================================================ # TORCH MODELS (from src/models.py, inlined for self-containment) # ============================================================================ class HarvestDetectionLSTM(nn.Module): """Unidirectional LSTM for harvest detection with dual outputs.""" def __init__(self, input_size: int, hidden_size: int = 128, num_layers: int = 1, dropout: float = 0.5): super(HarvestDetectionLSTM, self).__init__() self.input_size = input_size self.hidden_size = hidden_size self.num_layers = num_layers self.lstm = nn.LSTM( input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, dropout=dropout if num_layers > 1 else 0, bidirectional=False, batch_first=True ) self.imminent_head = nn.Sequential( nn.Linear(hidden_size, 16), nn.ReLU(), nn.Dropout(dropout), nn.Linear(16, 1), nn.Sigmoid() ) self.detected_head = nn.Sequential( nn.Linear(hidden_size, 16), nn.ReLU(), nn.Dropout(dropout), nn.Linear(16, 1), nn.Sigmoid() ) def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]: lstm_out, _ = self.lstm(x) batch_size, seq_len, hidden_size = lstm_out.shape lstm_flat = lstm_out.reshape(-1, hidden_size) imminent_flat = self.imminent_head(lstm_flat).reshape(batch_size, seq_len) detected_flat = self.detected_head(lstm_flat).reshape(batch_size, seq_len) return imminent_flat, detected_flat class HarvestDetectionGRU(nn.Module): """Unidirectional GRU for harvest detection with dual outputs.""" def __init__(self, input_size: int, hidden_size: int = 128, num_layers: int = 1, dropout: float = 0.5): super(HarvestDetectionGRU, self).__init__() self.input_size = input_size self.hidden_size = hidden_size self.num_layers = num_layers self.gru = nn.GRU( input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, dropout=dropout if num_layers > 1 else 0, bidirectional=False, batch_first=True ) self.imminent_head = nn.Sequential( nn.Linear(hidden_size, 16), nn.ReLU(), nn.Dropout(dropout), nn.Linear(16, 1), nn.Sigmoid() ) self.detected_head = nn.Sequential( nn.Linear(hidden_size, 16), nn.ReLU(), nn.Dropout(dropout), nn.Linear(16, 1), nn.Sigmoid() ) def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]: gru_out, _ = self.gru(x) batch_size, seq_len, hidden_size = gru_out.shape gru_flat = gru_out.reshape(-1, hidden_size) imminent_flat = self.imminent_head(gru_flat).reshape(batch_size, seq_len) detected_flat = self.detected_head(gru_flat).reshape(batch_size, seq_len) return imminent_flat, detected_flat def create_model(model_type: str, input_size: int, hidden_size: int = 128, num_layers: int = 1, dropout: float = 0.5, device = None) -> nn.Module: """Create a model from registry.""" registry = {'LSTM': HarvestDetectionLSTM, 'GRU': HarvestDetectionGRU} if model_type not in registry: raise ValueError(f"Unknown model type: {model_type}") model = registry[model_type]( input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, dropout=dropout ) if device: model = model.to(device) # Print model info total_params = sum(p.numel() for p in model.parameters()) trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad) print(f"Model: {model_type}") print(f" Input size: {input_size}") print(f" Hidden size: {hidden_size}") print(f" Num layers: {num_layers}") print(f" Dropout: {dropout}") print(f" Total parameters: {total_params:,}") print(f" Trainable parameters: {trainable_params:,}") print(f" Device: {device}") return model # ============================================================================ # FEATURE ENGINEERING (from src/feature_engineering.py, simplified for inline) # ============================================================================ def compute_ci_features(ci_series: pd.Series, doy_series: pd.Series = None) -> pd.DataFrame: """Compute all CI-based features (state, velocity, acceleration, min/max/range/std/CV).""" features = pd.DataFrame(index=ci_series.index) # State (moving averages) features['CI_raw'] = ci_series features['7d_MA'] = ci_series.rolling(window=7, min_periods=1).mean() features['14d_MA'] = ci_series.rolling(window=14, min_periods=1).mean() features['21d_MA'] = ci_series.rolling(window=21, min_periods=1).mean() # Velocity (gradient of MA) for window in [7, 14, 21]: ma = ci_series.rolling(window=window, min_periods=1).mean() features[f'{window}d_velocity'] = ma.diff() / 1.0 # Simplified gradient # Acceleration (gradient of velocity) for window in [7, 14, 21]: ma = ci_series.rolling(window=window, min_periods=1).mean() vel = ma.diff() features[f'{window}d_acceleration'] = vel.diff() # Min, Max, Range for window in [7, 14, 21]: features[f'{window}d_min'] = ci_series.rolling(window=window, min_periods=1).min() features[f'{window}d_max'] = ci_series.rolling(window=window, min_periods=1).max() features[f'{window}d_range'] = features[f'{window}d_max'] - features[f'{window}d_min'] # Std and CV for window in [7, 14, 21]: features[f'{window}d_std'] = ci_series.rolling(window=window, min_periods=1).std() ma = ci_series.rolling(window=window, min_periods=1).mean() features[f'{window}d_CV'] = features[f'{window}d_std'] / (ma + 1e-6) # DOY normalized if doy_series is not None: features['DOY_normalized'] = doy_series / 450.0 return features.fillna(0) def extract_features(data_df: pd.DataFrame, feature_names: List[str], ci_column: str = 'FitData') -> np.ndarray: """Extract and return specified features as numpy array.""" # Compute all CI features ci_series = data_df[ci_column].astype(float) doy_series = pd.Series(range(len(data_df)), index=data_df.index) % 365 if 'DOY_normalized' in feature_names else None all_features = compute_ci_features(ci_series, doy_series) # Select requested features requested = [f for f in feature_names if f in all_features.columns] if not requested: raise ValueError(f"No valid features found. Requested: {feature_names}") return all_features[requested].values # ============================================================================ # MAIN UTILITY FUNCTIONS # ============================================================================ def load_model_and_config(model_dir: Path): """Load model, config, and scalers from a given directory.""" cwd = Path.cwd() # Try different naming conventions candidates = [ # Standard names (model_dir / "config.json", model_dir / "model.pt", model_dir / "scalers.pkl"), # Model 307 specific names (model_dir / "model_config.json", model_dir / "model_307.pt", model_dir / "model_scalers.pkl"), # CWD standard names (cwd / "config.json", cwd / "model.pt", cwd / "scalers.pkl"), # CWD Model 307 names (cwd / "model_config.json", cwd / "model_307.pt", cwd / "model_scalers.pkl"), ] config_file = model_file = scalers_file = None for cfg, mdl, scl in candidates: if cfg.exists() and mdl.exists() and scl.exists(): config_file, model_file, scalers_file = cfg, mdl, scl print(f"Found model files in: {cfg.parent}") break if not (config_file and model_file and scalers_file): missing = [] for cfg, mdl, scl in candidates: if not cfg.exists(): missing.append(str(cfg)) if not mdl.exists(): missing.append(str(mdl)) if not scl.exists(): missing.append(str(scl)) raise FileNotFoundError( f"Missing model files. Checked multiple locations. Missing: {missing}" ) with open(config_file) as f: config = yaml.safe_load(f) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") model = create_model( model_type=config['model']['type'], input_size=len(config['features']), hidden_size=config['model']['hidden_size'], num_layers=config['model']['num_layers'], dropout=config['model']['dropout'], device=device ) print(f"Loading weights from: {model_file}") model.load_state_dict(torch.load(model_file, map_location=device, weights_only=False)) model.eval() with open(scalers_file, 'rb') as f: scalers = pickle.load(f) return model, config, scalers def load_harvest_data(data_file: Path) -> pd.DataFrame: """Load harvest data CSV.""" print(f"Loading data from: {data_file}") df = pd.read_csv(data_file) print(f"Loaded {len(df)} rows") return df def run_phase1_growing_window(field_data, model, config, scalers, ci_column, device): """ Phase 1: Growing window detection with threshold crossing. Expand window day-by-day, check last timestep's detected_prob. When 3 consecutive days have prob > 0.5, harvest detected. Returns list of (harvest_date, harvest_idx) tuples. """ harvest_dates = [] current_pos = 0 while current_pos < len(field_data): consecutive_above_threshold = 0 for window_end in range(current_pos + 1, len(field_data) + 1): window_data = field_data.iloc[current_pos:window_end].copy().reset_index(drop=True) try: features = extract_features(window_data, config['features'], ci_column=ci_column) # Apply scalers for fi, scaler in enumerate(scalers): try: features[:, fi] = scaler.transform(features[:, fi].reshape(-1, 1)).flatten() except Exception: pass # Run model with torch.no_grad(): x_tensor = torch.tensor(features, dtype=torch.float32).unsqueeze(0).to(device) imminent_probs, detected_probs = model(x_tensor) detected_probs = detected_probs.squeeze(0).cpu().numpy() # Check LAST timestep last_prob = detected_probs[-1] if last_prob > 0.5: consecutive_above_threshold += 1 else: consecutive_above_threshold = 0 # Harvest detected: 3 consecutive days above threshold if consecutive_above_threshold >= 3: harvest_date = field_data.iloc[current_pos + window_end - 3]['Date'] harvest_dates.append((harvest_date, current_pos + window_end - 3)) # Reset to next day after harvest current_pos = current_pos + window_end - 2 break except Exception: continue else: break return harvest_dates def run_phase2_refinement(field_data, phase1_harvests, model, config, scalers, ci_column, device): """ Phase 2: Refinement with ±40 day window. For each Phase 1 harvest, extract window and refine with argmax. Returns list of (harvest_date, harvest_idx) tuples. """ refined_harvests = [] field_data = field_data.sort_values('Date').reset_index(drop=True) for i, (phase1_harvest_date, phase1_idx) in enumerate(phase1_harvests): try: # Determine season start if i == 0: season_start_date = field_data.iloc[0]['Date'] else: prev_harvest_idx = phase1_harvests[i-1][1] season_start_idx = prev_harvest_idx + 1 if season_start_idx >= len(field_data): break season_start_date = field_data.iloc[season_start_idx]['Date'] # Extract ±40 day window window_start_date = season_start_date - pd.Timedelta(days=40) window_end_date = phase1_harvest_date + pd.Timedelta(days=40) window_start_idx = max(0, (field_data['Date'] >= window_start_date).idxmax() if (field_data['Date'] >= window_start_date).any() else 0) window_end_idx = min(len(field_data), (field_data['Date'] <= window_end_date).idxmax() + 1 if (field_data['Date'] <= window_end_date).any() else len(field_data)) if window_end_idx <= window_start_idx: refined_harvests.append((phase1_harvest_date, phase1_idx)) continue window_data = field_data.iloc[window_start_idx:window_end_idx].copy().reset_index(drop=True) # Extract features for full window features = extract_features(window_data, config['features'], ci_column=ci_column) # Apply scalers for fi, scaler in enumerate(scalers): try: features[:, fi] = scaler.transform(features[:, fi].reshape(-1, 1)).flatten() except Exception: pass # Run model once on full window with torch.no_grad(): x_tensor = torch.tensor(features, dtype=torch.float32).unsqueeze(0).to(device) imminent_probs, detected_probs = model(x_tensor) detected_probs = detected_probs.squeeze(0).cpu().numpy() # Find refined harvest (argmax in window) refined_idx_in_window = int(np.argmax(detected_probs)) refined_idx_global = window_start_idx + refined_idx_in_window refined_harvest_date = field_data.iloc[refined_idx_global]['Date'] refined_harvests.append((refined_harvest_date, refined_idx_global)) except Exception: refined_harvests.append((phase1_harvest_date, phase1_idx)) return refined_harvests def run_two_step_refinement(df: pd.DataFrame, model, config, scalers, device=None): """ Two-step harvest detection for each field: 1. Phase 1: Growing window with 3-day threshold confirmation 2. Phase 2: ±40 day refinement with argmax Returns list of dicts with field, season_start_date, season_end_date, etc. """ if device is None: device = torch.device("cuda" if torch.cuda.is_available() else "cpu") results = [] ci_column = config['data']['ci_column'] # Group by field and count total fields for progress field_groups = list(df.groupby('field')) total_fields = len(field_groups) harvests_found = 0 print(f" Processing {total_fields} fields...") for idx, (field, field_data) in enumerate(field_groups, 1): # Simple progress indicator pct = int((idx / total_fields) * 100) bar_length = 40 filled = int((idx / total_fields) * bar_length) bar = "█" * filled + "░" * (bar_length - filled) print(f" [{bar}] {pct:3d}% ({idx}/{total_fields} fields)", end='\r') field_data = field_data.sort_values('Date').reset_index(drop=True) # Phase 1: Growing window detection phase1_harvests = run_phase1_growing_window(field_data, model, config, scalers, ci_column, device) if not phase1_harvests: continue # Phase 2: Refinement phase2_harvests = run_phase2_refinement(field_data, phase1_harvests, model, config, scalers, ci_column, device) # Store results for i, (harvest_date, harvest_idx) in enumerate(phase2_harvests): if i == 0: season_start_date = field_data.iloc[0]['Date'] else: prev_harvest_idx = phase2_harvests[i-1][1] season_start_idx = prev_harvest_idx + 1 if season_start_idx >= len(field_data): break season_start_date = field_data.iloc[season_start_idx]['Date'] season_end_date = harvest_date result = { 'field': field, 'season': i + 1, 'season_start_date': season_start_date, 'season_end_date': season_end_date, 'phase2_harvest_date': harvest_date, } results.append(result) harvests_found += 1 print() # New line after progress bar print(f" ✓ Complete: Found {harvests_found} harvest events across {total_fields} fields") return results def build_production_harvest_table(refined_results: List[Dict]) -> pd.DataFrame: """ Build a DataFrame from refined results with columns for production pipeline. One row per field/season with season start and end dates (formatted as YYYY-MM-DD). """ if not refined_results: print("WARNING: No refined results to build table") return pd.DataFrame(columns=['field', 'season', 'season_start_date', 'season_end_date']) # Build DataFrame df = pd.DataFrame(refined_results) # Ensure date columns are datetime df['season_start_date'] = pd.to_datetime(df['season_start_date']).dt.strftime('%Y-%m-%d') df['season_end_date'] = pd.to_datetime(df['season_end_date']).dt.strftime('%Y-%m-%d') df['phase1_harvest_date'] = pd.to_datetime(df['phase1_harvest_date']).dt.strftime('%Y-%m-%d') print(f"Built production table with {len(df)} field/season combinations") return df