SmartCane/python_app/harvest_date_pred_utils.py

549 lines
22 KiB
Python

"""
Self-contained utility module for two-step harvest date prediction and Excel export.
Includes model architecture, feature engineering, and core prediction logic.
"""
import sys
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import pickle
import yaml
from pathlib import Path
from typing import Tuple, Dict, Any, List
from sklearn.preprocessing import StandardScaler
# ============================================================================
# TORCH MODELS (from src/models.py, inlined for self-containment)
# ============================================================================
class HarvestDetectionLSTM(nn.Module):
"""Unidirectional LSTM for harvest detection with dual outputs."""
def __init__(self, input_size: int, hidden_size: int = 128,
num_layers: int = 1, dropout: float = 0.5):
super(HarvestDetectionLSTM, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.num_layers = num_layers
self.lstm = nn.LSTM(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
dropout=dropout if num_layers > 1 else 0,
bidirectional=False,
batch_first=True
)
self.imminent_head = nn.Sequential(
nn.Linear(hidden_size, 16),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(16, 1),
nn.Sigmoid()
)
self.detected_head = nn.Sequential(
nn.Linear(hidden_size, 16),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(16, 1),
nn.Sigmoid()
)
def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
lstm_out, _ = self.lstm(x)
batch_size, seq_len, hidden_size = lstm_out.shape
lstm_flat = lstm_out.reshape(-1, hidden_size)
imminent_flat = self.imminent_head(lstm_flat).reshape(batch_size, seq_len)
detected_flat = self.detected_head(lstm_flat).reshape(batch_size, seq_len)
return imminent_flat, detected_flat
class HarvestDetectionGRU(nn.Module):
"""Unidirectional GRU for harvest detection with dual outputs."""
def __init__(self, input_size: int, hidden_size: int = 128,
num_layers: int = 1, dropout: float = 0.5):
super(HarvestDetectionGRU, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.num_layers = num_layers
self.gru = nn.GRU(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
dropout=dropout if num_layers > 1 else 0,
bidirectional=False,
batch_first=True
)
self.imminent_head = nn.Sequential(
nn.Linear(hidden_size, 16),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(16, 1),
nn.Sigmoid()
)
self.detected_head = nn.Sequential(
nn.Linear(hidden_size, 16),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(16, 1),
nn.Sigmoid()
)
def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
gru_out, _ = self.gru(x)
batch_size, seq_len, hidden_size = gru_out.shape
gru_flat = gru_out.reshape(-1, hidden_size)
imminent_flat = self.imminent_head(gru_flat).reshape(batch_size, seq_len)
detected_flat = self.detected_head(gru_flat).reshape(batch_size, seq_len)
return imminent_flat, detected_flat
def create_model(model_type: str, input_size: int, hidden_size: int = 128,
num_layers: int = 1, dropout: float = 0.5, device = None) -> nn.Module:
"""Create a model from registry."""
registry = {'LSTM': HarvestDetectionLSTM, 'GRU': HarvestDetectionGRU}
if model_type not in registry:
raise ValueError(f"Unknown model type: {model_type}")
model = registry[model_type](
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
dropout=dropout
)
if device:
model = model.to(device)
# Print model info
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Model: {model_type}")
print(f" Input size: {input_size}")
print(f" Hidden size: {hidden_size}")
print(f" Num layers: {num_layers}")
print(f" Dropout: {dropout}")
print(f" Total parameters: {total_params:,}")
print(f" Trainable parameters: {trainable_params:,}")
print(f" Device: {device}")
return model
# ============================================================================
# FEATURE ENGINEERING (from src/feature_engineering.py, simplified for inline)
# ============================================================================
def compute_ci_features(ci_series: pd.Series, dah_series: pd.Series = None) -> pd.DataFrame:
"""Compute all CI-based features (state, velocity, acceleration, min/max/range/std/CV)."""
features = pd.DataFrame(index=ci_series.index)
# State (moving averages)
features['CI_raw'] = ci_series
features['7d_MA'] = ci_series.rolling(window=7, min_periods=1).mean()
features['14d_MA'] = ci_series.rolling(window=14, min_periods=1).mean()
features['21d_MA'] = ci_series.rolling(window=21, min_periods=1).mean()
# Velocity (gradient of MA)
for window in [7, 14, 21]:
ma = ci_series.rolling(window=window, min_periods=1).mean()
features[f'{window}d_velocity'] = ma.diff() / 1.0 # Simplified gradient
# Acceleration (gradient of velocity)
for window in [7, 14, 21]:
ma = ci_series.rolling(window=window, min_periods=1).mean()
vel = ma.diff()
features[f'{window}d_acceleration'] = vel.diff()
# Min, Max, Range
for window in [7, 14, 21]:
features[f'{window}d_min'] = ci_series.rolling(window=window, min_periods=1).min()
features[f'{window}d_max'] = ci_series.rolling(window=window, min_periods=1).max()
features[f'{window}d_range'] = features[f'{window}d_max'] - features[f'{window}d_min']
# Std and CV
for window in [7, 14, 21]:
features[f'{window}d_std'] = ci_series.rolling(window=window, min_periods=1).std()
ma = ci_series.rolling(window=window, min_periods=1).mean()
features[f'{window}d_CV'] = features[f'{window}d_std'] / (ma + 1e-6)
# DAH normalized (Days After Harvest)
if dah_series is not None:
features['DAH_normalized'] = dah_series / 450.0
return features.fillna(0)
def extract_features(data_df: pd.DataFrame, feature_names: List[str], ci_column: str = 'FitData',
season_anchor_day: int = None, lookback_start: int = 0) -> np.ndarray:
"""
Extract and return specified features as numpy array.
Args:
data_df: DataFrame with Date and CI data (may be a window after a harvest)
feature_names: List of feature names to extract
ci_column: Name of CI column
season_anchor_day: Day in FULL sequence where this season started (for DAH reset)
DAH will be recalculated as: 1, 2, 3, ... from this point
lookback_start: Starting index in original full data (for season reset calculation)
Returns:
NumPy array of shape (len(data_df), len(feature_names))
"""
# Compute all CI features
ci_series = data_df[ci_column].astype(float)
# Compute DAH (age/days since season start) - NOT day-of-year!
# DAH is a continuous counter: 1, 2, 3, ..., 475 (doesn't cycle at 365)
# It only resets to 1 after a harvest is detected (new season)
dah_series = None
if 'DAH_normalized' in feature_names:
if season_anchor_day is not None and lookback_start >= season_anchor_day:
# Season was reset after harvest. Recalculate DAH as simple counter from 1
# This is a window starting at or after harvest, so DAH should be: 1, 2, 3, ...
dah_series = pd.Series(np.arange(1, len(data_df) + 1), index=data_df.index)
elif 'DAH' in data_df.columns:
# Use DAH directly from CSV - already calculated as continuous age counter
dah_series = pd.Series(data_df['DAH'].astype(float).values, index=data_df.index)
else:
# Fallback: create continuous age counter (1, 2, 3, ...)
dah_series = pd.Series(np.arange(1, len(data_df) + 1), index=data_df.index)
all_features = compute_ci_features(ci_series, dah_series)
# Select requested features
requested = [f for f in feature_names if f in all_features.columns]
if not requested:
raise ValueError(f"No valid features found. Requested: {feature_names}, Available: {all_features.columns.tolist()}")
return all_features[requested].values
# ============================================================================
# MAIN UTILITY FUNCTIONS
# ============================================================================
def load_model_and_config(model_dir: Path):
"""Load model, config, and scalers from a given directory."""
cwd = Path.cwd()
# Try different naming conventions
candidates = [
# Standard names
(model_dir / "config.json", model_dir / "model.pt", model_dir / "scalers.pkl"),
# Model 307 specific names
(model_dir / "model_config.json", model_dir / "model_307.pt", model_dir / "model_scalers.pkl"),
# CWD standard names
(cwd / "config.json", cwd / "model.pt", cwd / "scalers.pkl"),
# CWD Model 307 names
(cwd / "model_config.json", cwd / "model_307.pt", cwd / "model_scalers.pkl"),
]
config_file = model_file = scalers_file = None
for cfg, mdl, scl in candidates:
if cfg.exists() and mdl.exists() and scl.exists():
config_file, model_file, scalers_file = cfg, mdl, scl
print(f"Found model files in: {cfg.parent}")
break
if not (config_file and model_file and scalers_file):
missing = []
for cfg, mdl, scl in candidates:
if not cfg.exists():
missing.append(str(cfg))
if not mdl.exists():
missing.append(str(mdl))
if not scl.exists():
missing.append(str(scl))
raise FileNotFoundError(
f"Missing model files. Checked multiple locations. Missing: {missing}"
)
with open(config_file) as f:
config = yaml.safe_load(f)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = create_model(
model_type=config['model']['type'],
input_size=len(config['features']),
hidden_size=config['model']['hidden_size'],
num_layers=config['model']['num_layers'],
dropout=config['model']['dropout'],
device=device
)
print(f"Loading weights from: {model_file}")
model.load_state_dict(torch.load(model_file, map_location=device, weights_only=False))
model.eval()
with open(scalers_file, 'rb') as f:
scalers = pickle.load(f)
return model, config, scalers
def load_harvest_data(data_file: Path) -> pd.DataFrame:
"""Load harvest data CSV."""
print(f"Loading data from: {data_file}")
df = pd.read_csv(data_file)
print(f"Loaded {len(df)} rows")
return df
def run_phase1_growing_window(field_data, model, config, scalers, ci_column, device,
threshold=0.45, consecutive_days=2):
"""
Phase 1: Growing window detection with DOY season reset.
For each detected harvest, reset DOY counter for the next season.
This allows the model to detect multiple consecutive harvests in multi-year data.
"""
harvest_dates = []
season_anchor_day = 0
current_pos = 0
while current_pos < len(field_data):
consecutive_above_threshold = 0
min_window_size = 120
for window_end in range(current_pos + 1, len(field_data) + 1):
window_data = field_data.iloc[current_pos:window_end].copy().reset_index(drop=True)
if len(window_data) < min_window_size:
continue
try:
reset_doy = current_pos > season_anchor_day
features = extract_features(
window_data,
config['features'],
ci_column=ci_column,
season_anchor_day=season_anchor_day if reset_doy else None,
lookback_start=current_pos
)
features_scaled = features.copy().astype(float)
for fi, scaler in enumerate(scalers):
try:
features_scaled[:, fi] = scaler.transform(features[:, fi].reshape(-1, 1)).flatten()
except Exception:
pass
with torch.no_grad():
x_tensor = torch.tensor(features_scaled, dtype=torch.float32).unsqueeze(0).to(device)
imminent_probs, detected_probs = model(x_tensor)
last_prob = detected_probs[0, -1].item()
if last_prob > threshold:
consecutive_above_threshold += 1
else:
consecutive_above_threshold = 0
if consecutive_above_threshold >= consecutive_days:
harvest_idx = current_pos + window_end - consecutive_days
harvest_date = field_data.iloc[harvest_idx]['Date']
harvest_dates.append((harvest_date, harvest_idx))
season_anchor_day = harvest_idx + 1
current_pos = harvest_idx + 1
break
except Exception as e:
continue
else:
break
return harvest_dates
def run_phase2_refinement(field_data, phase1_harvests, model, config, scalers, ci_column, device):
"""
Phase 2: Refinement with ±40 day window.
For each Phase 1 harvest, extract window and refine with argmax.
Returns list of (harvest_date, harvest_idx) tuples.
"""
refined_harvests = []
field_data = field_data.sort_values('Date').reset_index(drop=True)
for i, (phase1_harvest_date, phase1_idx) in enumerate(phase1_harvests):
try:
# Determine season start
if i == 0:
season_start_date = field_data.iloc[0]['Date']
else:
prev_harvest_idx = phase1_harvests[i-1][1]
season_start_idx = prev_harvest_idx + 1
if season_start_idx >= len(field_data):
break
season_start_date = field_data.iloc[season_start_idx]['Date']
# Extract ±40 day window
window_start_date = season_start_date - pd.Timedelta(days=40)
window_end_date = phase1_harvest_date + pd.Timedelta(days=40)
# FIXED: Use proper index selection
mask_start = field_data['Date'] >= window_start_date
mask_end = field_data['Date'] <= window_end_date
if mask_start.any():
window_start_idx = mask_start.idxmax() # First True index
else:
window_start_idx = 0
if mask_end.any():
# Last True index: find where condition becomes False from the right
true_indices = np.where(mask_end)[0]
window_end_idx = true_indices[-1] + 1 # +1 for slicing (exclusive end)
else:
window_end_idx = len(field_data)
if window_end_idx <= window_start_idx:
refined_harvests.append((phase1_harvest_date, phase1_idx))
continue
window_data = field_data.iloc[window_start_idx:window_end_idx].copy().reset_index(drop=True)
# Extract features for full window
features = extract_features(window_data, config['features'], ci_column=ci_column)
# Apply scalers
for fi, scaler in enumerate(scalers):
try:
features[:, fi] = scaler.transform(features[:, fi].reshape(-1, 1)).flatten()
except Exception:
pass
# Run model once on full window
with torch.no_grad():
x_tensor = torch.tensor(features, dtype=torch.float32).unsqueeze(0).to(device)
imminent_probs, detected_probs = model(x_tensor)
detected_probs = detected_probs.squeeze(0).cpu().numpy()
# Find refined harvest (argmax in window)
refined_idx_in_window = int(np.argmax(detected_probs))
refined_idx_global = window_start_idx + refined_idx_in_window
refined_harvest_date = field_data.iloc[refined_idx_global]['Date']
refined_harvests.append((refined_harvest_date, refined_idx_global))
except Exception:
refined_harvests.append((phase1_harvest_date, phase1_idx))
return refined_harvests
def run_two_step_refinement(df: pd.DataFrame, model, config, scalers, device=None,
phase1_threshold=0.45, phase1_consecutive=2):
"""
Two-step harvest detection for each field:
1. Phase 1: Growing window with threshold confirmation
2. Phase 2: ±40 day refinement with argmax
Args:
phase1_threshold (float): Probability threshold for Phase 1 (default 0.45, tuned for Model 307)
phase1_consecutive (int): Consecutive days required (default 2, reduced from 3 for robustness)
Returns list of dicts with field, season_start_date, season_end_date, etc.
"""
if device is None:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
results = []
# CI column is 'FitData' (interpolated CI) - NOT 'value' (raw with NAs)
# 'FitData' is already gap-filled by R stage 03, ready for ML
ci_column = 'FitData'
# Group by field and count total fields for progress
field_groups = list(df.groupby('field'))
total_fields = len(field_groups)
harvests_found = 0
print(f" Processing {total_fields} fields...")
print(f" Phase 1 parameters: threshold={phase1_threshold}, consecutive_days={phase1_consecutive}")
for idx, (field, field_data) in enumerate(field_groups, 1):
# Simple progress indicator
pct = int((idx / total_fields) * 100)
bar_length = 40
filled = int((idx / total_fields) * bar_length)
bar = "" * filled + "" * (bar_length - filled)
print(f" [{bar}] {pct:3d}% ({idx}/{total_fields} fields)", end='\r')
field_data = field_data.sort_values('Date').reset_index(drop=True)
# Phase 1: Growing window detection
phase1_harvests = run_phase1_growing_window(field_data, model, config, scalers, ci_column, device,
threshold=phase1_threshold, consecutive_days=phase1_consecutive)
if not phase1_harvests:
continue
# Phase 2: Refinement
phase2_harvests = run_phase2_refinement(field_data, phase1_harvests, model, config, scalers, ci_column, device)
# Store results
for i, (harvest_date, harvest_idx) in enumerate(phase2_harvests):
if i == 0:
season_start_date = field_data.iloc[0]['Date']
else:
prev_harvest_idx = phase2_harvests[i-1][1]
season_start_idx = prev_harvest_idx + 1
if season_start_idx >= len(field_data):
break
season_start_date = field_data.iloc[season_start_idx]['Date']
season_end_date = harvest_date
result = {
'field': field,
'season': i + 1,
'season_start_date': season_start_date,
'season_end_date': season_end_date,
'phase2_harvest_date': harvest_date,
}
results.append(result)
harvests_found += 1
print() # New line after progress bar
print(f" ✓ Complete: Found {harvests_found} harvest events across {total_fields} fields")
if results:
print(f" Sample harvest dates: {results[0]['phase2_harvest_date']}")
if len(results) > 1:
print(f" {results[-1]['phase2_harvest_date']}")
return results
def build_production_harvest_table(refined_results: List[Dict]) -> pd.DataFrame:
"""
Build a DataFrame from refined results with columns for production pipeline.
One row per field/season with season start and end dates (formatted as YYYY-MM-DD).
"""
if not refined_results:
print("WARNING: No refined results to build table")
return pd.DataFrame(columns=['field', 'season', 'season_start_date', 'season_end_date'])
# Build DataFrame
df = pd.DataFrame(refined_results)
# Ensure date columns are datetime
df['season_start_date'] = pd.to_datetime(df['season_start_date']).dt.strftime('%Y-%m-%d')
df['season_end_date'] = pd.to_datetime(df['season_end_date']).dt.strftime('%Y-%m-%d')
df['phase2_harvest_date'] = pd.to_datetime(df['phase2_harvest_date']).dt.strftime('%Y-%m-%d')
print(f"Built production table with {len(df)} field/season combinations")
return df