SmartCane/python_app/31_harvest_imminent_weekly.py

369 lines
14 KiB
Python

"""
Script: 31_harvest_imminent_weekly.py
Purpose: WEEKLY MONITORING - Run WEEKLY/DAILY to get real-time harvest status for all fields
This script runs on RECENT CI data (typically last 300 days) to predict whether each field
is approaching harvest. Use this for operational decision-making and real-time alerts.
RUN FREQUENCY: Weekly (or daily if required)
INPUT:
- harvest.xlsx (baseline from scripts 22+23 - contains last harvest date per field)
Location: laravel_app/storage/app/{project}/Data/harvest.xlsx
- ci_data_for_python.csv (complete CI data from R script)
Location: laravel_app/storage/app/{project}/Data/extracted_ci/ci_data_for_python/ci_data_for_python.csv
OUTPUT:
- reports/kpis/field_stats/{project}_harvest_imminent_week_{WW}_{YYYY}.csv (weekly probabilities: field, imminent_prob, detected_prob, week, year)
Workflow:
1. Load harvest.xlsx to find last harvest date (season_end) per field
2. Load ci_data_for_python.csv (complete CI data)
3. For each field, extract all CI data AFTER last harvest (complete current season)
4. Run Model 307 inference on full season sequence (last timestep probabilities)
5. Export week_WW_YYYY.csv with probabilities
Output Columns:
- field: Field ID
- sub_field: Sub-field identifier
- imminent_prob: Probability field will be harvestable in next 28 days (0.0-1.0)
- detected_prob: Probability field is currently being harvested (0.0-1.0)
- week: ISO week number
- year: Year
- as_of_date: Latest date in dataset
- num_days: Number of days of history used
Use Cases:
- Alert when imminent_prob > 0.7 (prepare harvest operations)
- Alert when detected_prob > 0.6 (field is being harvested)
- Track trends over weeks to validate baseline predictions
- Feed into 09b script for weekly dashboard reports
Usage:
python 31_harvest_imminent_weekly.py [project_name]
Examples:
python 31_harvest_imminent_weekly.py angata
python 31_harvest_imminent_weekly.py esa
python 31_harvest_imminent_weekly.py chemba
If no project specified, defaults to 'angata'
"""
import pandas as pd
import numpy as np
import torch
import subprocess
import sys
from pathlib import Path
from datetime import datetime, timedelta
from harvest_date_pred_utils import (
load_model_and_config,
extract_features,
)
def load_harvest_dates(harvest_file):
    """Load the most recent harvest end date per field from harvest.xlsx.

    The workbook (output of scripts 22+23) must provide a ``field`` column and
    a ``season_end`` column. For every field the latest ``season_end`` is kept;
    that date marks the start of the field's current season.

    Fields whose ``season_end`` values are all missing are left out of the
    mapping, so callers never receive ``NaT`` as a season boundary and the
    reported field count / date range only reflect usable entries.

    Args:
        harvest_file: Path (str or Path) to harvest.xlsx.

    Returns:
        dict mapping field id (whitespace-stripped string) to a pandas
        Timestamp, or None when the file is missing, cannot be read, or
        contains no usable harvest date.
    """
    print("[1/5] Loading harvest data for season boundaries...")
    if not Path(harvest_file).exists():
        print(f" ERROR: {harvest_file} not found")
        print(" harvest.xlsx is required to determine current season boundaries")
        return None

    try:
        harvest_df = pd.read_excel(harvest_file)
        print(f" Loaded {len(harvest_df)} season records")
        # season_end contains the last harvest date for each season
        harvest_df['season_end'] = pd.to_datetime(harvest_df['season_end'])
        # Field ids are normalised to stripped strings so lookups are type-agnostic
        harvest_df['field'] = harvest_df['field'].astype(str).str.strip()
        # Latest season_end per field = most recent harvest = start of current season.
        # dropna() removes fields for which no valid date exists at all.
        latest_per_field = harvest_df.groupby('field')['season_end'].max().dropna()
        harvest_dates = latest_per_field.to_dict()
    except Exception as e:
        # Deliberately broad: any read/parse/schema problem is reported to the
        # operator and signalled to the caller with None instead of a traceback.
        print(f" ERROR loading harvest.xlsx: {e}")
        return None

    if not harvest_dates:
        print(" ERROR loading harvest.xlsx: no valid season_end dates found")
        return None

    print(f" Successfully mapped {len(harvest_dates)} fields")
    print(
        f" Last harvest dates range: {latest_per_field.min().date()}"
        f" to {latest_per_field.max().date()}"
    )
    return harvest_dates
def run_rds_to_csv_conversion():
    """Run the R script ``02b_convert_rds_to_csv.R`` to convert RDS data to CSV.

    The script is looked up relative to the current working directory. The
    Rscript executable is taken from the historical Windows install location
    when that file exists, otherwise from the system PATH, so the function
    also works on machines with a different R installation.

    The last five non-empty lines of the script's stdout are echoed.

    Returns:
        True when the R script ran and exited with code 0, False otherwise
        (script missing, Rscript not found, timeout, or non-zero exit).
    """
    import shutil  # local import: only this helper needs it

    print("\n[2/5] Converting RDS to CSV (daily interpolation)...")
    r_script = Path("02b_convert_rds_to_csv.R")
    if not r_script.exists():
        print(f" ERROR: {r_script} not found")
        return False

    # Prefer the original hard-coded Windows location, fall back to PATH lookup
    default_rscript = r"C:\Program Files\R\R-4.4.3\bin\x64\Rscript.exe"
    if Path(default_rscript).exists():
        rscript_exe = default_rscript
    else:
        rscript_exe = shutil.which("Rscript")
    if rscript_exe is None:
        print(" ERROR: Rscript executable not found (install R or add Rscript to PATH)")
        return False

    timeout_seconds = 300
    try:
        result = subprocess.run(
            [rscript_exe, str(r_script)],
            capture_output=True,
            text=True,
            timeout=timeout_seconds,
        )
    except subprocess.TimeoutExpired:
        print(f" ERROR: R script did not finish within {timeout_seconds} seconds")
        return False
    except (OSError, subprocess.SubprocessError, ValueError) as e:
        # OSError: executable could not be started; ValueError: output decoding
        print(f" ERROR: {e}")
        return False

    if result.returncode != 0:
        print(f" ERROR running R script:\n{result.stderr}")
        return False

    # Show last few lines of output
    for line in result.stdout.strip().splitlines()[-5:]:
        if line.strip():
            print(f" {line}")
    return True
def load_ci_data(csv_file):
    """Read the CI CSV into a DataFrame with a parsed ``Date`` column.

    Prints a short summary (row count, number of distinct fields, date span).

    Returns:
        The loaded DataFrame, or None when ``csv_file`` does not exist.
    """
    print("\n[3/5] Loading CI data...")
    if Path(csv_file).exists():
        frame = pd.read_csv(csv_file)
        frame['Date'] = pd.to_datetime(frame['Date'])

        row_count = len(frame)
        field_count = frame['field'].nunique()
        print(f" Loaded {row_count} daily rows for {field_count} fields")

        dates = frame['Date']
        print(f" Date range: {dates.min().date()} to {dates.max().date()}")
        return frame

    print(f" ERROR: {csv_file} not found")
    return None
def extract_seasonal_data(field_id, harvest_date, ci_data):
    """Return one field's CI rows dated on or after ``harvest_date``.

    Rows are matched with ``ci_data['field'] == field_id`` and the result is
    ordered by ``Date``.

    Returns:
        The date-sorted DataFrame, or None when the field has no rows at all
        or fewer than 30 rows remain from ``harvest_date`` onwards.
    """
    min_rows_required = 30  # minimum history considered meaningful for inference

    rows_for_field = ci_data.loc[ci_data['field'] == field_id]
    if rows_for_field.empty:
        return None

    on_or_after_harvest = rows_for_field['Date'] >= harvest_date
    season_rows = rows_for_field.loc[on_or_after_harvest].sort_values('Date')

    return season_rows if len(season_rows) >= min_rows_required else None
def run_inference_on_season(field_data, model, config, scalers, device, ci_column='FitData'):
    """Run Model 307 inference on a field's recent CI history.

    Only the last 300 rows of ``field_data`` are used. Features are built with
    ``extract_features``, each feature column is scaled with its matching
    entry in ``scalers`` (when ``scalers`` is a non-empty list), and the model
    is evaluated on the whole sequence as a batch of one. The probabilities of
    the LAST timestep are returned.

    This function is best-effort: any failure is reported as a warning and
    signalled with ``(None, None)`` so one bad field never aborts a full run.
    A scaler that fails is reported and that feature is left unscaled.

    Args:
        field_data: DataFrame with the field's CI rows, oldest first.
        model: Callable returning ``(out_imm, out_det)`` for an input tensor.
        config: Model config; ``config['features']`` is passed to
            ``extract_features``.
        scalers: List of per-feature scaler objects exposing ``transform``,
            or a falsy value to skip scaling.
        device: Torch device the input tensor is moved to.
        ci_column: Name of the CI column handed to ``extract_features``.

    Returns:
        ``(imminent_prob, detected_prob)`` rounded to 4 decimals, or
        ``(None, None)`` when fewer than 10 timesteps are available or
        inference fails.
    """
    max_history_days = 300  # enough history for meaningful patterns
    min_timesteps = 10

    try:
        if len(field_data) > max_history_days:
            field_data = field_data.iloc[-max_history_days:]

        features_array = extract_features(field_data, config['features'], ci_column)
        if features_array.shape[0] < min_timesteps:
            return None, None

        # Scale features using per-feature scalers (one scaler per feature column)
        if scalers and isinstance(scalers, list):
            for fi, scaler in enumerate(scalers):
                try:
                    column = features_array[:, fi].reshape(-1, 1)
                    features_array[:, fi] = scaler.transform(column).flatten()
                except Exception as scale_err:
                    # Keep the historical best-effort behaviour (continue with the
                    # unscaled column) but make the problem visible.
                    print(f" WARNING: scaler for feature {fi} failed ({scale_err}); using unscaled values")

        with torch.no_grad():
            # unsqueeze(0) adds the batch dimension expected by the model
            x_tensor = torch.tensor(features_array, dtype=torch.float32).unsqueeze(0).to(device)
            out_imm, out_det = model(x_tensor)
            # Last timestep = prediction as of the most recent observation
            imminent_prob = out_imm.squeeze(0)[-1].cpu().item()
            detected_prob = out_det.squeeze(0)[-1].cpu().item()

        return round(imminent_prob, 4), round(detected_prob, 4)
    except Exception as e:
        print(f" WARNING: inference failed for field sequence: {e}")
        return None, None
def main():
    """Run weekly harvest-imminent inference for one project and export a CSV.

    The project name is read from ``sys.argv[1]`` (default ``"angata"``).
    Storage is resolved relative to the working directory: first
    ``laravel_app/storage/app`` and, if that does not exist,
    ``../laravel_app/storage/app``.

    Steps:
        1. Load the last harvest date per field from harvest.xlsx.
        2. Load the CI CSV.
        3. Load the model, config and scalers from the current directory.
        4. For every field with a known last harvest and at least 30 CI rows
           after it, run inference on that season's rows.
        5. Write ``{project}_harvest_imminent_week_{WW}_{YYYY}.csv`` into
           ``reports/kpis/field_stats`` and print a summary with alert counts.

    Week and year (file name and ``week``/``year`` columns) are the ISO week
    and its matching ISO year, so dates around New Year are labelled
    consistently (e.g. 2024-12-30 is week 01 of 2025).
    NOTE(review): confirm the downstream R consumer derives the file name from
    the ISO year as well.

    Returns:
        None. Problems with the inputs are printed and end the run early.
    """
    # Get project name from command line or use default
    project_name = sys.argv[1] if len(sys.argv) > 1 else "angata"

    # Work from either the repository root or python_app/
    storage_root = Path("laravel_app/storage/app")
    if not storage_root.exists():
        storage_root = Path("../laravel_app/storage/app")
    base_storage = storage_root / project_name / "Data"

    ci_data_file = base_storage / "extracted_ci" / "ci_data_for_python" / "ci_data_for_python.csv"
    harvest_file = base_storage / "harvest.xlsx"  # Output from scripts 22+23

    # ISO week must be paired with the ISO year, not the calendar year;
    # isocalendar() is also independent of platform strftime support.
    year_num, week_num, _ = datetime.now().isocalendar()

    # Output directory: reports/kpis/field_stats/
    reports_dir = base_storage.parent / "reports" / "kpis" / "field_stats"
    reports_dir.mkdir(parents=True, exist_ok=True)
    output_csv = reports_dir / f"{project_name}_harvest_imminent_week_{week_num:02d}_{year_num}.csv"

    print("=" * 80)
    print(f"HARVEST IMMINENT PROBABILITY - WEEKLY MONITORING ({project_name})")
    print("=" * 80)

    # [1] Load harvest dates (required to determine season boundaries)
    harvest_dates = load_harvest_dates(harvest_file)
    if not harvest_dates:
        print("ERROR: Cannot run without harvest.xlsx - required to determine current season boundaries")
        print(" Please run scripts 22 (baseline prediction) and 23 (format conversion) first")
        return

    # [2] Load CI data
    print("\n[2/5] Loading CI data...")
    print(f" From: {ci_data_file}")
    if not ci_data_file.exists():
        print(f" ERROR: {ci_data_file} not found")
        print(f" Expected at: {ci_data_file.resolve()}")
        print("\n Run 02b_convert_rds_to_csv.R first to generate this file:")
        print(f" Rscript r_app/02b_convert_ci_rds_to_csv.R {project_name}")
        return
    ci_data = load_ci_data(ci_data_file)
    if ci_data is None:
        print("ERROR: Could not load CI data")
        return

    # [3] Load model (current directory is expected to hold model.pt, config.json, scalers.pkl)
    print("\n[3/5] Loading Model 307...")
    model, config, scalers = load_model_and_config(Path("."))
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f" Device: {device}")
    # Inputs are moved to `device` during inference, so the model must live
    # there too; eval() switches off training-only layers. Guarded because the
    # loader's return type is not visible from this script.
    if isinstance(model, torch.nn.Module):
        model.to(device)
        model.eval()

    # [4] Run inference per field
    print("\n[4/5] Running seasonal inference...")
    ci_column = config['data']['ci_column']
    min_season_days = 30

    # Per-field metadata: first non-null sub_field and latest observation date
    field_meta = ci_data.groupby('field').agg({'sub_field': 'first', 'Date': 'max'})

    results_list = []
    # One pass over the grouped frame instead of re-filtering the full frame per
    # field; sort=False keeps the fields in order of first appearance.
    for field_id, field_rows in ci_data.groupby('field', sort=False):
        # harvest_dates is keyed by stripped string ids
        last_harvest = harvest_dates.get(str(field_id).strip())
        if last_harvest is None:
            continue

        # All CI data strictly AFTER the last harvest = current season
        season = field_rows.sort_values('Date')
        season = season[season['Date'] > last_harvest]
        if len(season) < min_season_days:
            continue

        imminent_prob, detected_prob = run_inference_on_season(
            season, model, config, scalers, device, ci_column
        )
        if imminent_prob is None:
            continue

        latest_date = field_meta.at[field_id, 'Date']
        iso_year, iso_week, _ = latest_date.isocalendar()
        results_list.append({
            'field': field_id,
            'sub_field': field_meta.at[field_id, 'sub_field'],
            'imminent_prob': imminent_prob,
            'detected_prob': detected_prob,
            'week': int(iso_week),
            'year': int(iso_year),
            'as_of_date': latest_date,
            'num_days': len(season),
        })

    print(f" Completed inference for {len(results_list)} fields")

    # [5] Export. Fixed columns keep the header present even with zero results.
    output_columns = [
        'field', 'sub_field', 'imminent_prob', 'detected_prob',
        'week', 'year', 'as_of_date', 'num_days',
    ]
    df = pd.DataFrame(results_list, columns=output_columns)
    print("\n[5/5] Exporting results...")
    df.to_csv(output_csv, index=False)
    print(f"✓ Exported {len(df)} fields to {output_csv}")
    print(f" Output location: {output_csv.resolve()}")

    if len(df) > 0:
        print("\nSample rows:")
        sample_columns = ['field', 'sub_field', 'imminent_prob', 'detected_prob', 'num_days', 'week', 'year']
        print(df[sample_columns].head(15).to_string(index=False))
        # Show alert summary
        high_imminent = int((df['imminent_prob'] > 0.7).sum())
        high_detected = int((df['detected_prob'] > 0.6).sum())
        print("\n⚠ ALERTS:")
        print(f" Fields with imminent_prob > 0.70: {high_imminent}")
        print(f" Fields with detected_prob > 0.60: {high_detected}")
    else:
        print(" WARNING: No results exported - check CI data availability")

    print("\nStorage structure:")
    print(f" Input harvest: laravel_app/storage/app/{project_name}/Data/harvest.xlsx")
    print(f" Input CI: laravel_app/storage/app/{project_name}/Data/extracted_ci/ci_data_for_python/")
    print(f" Output: laravel_app/storage/app/{project_name}/reports/kpis/field_stats/")
    print(f" Filename: {project_name}_harvest_imminent_week_{week_num:02d}_{year_num}.csv")
    print("\nReady to load into 80_calculate_kpis.R")
# Script entry point: run the weekly monitoring pipeline only when this file is
# executed directly, not when it is imported as a module.
if __name__ == "__main__":
    main()