"""
Script: 02_harvest_imminent_weekly.py
Purpose: WEEKLY MONITORING - Run WEEKLY/DAILY to get real-time harvest status for all fields

This script runs on RECENT CI data (typically last 300 days) to predict whether
each field is approaching harvest. Use this for operational decision-making and
real-time alerts.

RUN FREQUENCY: Weekly (or daily if required)

INPUT:
  - ci_data_for_python.csv (recent CI data from 02b_convert_rds_to_csv.R)
    Location: laravel_app/storage/app/{project}/Data/extracted_ci/ci_data_for_python/ci_data_for_python.csv
  - harvest_production_export.xlsx (baseline from script 01 - optional, for reference)

OUTPUT:
  - harvest_imminent_weekly.csv
    (weekly probabilities: field, imminent_prob, detected_prob, week, year)

Workflow:
  1. Load harvest_production_export.xlsx (baseline dates - optional, for context)
  2. Load ci_data_for_python.csv (recent CI data)
  3. For each field, extract last 300 days of history
  4. Run Model 307 inference on full sequence (last timestep probabilities)
  5. Export harvest_imminent_weekly.csv with probabilities

Output Columns:
  - field: Field ID
  - sub_field: Sub-field identifier
  - imminent_prob: Probability field will be harvestable in next 28 days (0.0-1.0)
  - detected_prob: Probability field is currently being harvested (0.0-1.0)
  - week: ISO week number
  - year: ISO year that `week` belongs to (differs from the calendar year
    only for dates in the first/last days of a year)
  - as_of_date: Latest date in dataset
  - num_days: Number of days of history used

Use Cases:
  - Alert when imminent_prob > 0.7 (prepare harvest operations)
  - Alert when detected_prob > 0.6 (field is being harvested)
  - Track trends over weeks to validate baseline predictions
  - Feed into 09b script for weekly dashboard reports

Usage:
    python 02_harvest_imminent_weekly.py [project_name]

Examples:
    python 02_harvest_imminent_weekly.py angata
    python 02_harvest_imminent_weekly.py esa
    python 02_harvest_imminent_weekly.py chemba

If no project specified, defaults to 'angata'
"""

import shutil
import subprocess
import sys
from datetime import datetime, timedelta
from pathlib import Path

import numpy as np
import pandas as pd
import torch

from harvest_date_pred_utils import (
    load_model_and_config,
    extract_features,
)

# Maximum number of most-recent rows fed to the model. This is a ROW count; it
# equals "days" only when the CSV holds one row per field per day.
HISTORY_DAYS = 300
# Fields with fewer rows than this are skipped.
MIN_HISTORY_DAYS = 30
# Minimum number of feature rows required before running the model.
MIN_FEATURE_ROWS = 10
# Historical install location of Rscript; still preferred when it exists.
DEFAULT_RSCRIPT_EXE = r"C:\Program Files\R\R-4.4.3\bin\x64\Rscript.exe"
# Column order of harvest_imminent_weekly.csv. Declared explicitly so the CSV
# always carries a header row, even when no field produced a result.
OUTPUT_COLUMNS = [
    'field',
    'sub_field',
    'imminent_prob',
    'detected_prob',
    'week',
    'year',
    'as_of_date',
    'num_days',
]


def load_harvest_dates(harvest_file):
    """Load the latest season end date per field from the baseline Excel file.

    Args:
        harvest_file: Path to harvest_production_export.xlsx. The sheet must
            contain the columns 'field' and 'season_end_date'.

    Returns:
        dict mapping ``str(field).strip()`` to the latest non-null
        ``season_end_date`` of that field, or None when the file is missing,
        cannot be read, or contains no usable dates.
    """
    print("[1/5] Loading harvest dates...")

    if not Path(harvest_file).exists():
        # NOTE(review): the "180-day lookback" wording is kept from the original
        # messages; nothing in this script implements such a lookback.
        print(f" ERROR: {harvest_file} not found")
        print(" Using 180-day lookback as default")
        return None

    # Broad catch is deliberate: the baseline is optional, so any read/parse
    # problem is reported and the script continues without it.
    try:
        harvest_df = pd.read_excel(harvest_file)
        print(f" Loaded {len(harvest_df)} field-season records")

        # Use season_end_date column (output from harvest prediction script)
        harvest_df['season_end_date'] = pd.to_datetime(harvest_df['season_end_date'])

        # Latest season end per field; fields whose dates are all missing are dropped.
        latest_by_field = (
            harvest_df.groupby('field')['season_end_date'].max().dropna()
        )
        if latest_by_field.empty:
            # Previously this surfaced as an opaque "min() arg is an empty
            # sequence" error; the outcome (None) is unchanged.
            print(" ERROR loading harvest file: no usable season_end_date values")
            print(f" Using 180-day lookback instead")
            return None

        harvest_dates = {
            str(field_id).strip(): latest_end
            for field_id, latest_end in latest_by_field.items()
        }

        print(f" Successfully mapped {len(harvest_dates)} fields")
        print(f" Harvest end dates range: {min(harvest_dates.values()).date()} to {max(harvest_dates.values()).date()}")
        return harvest_dates
    except Exception as e:
        print(f" ERROR loading harvest file: {e}")
        print(f" Using 180-day lookback instead")
        return None


def _find_rscript():
    """Return the Rscript executable to use, or None if none can be found."""
    if Path(DEFAULT_RSCRIPT_EXE).exists():
        return DEFAULT_RSCRIPT_EXE
    # Fall back to whatever Rscript is on PATH (other R versions / platforms).
    return shutil.which("Rscript")


def run_rds_to_csv_conversion():
    """Run the R script 02b_convert_rds_to_csv.R from the current directory.

    Returns:
        True when the R script ran and exited with code 0, otherwise False
        (script missing, Rscript not found, timeout, or non-zero exit).
    """
    print("\n[2/5] Converting RDS to CSV (daily interpolation)...")
    r_script = Path("02b_convert_rds_to_csv.R")

    if not r_script.exists():
        print(f" ERROR: {r_script} not found")
        return False

    rscript_exe = _find_rscript()
    if rscript_exe is None:
        print(" ERROR: Rscript executable not found (checked default install path and PATH)")
        return False

    try:
        result = subprocess.run(
            [rscript_exe, str(r_script)],
            capture_output=True,
            text=True,
            timeout=300,
        )
    except (OSError, subprocess.SubprocessError) as e:
        # OSError: executable could not be started; SubprocessError: timeout.
        print(f" ERROR: {e}")
        return False

    if result.returncode != 0:
        print(f" ERROR running R script:\n{result.stderr}")
        return False

    # Show last few lines of output
    for line in result.stdout.strip().split('\n')[-5:]:
        if line.strip():
            print(f" {line}")
    return True


def load_ci_data(csv_file):
    """Load the CI CSV and parse its 'Date' column.

    Args:
        csv_file: Path to ci_data_for_python.csv. Must contain at least the
            columns 'Date' and 'field'.

    Returns:
        DataFrame with 'Date' converted to datetime, or None when the file
        does not exist.
    """
    # NOTE(review): main() prints its own "[2/5] Loading CI data..." header
    # before calling this function; the step label here is kept as-is.
    print("\n[3/5] Loading CI data...")

    if not Path(csv_file).exists():
        print(f" ERROR: {csv_file} not found")
        return None

    ci_data = pd.read_csv(csv_file)
    ci_data['Date'] = pd.to_datetime(ci_data['Date'])

    print(f" Loaded {len(ci_data)} daily rows for {ci_data['field'].nunique()} fields")
    print(f" Date range: {ci_data['Date'].min().date()} to {ci_data['Date'].max().date()}")
    return ci_data


def extract_seasonal_data(field_id, harvest_date, ci_data):
    """Extract one field's CI rows from ``harvest_date`` onwards.

    Args:
        field_id: Value compared against ``ci_data['field']`` with ``==``, so it
            must have the same type as that column.
        harvest_date: Lower bound (inclusive) applied to the 'Date' column.
        ci_data: DataFrame as returned by ``load_ci_data``.

    Returns:
        DataFrame sorted by 'Date', or None when the field is absent or fewer
        than MIN_HISTORY_DAYS rows remain after filtering.
    """
    field_data = ci_data[ci_data['field'] == field_id].copy()
    if len(field_data) == 0:
        return None

    # Filter from harvest date onwards
    field_data = field_data[field_data['Date'] >= harvest_date].sort_values('Date')

    # Need at least 30 days of data for meaningful inference
    if len(field_data) < MIN_HISTORY_DAYS:
        return None
    return field_data


def run_inference_on_season(field_data, model, config, scalers, device, ci_column='FitData'):
    """Run the model on a field's recent CI history and return last-step outputs.

    Args:
        field_data: Date-sorted DataFrame for a single field; only the last
            HISTORY_DAYS rows are used.
        model: Callable returning ``(out_imm, out_det)`` for a batched tensor.
        config: Model configuration; ``config['features']`` is passed to
            ``extract_features``.
        scalers: Expected to be a list with one fitted scaler per feature
            column (applied via ``.transform``); anything else skips scaling.
        device: torch device the input tensor is moved to.
        ci_column: Name of the CI column passed to ``extract_features``.

    Returns:
        ``(imminent_prob, detected_prob)`` rounded to 4 decimals, or
        ``(None, None)`` when there are too few feature rows or inference fails.
    """
    try:
        # Use last 300 days of data for inference (enough history for meaningful
        # patterns, avoids training data seasonality mismatch)
        if len(field_data) > HISTORY_DAYS:
            field_data = field_data.iloc[-HISTORY_DAYS:]

        features_array = extract_features(field_data, config['features'], ci_column)
        if features_array.shape[0] < MIN_FEATURE_ROWS:
            return None, None

        # Scale each feature column with its own scaler (must match how the
        # model was trained in harvest_date_pred_utils.py).
        if scalers and isinstance(scalers, list):
            for fi, scaler in enumerate(scalers):
                try:
                    features_array[:, fi] = scaler.transform(
                        features_array[:, fi].reshape(-1, 1)
                    ).flatten()
                except Exception as e:
                    # Still best-effort (the column stays unscaled), but no
                    # longer silent: unscaled inputs make the output unreliable.
                    print(f" WARNING: scaling failed for feature {fi}: {e}")

        with torch.no_grad():
            x_tensor = (
                torch.tensor(features_array, dtype=torch.float32)
                .unsqueeze(0)
                .to(device)
            )
            out_imm, out_det = model(x_tensor)

            # Get last timestep probabilities
            imminent_prob = out_imm.squeeze(0)[-1].cpu().item()
            detected_prob = out_det.squeeze(0)[-1].cpu().item()

        return round(imminent_prob, 4), round(detected_prob, 4)
    except Exception as e:
        # The field is skipped by the caller; report why instead of hiding it.
        print(f" WARNING: inference failed: {e}")
        return None, None


def _iso_year_and_week(date):
    """Return ``(iso_year, iso_week)`` for a date.

    The ISO year is used so the pair is consistent around New Year: for
    example 2024-12-30 is ISO week 1 of ISO year 2025, not week 1 of 2024.
    """
    iso = date.isocalendar()
    return int(iso[0]), int(iso[1])


def main():
    """Run the weekly harvest-imminent inference for one project and export the CSV."""
    # Get project name from command line or use default
    project_name = sys.argv[1] if len(sys.argv) > 1 else "angata"

    # Construct paths
    base_storage = Path("../laravel_app/storage/app") / project_name / "Data"
    ci_data_dir = base_storage / "extracted_ci" / "ci_data_for_python"
    CI_DATA_FILE = ci_data_dir / "ci_data_for_python.csv"
    harvest_data_dir = base_storage / "HarvestData"
    BASELINE_FILE = harvest_data_dir / "harvest_production_export.xlsx"
    OUTPUT_CSV = harvest_data_dir / "harvest_imminent_weekly.csv"
    harvest_data_dir.mkdir(parents=True, exist_ok=True)  # Create if doesn't exist

    print("="*80)
    print(f"HARVEST IMMINENT PROBABILITY - WEEKLY MONITORING ({project_name})")
    print("="*80)

    # [1] Load harvest dates (optional - for projects with predictions).
    # The result is context only; the inference below does not read it.
    harvest_dates = None
    if BASELINE_FILE.exists():
        harvest_dates = load_harvest_dates(BASELINE_FILE)
    else:
        print("[1/5] Loading harvest dates...")
        print(f" INFO: {BASELINE_FILE} not found (optional for weekly monitoring)")

    # [2] Load CI data
    print(f"\n[2/5] Loading CI data...")
    print(f" From: {CI_DATA_FILE}")

    if not CI_DATA_FILE.exists():
        print(f" ERROR: {CI_DATA_FILE} not found")
        print(f" Expected at: {CI_DATA_FILE.resolve()}")
        print(f"\n Run 02b_convert_rds_to_csv.R first to generate this file:")
        print(f" Rscript r_app/02b_convert_ci_rds_to_csv.R {project_name}")
        return

    ci_data = load_ci_data(CI_DATA_FILE)
    if ci_data is None:
        print("ERROR: Could not load CI data")
        return

    # [3] Load model (from python_app directory)
    print("\n[3/5] Loading Model 307...")
    # Current directory is python_app/, contains model.pt, config.json, scalers.pkl
    model_dir = Path(".")
    model, config, scalers = load_model_and_config(model_dir)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f" Device: {device}")

    # The input tensor is moved to `device` during inference, so the model must
    # live there too, and must be in eval mode. Both calls are no-ops if the
    # loader already did this.
    # NOTE(review): assumes the loaded model is a torch.nn.Module - confirm.
    if isinstance(model, torch.nn.Module):
        model = model.to(device)
        model.eval()

    # [4] Run inference per field
    print("\n[4/5] Running seasonal inference...")

    results_list = []
    ci_column = config['data']['ci_column']

    # One pass over the data: groups are visited in order of first appearance
    # and rows with a missing field id are ignored.
    fields = ci_data.groupby('field', sort=False)
    # Per-field metadata: first non-null sub_field and latest date.
    field_meta = fields.agg({'sub_field': 'first', 'Date': 'max'})

    count = 0
    for field_id, field_rows in fields:
        sub_field = field_meta.at[field_id, 'sub_field']
        latest_date = field_meta.at[field_id, 'Date']
        if pd.isna(latest_date):
            # No valid date for this field: week/year cannot be derived.
            continue

        # Use recent CI history (last 300 rows up to the latest available data)
        field_data = field_rows.sort_values('Date')
        if len(field_data) > HISTORY_DAYS:
            field_data = field_data.iloc[-HISTORY_DAYS:]

        if len(field_data) < MIN_HISTORY_DAYS:
            continue

        # Run inference on recent history to predict next 28 days
        imminent_prob, detected_prob = run_inference_on_season(
            field_data, model, config, scalers, device, ci_column
        )
        if imminent_prob is None:
            continue

        # NOTE(review): `year` is now the ISO year matching `week`; consumers
        # that join on calendar year should be checked.
        year, week = _iso_year_and_week(latest_date)

        results_list.append({
            'field': field_id,
            'sub_field': sub_field,
            'imminent_prob': imminent_prob,
            'detected_prob': detected_prob,
            'week': week,
            'year': year,
            'as_of_date': latest_date,
            'num_days': len(field_data),
        })
        count += 1

    print(f" Completed inference for {count} fields")

    # Build output DataFrame (explicit columns keep the header when empty)
    df = pd.DataFrame(results_list, columns=OUTPUT_COLUMNS)
    df.to_csv(OUTPUT_CSV, index=False)

    print(f"\n[5/5] Exporting results...")
    print(f"✓ Exported {len(df)} fields to {OUTPUT_CSV}")
    print(f" Output location: {OUTPUT_CSV.resolve()}")

    if len(df) > 0:
        print(f"\nSample rows:")
        print(df[['field', 'sub_field', 'imminent_prob', 'detected_prob', 'num_days', 'week', 'year']].head(15).to_string(index=False))

        # Show alert summary
        high_imminent = len(df[df['imminent_prob'] > 0.7])
        high_detected = len(df[df['detected_prob'] > 0.6])
        print(f"\n⚠ ALERTS:")
        print(f" Fields with imminent_prob > 0.70: {high_imminent}")
        print(f" Fields with detected_prob > 0.60: {high_detected}")
    else:
        print(f" WARNING: No results exported - check CI data availability")

    print(f"\nStorage structure:")
    print(f" Input CI: laravel_app/storage/app/{project_name}/Data/extracted_ci/ci_data_for_python/")
    print(f" Input baseline: laravel_app/storage/app/{project_name}/Data/HarvestData/harvest_production_export.xlsx")
    print(f" Output: laravel_app/storage/app/{project_name}/Data/HarvestData/")
    print(f"\nReady to load into 09b field analysis report")


if __name__ == "__main__":
    main()