# SmartCane/python_app/31_harvest_imminent_weekly.py
"""
Script: 31_harvest_imminent_weekly.py (formerly 02_harvest_imminent_weekly.py)
Purpose: WEEKLY MONITORING - Run WEEKLY/DAILY to get real-time harvest status for all fields
This script runs on RECENT CI data (typically last 300 days) to predict whether each field
is approaching harvest. Use this for operational decision-making and real-time alerts.
RUN FREQUENCY: Weekly (or daily if required)
INPUT:
- ci_data_for_python.csv (recent CI data from 02b_convert_rds_to_csv.R)
Location: laravel_app/storage/app/{project}/Data/extracted_ci/ci_data_for_python/ci_data_for_python.csv
- harvest_production_export.xlsx (baseline from script 01 - optional, for reference)
OUTPUT:
- harvest_imminent_weekly.csv (weekly probabilities: field, imminent_prob, detected_prob, week, year)
Workflow:
1. Load harvest_production_export.xlsx (baseline dates - optional, for context)
2. Load ci_data_for_python.csv (recent CI data)
3. For each field, extract last 300 days of history
4. Run Model 307 inference on full sequence (last timestep probabilities)
5. Export harvest_imminent_weekly.csv with probabilities
Output Columns:
- field: Field ID
- sub_field: Sub-field identifier
- imminent_prob: Probability field will be harvestable in next 28 days (0.0-1.0)
- detected_prob: Probability field is currently being harvested (0.0-1.0)
- week: ISO week number
- year: Year
- as_of_date: Latest date in dataset
- num_days: Number of days of history used
Use Cases:
- Alert when imminent_prob > 0.7 (prepare harvest operations)
- Alert when detected_prob > 0.6 (field is being harvested)
- Track trends over weeks to validate baseline predictions
- Feed into 09b script for weekly dashboard reports
Usage:
python 02_harvest_imminent_weekly.py [project_name]
Examples:
python 02_harvest_imminent_weekly.py angata
python 02_harvest_imminent_weekly.py esa
python 02_harvest_imminent_weekly.py chemba
If no project specified, defaults to 'angata'
"""
import shutil
import subprocess
import sys
from datetime import datetime, timedelta
from pathlib import Path

import numpy as np
import pandas as pd
import torch

from harvest_date_pred_utils import (
    load_model_and_config,
    extract_features,
)
def load_harvest_dates(harvest_file):
    """Load the latest harvest end date per field from the baseline Excel export.

    Reads harvest_production_export.xlsx (output of script 01) and keeps, for
    each field, the most recent ``season_end_date``.

    Returns:
        dict mapping field id (stripped str) -> latest season_end_date
        (pandas Timestamp), or None when the file is missing or unreadable.
    """
    print("[1/5] Loading harvest dates...")
    if not Path(harvest_file).exists():
        print(f" ERROR: {harvest_file} not found")
        print(" Using 180-day lookback as default")
        return None
    try:
        baseline = pd.read_excel(harvest_file)
        print(f" Loaded {len(baseline)} field-season records")
        # season_end_date is produced by the harvest prediction script (01)
        baseline['season_end_date'] = pd.to_datetime(baseline['season_end_date'])
        # Latest season end per field, keyed by the stripped string field id
        latest_per_field = baseline.groupby('field')['season_end_date'].max()
        harvest_dates = {
            str(fid).strip(): end_date
            for fid, end_date in latest_per_field.items()
        }
        print(f" Successfully mapped {len(harvest_dates)} fields")
        print(f" Harvest end dates range: {min(harvest_dates.values()).date()} to {max(harvest_dates.values()).date()}")
        return harvest_dates
    except Exception as e:
        print(f" ERROR loading harvest file: {e}")
        print(f" Using 180-day lookback instead")
        return None
def run_rds_to_csv_conversion():
    """Run the R helper script that converts CI RDS data to a daily CSV.

    Locates ``Rscript`` on PATH first (portable across machines and R
    versions), falling back to the previously hard-coded Windows R 4.4.3
    install location so existing deployments keep working.

    Returns:
        True when the R script ran and exited with status 0, False otherwise
        (missing script, non-zero exit, timeout, or launch failure).
    """
    print("\n[2/5] Converting RDS to CSV (daily interpolation)...")
    r_script = Path("02b_convert_rds_to_csv.R")
    if not r_script.exists():
        print(f" ERROR: {r_script} not found")
        return False
    # FIX: don't rely solely on one absolute Windows path — prefer whatever
    # Rscript is on PATH, keeping the old path as a backward-compatible fallback.
    rscript_exe = shutil.which("Rscript") or r"C:\Program Files\R\R-4.4.3\bin\x64\Rscript.exe"
    try:
        result = subprocess.run(
            [rscript_exe, str(r_script)],
            capture_output=True,
            text=True,
            timeout=300,  # allow the interpolation up to 5 minutes
        )
        if result.returncode != 0:
            print(f" ERROR running R script:\n{result.stderr}")
            return False
        # Echo the tail of the R script's stdout for operator feedback
        lines = result.stdout.strip().split('\n')
        for line in lines[-5:]:
            if line.strip():
                print(f" {line}")
        return True
    except Exception as e:
        # Covers FileNotFoundError (no Rscript), TimeoutExpired, etc.
        print(f" ERROR: {e}")
        return False
def load_ci_data(csv_file):
    """Read the daily CI CSV and parse its ``Date`` column to datetimes.

    Returns:
        The loaded DataFrame, or None when the file does not exist.
    """
    print("\n[3/5] Loading CI data...")
    if not Path(csv_file).exists():
        print(f" ERROR: {csv_file} not found")
        return None
    frame = pd.read_csv(csv_file)
    frame['Date'] = pd.to_datetime(frame['Date'])
    print(f" Loaded {len(frame)} daily rows for {frame['field'].nunique()} fields")
    print(f" Date range: {frame['Date'].min().date()} to {frame['Date'].max().date()}")
    return frame
def extract_seasonal_data(field_id, harvest_date, ci_data):
    """Return one field's CI rows dated on/after ``harvest_date``, sorted by Date.

    Returns:
        A DataFrame copy sorted ascending by Date, or None when the field is
        absent from ``ci_data`` or fewer than 30 rows remain after filtering.
    """
    # field_id is int, ci_data['field'] is also int
    subset = ci_data.loc[ci_data['field'] == field_id].copy()
    if subset.empty:
        return None
    # Keep only observations from the harvest date onwards
    subset = subset[subset['Date'] >= harvest_date].sort_values('Date')
    # 30 daily observations is the minimum for meaningful inference
    return subset if len(subset) >= 30 else None
def run_inference_on_season(field_data, model, config, scalers, device, ci_column='FitData'):
    """Run Model 307 on a field's recent CI history.

    Feeds at most the last 300 rows of ``field_data`` through the model and
    reads the final timestep of its two output heads.

    Returns:
        (imminent_prob, detected_prob) rounded to 4 decimals, or (None, None)
        when fewer than 10 feature rows are available or any step fails.
    """
    try:
        # Cap the sequence at 300 days: enough history for meaningful patterns
        # while avoiding seasonality mismatch with the training data.
        recent = field_data.iloc[-300:] if len(field_data) > 300 else field_data
        feats = extract_features(recent, config['features'], ci_column)
        if feats.shape[0] < 10:
            return None, None
        # Per-feature standardization — CRITICAL: must mirror Phase 1 in
        # harvest_date_pred_utils.py. `scalers` is a list of StandardScaler
        # objects, one per feature column.
        if scalers and isinstance(scalers, list):
            for col_idx, scaler in enumerate(scalers):
                try:
                    column = feats[:, col_idx].reshape(-1, 1)
                    feats[:, col_idx] = scaler.transform(column).flatten()
                except Exception:
                    # Best effort: leave this feature unscaled on failure
                    pass
        with torch.no_grad():
            seq = torch.tensor(feats, dtype=torch.float32).unsqueeze(0).to(device)
            imm_out, det_out = model(seq)
            # Last-timestep probabilities from each head
            imminent = imm_out.squeeze(0)[-1].cpu().item()
            detected = det_out.squeeze(0)[-1].cpu().item()
        return round(imminent, 4), round(detected, 4)
    except Exception:
        return None, None
def main():
    """Weekly monitoring entry point: load CI data, run Model 307 per field,
    and export harvest_imminent_weekly.csv for the selected project."""
    # Project name from the command line, defaulting to 'angata'
    project_name = sys.argv[1] if len(sys.argv) > 1 else "angata"

    # ---- Path construction (Laravel storage layout) ----
    base_storage = Path("../laravel_app/storage/app") / project_name / "Data"
    ci_data_dir = base_storage / "extracted_ci" / "ci_data_for_python"
    CI_DATA_FILE = ci_data_dir / "ci_data_for_python.csv"
    harvest_data_dir = base_storage / "HarvestData"
    BASELINE_FILE = harvest_data_dir / "harvest_production_export.xlsx"
    OUTPUT_CSV = harvest_data_dir / "harvest_imminent_weekly.csv"
    harvest_data_dir.mkdir(parents=True, exist_ok=True)  # Create if doesn't exist

    print("="*80)
    print(f"HARVEST IMMINENT PROBABILITY - WEEKLY MONITORING ({project_name})")
    print("="*80)

    # [1] Load harvest dates (optional - for projects with baseline predictions).
    # NOTE(review): harvest_dates is currently unused below; loaded only for
    # the operator context printed by load_harvest_dates.
    harvest_dates = None
    if BASELINE_FILE.exists():
        harvest_dates = load_harvest_dates(BASELINE_FILE)
    else:
        print("[1/5] Loading harvest dates...")
        print(f" INFO: {BASELINE_FILE} not found (optional for weekly monitoring)")

    # [2] Load CI data
    print(f"\n[2/5] Loading CI data...")
    print(f" From: {CI_DATA_FILE}")
    if not CI_DATA_FILE.exists():
        print(f" ERROR: {CI_DATA_FILE} not found")
        print(f" Expected at: {CI_DATA_FILE.resolve()}")
        print(f"\n Run 02b_convert_rds_to_csv.R first to generate this file:")
        print(f" Rscript r_app/02b_convert_ci_rds_to_csv.R {project_name}")
        return
    ci_data = load_ci_data(CI_DATA_FILE)
    if ci_data is None:
        print("ERROR: Could not load CI data")
        return

    # [3] Load model (current directory is python_app/, which contains
    # model.pt, config.json, scalers.pkl)
    print("\n[3/5] Loading Model 307...")
    model_dir = Path(".")
    model, config, scalers = load_model_and_config(model_dir)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f" Device: {device}")

    # [4] Run inference per field
    print("\n[4/5] Running seasonal inference...")
    results_list = []
    ci_column = config['data']['ci_column']

    # One metadata row per field: its sub_field label and latest observation date
    field_meta = ci_data.groupby('field').agg({
        'sub_field': 'first',
        'Date': 'max'
    }).reset_index()
    field_meta.columns = ['field', 'sub_field', 'latest_date']

    count = 0
    for field_id in ci_data['field'].unique():
        meta = field_meta[field_meta['field'] == field_id]
        if len(meta) == 0:
            continue
        sub_field = meta['sub_field'].iloc[0]
        latest_date = meta['latest_date'].iloc[0]

        # Use recent CI history (last 300 rows from latest available data)
        field_data = ci_data[ci_data['field'] == field_id].copy()
        field_data = field_data.sort_values('Date')
        if len(field_data) > 300:
            field_data = field_data.iloc[-300:]
        if len(field_data) < 30:
            continue

        # Run inference on recent history to predict the next 28 days
        imminent_prob, detected_prob = run_inference_on_season(
            field_data, model, config, scalers, device, ci_column
        )
        if imminent_prob is None:
            continue

        # FIX: use isocalendar() so week and year are ISO-consistent.
        # The old strftime('%V') (ISO week) + strftime('%Y') (calendar year)
        # combination mismatches around year boundaries — e.g. 2024-12-30 is
        # ISO week 1 of 2025, but '%Y' would have reported 2024.
        iso = latest_date.isocalendar()
        week = int(iso[1])
        year = int(iso[0])

        results_list.append({
            'field': field_id,
            'sub_field': sub_field,
            'imminent_prob': imminent_prob,
            'detected_prob': detected_prob,
            'week': week,
            'year': year,
            'as_of_date': latest_date,
            'num_days': len(field_data),
        })
        count += 1
    print(f" Completed inference for {count} fields")

    # [5] Export results (FIX: announce the step before writing, so console
    # order matches what actually happens)
    print(f"\n[5/5] Exporting results...")
    df = pd.DataFrame(results_list)
    df.to_csv(OUTPUT_CSV, index=False)
    print(f"✓ Exported {len(df)} fields to {OUTPUT_CSV}")
    print(f" Output location: {OUTPUT_CSV.resolve()}")

    if not df.empty:
        print(f"\nSample rows:")
        print(df[['field', 'sub_field', 'imminent_prob', 'detected_prob', 'num_days', 'week', 'year']].head(15).to_string(index=False))
        # Alert summary using the documented operational thresholds
        high_imminent = len(df[df['imminent_prob'] > 0.7])
        high_detected = len(df[df['detected_prob'] > 0.6])
        print(f"\n⚠ ALERTS:")
        print(f" Fields with imminent_prob > 0.70: {high_imminent}")
        print(f" Fields with detected_prob > 0.60: {high_detected}")
    else:
        print(f" WARNING: No results exported - check CI data availability")

    print(f"\nStorage structure:")
    print(f" Input CI: laravel_app/storage/app/{project_name}/Data/extracted_ci/ci_data_for_python/")
    print(f" Input baseline: laravel_app/storage/app/{project_name}/Data/HarvestData/harvest_production_export.xlsx")
    print(f" Output: laravel_app/storage/app/{project_name}/Data/HarvestData/")
    print(f"\nReady to load into 09b field analysis report")


if __name__ == "__main__":
    main()