""" Self-contained utility module for two-step harvest date prediction and Excel export. Includes model architecture, feature engineering, and core prediction logic. """ import sys import pandas as pd import numpy as np import torch import torch.nn as nn import pickle import yaml from pathlib import Path from typing import Tuple, Dict, Any, List from sklearn.preprocessing import StandardScaler # ============================================================================ # TORCH MODELS (from src/models.py, inlined for self-containment) # ============================================================================ class HarvestDetectionLSTM(nn.Module): """Unidirectional LSTM for harvest detection with dual outputs.""" def __init__(self, input_size: int, hidden_size: int = 128, num_layers: int = 1, dropout: float = 0.5): super(HarvestDetectionLSTM, self).__init__() self.input_size = input_size self.hidden_size = hidden_size self.num_layers = num_layers self.lstm = nn.LSTM( input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, dropout=dropout if num_layers > 1 else 0, bidirectional=False, batch_first=True ) self.imminent_head = nn.Sequential( nn.Linear(hidden_size, 16), nn.ReLU(), nn.Dropout(dropout), nn.Linear(16, 1), nn.Sigmoid() ) self.detected_head = nn.Sequential( nn.Linear(hidden_size, 16), nn.ReLU(), nn.Dropout(dropout), nn.Linear(16, 1), nn.Sigmoid() ) def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]: lstm_out, _ = self.lstm(x) batch_size, seq_len, hidden_size = lstm_out.shape lstm_flat = lstm_out.reshape(-1, hidden_size) imminent_flat = self.imminent_head(lstm_flat).reshape(batch_size, seq_len) detected_flat = self.detected_head(lstm_flat).reshape(batch_size, seq_len) return imminent_flat, detected_flat class HarvestDetectionGRU(nn.Module): """Unidirectional GRU for harvest detection with dual outputs.""" def __init__(self, input_size: int, hidden_size: int = 128, num_layers: int = 1, dropout: float = 0.5): super(HarvestDetectionGRU, self).__init__() self.input_size = input_size self.hidden_size = hidden_size self.num_layers = num_layers self.gru = nn.GRU( input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, dropout=dropout if num_layers > 1 else 0, bidirectional=False, batch_first=True ) self.imminent_head = nn.Sequential( nn.Linear(hidden_size, 16), nn.ReLU(), nn.Dropout(dropout), nn.Linear(16, 1), nn.Sigmoid() ) self.detected_head = nn.Sequential( nn.Linear(hidden_size, 16), nn.ReLU(), nn.Dropout(dropout), nn.Linear(16, 1), nn.Sigmoid() ) def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]: gru_out, _ = self.gru(x) batch_size, seq_len, hidden_size = gru_out.shape gru_flat = gru_out.reshape(-1, hidden_size) imminent_flat = self.imminent_head(gru_flat).reshape(batch_size, seq_len) detected_flat = self.detected_head(gru_flat).reshape(batch_size, seq_len) return imminent_flat, detected_flat def create_model(model_type: str, input_size: int, hidden_size: int = 128, num_layers: int = 1, dropout: float = 0.5, device = None) -> nn.Module: """Create a model from registry.""" registry = {'LSTM': HarvestDetectionLSTM, 'GRU': HarvestDetectionGRU} if model_type not in registry: raise ValueError(f"Unknown model type: {model_type}") model = registry[model_type]( input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, dropout=dropout ) if device: model = model.to(device) # Print model info total_params = sum(p.numel() for p in model.parameters()) trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad) print(f"Model: {model_type}") print(f" Input size: {input_size}") print(f" Hidden size: {hidden_size}") print(f" Num layers: {num_layers}") print(f" Dropout: {dropout}") print(f" Total parameters: {total_params:,}") print(f" Trainable parameters: {trainable_params:,}") print(f" Device: {device}") return model # ============================================================================ # FEATURE ENGINEERING (from src/feature_engineering.py, simplified for inline) # ============================================================================ def compute_ci_features(ci_series: pd.Series, dah_series: pd.Series = None) -> pd.DataFrame: """Compute all CI-based features (state, velocity, acceleration, min/max/range/std/CV).""" features = pd.DataFrame(index=ci_series.index) # State (moving averages) features['CI_raw'] = ci_series features['7d_MA'] = ci_series.rolling(window=7, min_periods=1).mean() features['14d_MA'] = ci_series.rolling(window=14, min_periods=1).mean() features['21d_MA'] = ci_series.rolling(window=21, min_periods=1).mean() # Velocity (gradient of MA) for window in [7, 14, 21]: ma = ci_series.rolling(window=window, min_periods=1).mean() features[f'{window}d_velocity'] = ma.diff() / 1.0 # Simplified gradient # Acceleration (gradient of velocity) for window in [7, 14, 21]: ma = ci_series.rolling(window=window, min_periods=1).mean() vel = ma.diff() features[f'{window}d_acceleration'] = vel.diff() # Min, Max, Range for window in [7, 14, 21]: features[f'{window}d_min'] = ci_series.rolling(window=window, min_periods=1).min() features[f'{window}d_max'] = ci_series.rolling(window=window, min_periods=1).max() features[f'{window}d_range'] = features[f'{window}d_max'] - features[f'{window}d_min'] # Std and CV for window in [7, 14, 21]: features[f'{window}d_std'] = ci_series.rolling(window=window, min_periods=1).std() ma = ci_series.rolling(window=window, min_periods=1).mean() features[f'{window}d_CV'] = features[f'{window}d_std'] / (ma + 1e-6) # DAH normalized (Days After Harvest) if dah_series is not None: features['DAH_normalized'] = dah_series / 450.0 return features.fillna(0) def extract_features(data_df: pd.DataFrame, feature_names: List[str], ci_column: str = 'FitData', season_anchor_day: int = None, lookback_start: int = 0) -> np.ndarray: """ Extract and return specified features as numpy array. Args: data_df: DataFrame with Date and CI data (may be a window after a harvest) feature_names: List of feature names to extract ci_column: Name of CI column season_anchor_day: Day in FULL sequence where this season started (for DAH reset) DAH will be recalculated as: 1, 2, 3, ... from this point lookback_start: Starting index in original full data (for season reset calculation) Returns: NumPy array of shape (len(data_df), len(feature_names)) """ # Compute all CI features ci_series = data_df[ci_column].astype(float) # Compute DAH (age/days since season start) - NOT day-of-year! # DAH is a continuous counter: 1, 2, 3, ..., 475 (doesn't cycle at 365) # It only resets to 1 after a harvest is detected (new season) dah_series = None if 'DAH_normalized' in feature_names: if season_anchor_day is not None and lookback_start >= season_anchor_day: # Season was reset after harvest. Recalculate DAH as simple counter from 1 # This is a window starting at or after harvest, so DAH should be: 1, 2, 3, ... dah_series = pd.Series(np.arange(1, len(data_df) + 1), index=data_df.index) elif 'DAH' in data_df.columns: # Use DAH directly from CSV - already calculated as continuous age counter dah_series = pd.Series(data_df['DAH'].astype(float).values, index=data_df.index) else: # Fallback: create continuous age counter (1, 2, 3, ...) dah_series = pd.Series(np.arange(1, len(data_df) + 1), index=data_df.index) all_features = compute_ci_features(ci_series, dah_series) # Select requested features requested = [f for f in feature_names if f in all_features.columns] if not requested: raise ValueError(f"No valid features found. Requested: {feature_names}, Available: {all_features.columns.tolist()}") return all_features[requested].values # ============================================================================ # MAIN UTILITY FUNCTIONS # ============================================================================ def load_model_and_config(model_dir: Path): """Load model, config, and scalers from a given directory.""" cwd = Path.cwd() # Try different naming conventions candidates = [ # Standard names (model_dir / "config.json", model_dir / "model.pt", model_dir / "scalers.pkl"), # Model 307 specific names (model_dir / "model_config.json", model_dir / "model_307.pt", model_dir / "model_scalers.pkl"), # CWD standard names (cwd / "config.json", cwd / "model.pt", cwd / "scalers.pkl"), # CWD Model 307 names (cwd / "model_config.json", cwd / "model_307.pt", cwd / "model_scalers.pkl"), ] config_file = model_file = scalers_file = None for cfg, mdl, scl in candidates: if cfg.exists() and mdl.exists() and scl.exists(): config_file, model_file, scalers_file = cfg, mdl, scl print(f"Found model files in: {cfg.parent}") break if not (config_file and model_file and scalers_file): missing = [] for cfg, mdl, scl in candidates: if not cfg.exists(): missing.append(str(cfg)) if not mdl.exists(): missing.append(str(mdl)) if not scl.exists(): missing.append(str(scl)) raise FileNotFoundError( f"Missing model files. Checked multiple locations. Missing: {missing}" ) with open(config_file) as f: config = yaml.safe_load(f) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") model = create_model( model_type=config['model']['type'], input_size=len(config['features']), hidden_size=config['model']['hidden_size'], num_layers=config['model']['num_layers'], dropout=config['model']['dropout'], device=device ) print(f"Loading weights from: {model_file}") model.load_state_dict(torch.load(model_file, map_location=device, weights_only=False)) model.eval() with open(scalers_file, 'rb') as f: scalers = pickle.load(f) return model, config, scalers def load_harvest_data(data_file: Path) -> pd.DataFrame: """Load harvest data CSV.""" print(f"Loading data from: {data_file}") df = pd.read_csv(data_file) print(f"Loaded {len(df)} rows") return df def run_phase1_growing_window(field_data, model, config, scalers, ci_column, device, threshold=0.45, consecutive_days=2): """ Phase 1: Growing window detection with DOY season reset. For each detected harvest, reset DOY counter for the next season. This allows the model to detect multiple consecutive harvests in multi-year data. """ harvest_dates = [] season_anchor_day = 0 current_pos = 0 while current_pos < len(field_data): consecutive_above_threshold = 0 min_window_size = 120 for window_end in range(current_pos + 1, len(field_data) + 1): window_data = field_data.iloc[current_pos:window_end].copy().reset_index(drop=True) if len(window_data) < min_window_size: continue try: reset_doy = current_pos > season_anchor_day features = extract_features( window_data, config['features'], ci_column=ci_column, season_anchor_day=season_anchor_day if reset_doy else None, lookback_start=current_pos ) features_scaled = features.copy().astype(float) for fi, scaler in enumerate(scalers): try: features_scaled[:, fi] = scaler.transform(features[:, fi].reshape(-1, 1)).flatten() except Exception: pass with torch.no_grad(): x_tensor = torch.tensor(features_scaled, dtype=torch.float32).unsqueeze(0).to(device) imminent_probs, detected_probs = model(x_tensor) last_prob = detected_probs[0, -1].item() if last_prob > threshold: consecutive_above_threshold += 1 else: consecutive_above_threshold = 0 if consecutive_above_threshold >= consecutive_days: harvest_idx = current_pos + window_end - consecutive_days harvest_date = field_data.iloc[harvest_idx]['Date'] harvest_dates.append((harvest_date, harvest_idx)) season_anchor_day = harvest_idx + 1 current_pos = harvest_idx + 1 break except Exception as e: continue else: break return harvest_dates def run_phase2_refinement(field_data, phase1_harvests, model, config, scalers, ci_column, device): """ Phase 2: Refinement with ±40 day window. For each Phase 1 harvest, extract window and refine with argmax. Returns list of (harvest_date, harvest_idx) tuples. """ refined_harvests = [] field_data = field_data.sort_values('Date').reset_index(drop=True) for i, (phase1_harvest_date, phase1_idx) in enumerate(phase1_harvests): try: # Determine season start if i == 0: season_start_date = field_data.iloc[0]['Date'] else: prev_harvest_idx = phase1_harvests[i-1][1] season_start_idx = prev_harvest_idx + 1 if season_start_idx >= len(field_data): break season_start_date = field_data.iloc[season_start_idx]['Date'] # Extract ±40 day window window_start_date = season_start_date - pd.Timedelta(days=40) window_end_date = phase1_harvest_date + pd.Timedelta(days=40) # FIXED: Use proper index selection mask_start = field_data['Date'] >= window_start_date mask_end = field_data['Date'] <= window_end_date if mask_start.any(): window_start_idx = mask_start.idxmax() # First True index else: window_start_idx = 0 if mask_end.any(): # Last True index: find where condition becomes False from the right true_indices = np.where(mask_end)[0] window_end_idx = true_indices[-1] + 1 # +1 for slicing (exclusive end) else: window_end_idx = len(field_data) if window_end_idx <= window_start_idx: refined_harvests.append((phase1_harvest_date, phase1_idx)) continue window_data = field_data.iloc[window_start_idx:window_end_idx].copy().reset_index(drop=True) # Extract features for full window features = extract_features(window_data, config['features'], ci_column=ci_column) # Apply scalers for fi, scaler in enumerate(scalers): try: features[:, fi] = scaler.transform(features[:, fi].reshape(-1, 1)).flatten() except Exception: pass # Run model once on full window with torch.no_grad(): x_tensor = torch.tensor(features, dtype=torch.float32).unsqueeze(0).to(device) imminent_probs, detected_probs = model(x_tensor) detected_probs = detected_probs.squeeze(0).cpu().numpy() # Find refined harvest (argmax in window) refined_idx_in_window = int(np.argmax(detected_probs)) refined_idx_global = window_start_idx + refined_idx_in_window refined_harvest_date = field_data.iloc[refined_idx_global]['Date'] refined_harvests.append((refined_harvest_date, refined_idx_global)) except Exception: refined_harvests.append((phase1_harvest_date, phase1_idx)) return refined_harvests def run_two_step_refinement(df: pd.DataFrame, model, config, scalers, device=None, phase1_threshold=0.45, phase1_consecutive=2): """ Two-step harvest detection for each field: 1. Phase 1: Growing window with threshold confirmation 2. Phase 2: ±40 day refinement with argmax Args: phase1_threshold (float): Probability threshold for Phase 1 (default 0.45, tuned for Model 307) phase1_consecutive (int): Consecutive days required (default 2, reduced from 3 for robustness) Returns list of dicts with field, season_start_date, season_end_date, etc. """ if device is None: device = torch.device("cuda" if torch.cuda.is_available() else "cpu") results = [] # CI column is 'FitData' (interpolated CI) - NOT 'value' (raw with NAs) # 'FitData' is already gap-filled by R stage 03, ready for ML ci_column = 'FitData' # Group by field and count total fields for progress field_groups = list(df.groupby('field')) total_fields = len(field_groups) harvests_found = 0 print(f" Processing {total_fields} fields...") print(f" Phase 1 parameters: threshold={phase1_threshold}, consecutive_days={phase1_consecutive}") for idx, (field, field_data) in enumerate(field_groups, 1): # Simple progress indicator pct = int((idx / total_fields) * 100) bar_length = 40 filled = int((idx / total_fields) * bar_length) bar = "█" * filled + "░" * (bar_length - filled) print(f" [{bar}] {pct:3d}% ({idx}/{total_fields} fields)", end='\r') field_data = field_data.sort_values('Date').reset_index(drop=True) # Phase 1: Growing window detection phase1_harvests = run_phase1_growing_window(field_data, model, config, scalers, ci_column, device, threshold=phase1_threshold, consecutive_days=phase1_consecutive) if not phase1_harvests: continue # Phase 2: Refinement phase2_harvests = run_phase2_refinement(field_data, phase1_harvests, model, config, scalers, ci_column, device) # Store results for i, (harvest_date, harvest_idx) in enumerate(phase2_harvests): if i == 0: season_start_date = field_data.iloc[0]['Date'] else: prev_harvest_idx = phase2_harvests[i-1][1] season_start_idx = prev_harvest_idx + 1 if season_start_idx >= len(field_data): break season_start_date = field_data.iloc[season_start_idx]['Date'] season_end_date = harvest_date result = { 'field': field, 'season': i + 1, 'season_start_date': season_start_date, 'season_end_date': season_end_date, 'phase2_harvest_date': harvest_date, } results.append(result) harvests_found += 1 print() # New line after progress bar print(f" ✓ Complete: Found {harvests_found} harvest events across {total_fields} fields") if results: print(f" Sample harvest dates: {results[0]['phase2_harvest_date']}") if len(results) > 1: print(f" {results[-1]['phase2_harvest_date']}") return results def build_production_harvest_table(refined_results: List[Dict]) -> pd.DataFrame: """ Build a DataFrame from refined results with columns for production pipeline. One row per field/season with season start and end dates (formatted as YYYY-MM-DD). """ if not refined_results: print("WARNING: No refined results to build table") return pd.DataFrame(columns=['field', 'season', 'season_start_date', 'season_end_date']) # Build DataFrame df = pd.DataFrame(refined_results) # Ensure date columns are datetime df['season_start_date'] = pd.to_datetime(df['season_start_date']).dt.strftime('%Y-%m-%d') df['season_end_date'] = pd.to_datetime(df['season_end_date']).dt.strftime('%Y-%m-%d') df['phase2_harvest_date'] = pd.to_datetime(df['phase2_harvest_date']).dt.strftime('%Y-%m-%d') print(f"Built production table with {len(df)} field/season combinations") return df