369 lines
14 KiB
Python
369 lines
14 KiB
Python
r"""
|
|
Script: 31_harvest_imminent_weekly.py
|
|
Purpose: WEEKLY MONITORING - Run WEEKLY/DAILY to get real-time harvest status for all fields
|
|
|
|
This script runs on RECENT CI data (typically last 300 days) to predict whether each field
|
|
is approaching harvest. Use this for operational decision-making and real-time alerts.
|
|
|
|
RUN FREQUENCY: Weekly (or daily if required)
|
|
INPUT:
|
|
- harvest.xlsx (baseline from scripts 22+23 - contains last harvest date per field)
|
|
Location: laravel_app/storage/app/{project}/Data/harvest.xlsx
|
|
- ci_data_for_python.csv (complete CI data from R script)
|
|
Location: laravel_app/storage/app/{project}/Data/extracted_ci/ci_data_for_python/ci_data_for_python.csv
|
|
OUTPUT:
|
|
- reports/kpis/field_stats/{project}_harvest_imminent_week_{WW}_{YYYY}.csv (weekly probabilities: field, imminent_prob, detected_prob, week, year)
|
|
|
|
Workflow:
|
|
1. Load harvest.xlsx to find last harvest date (season_end) per field
|
|
2. Load ci_data_for_python.csv (complete CI data)
|
|
3. For each field, extract all CI data AFTER last harvest (complete current season)
|
|
4. Run Model 307 inference on full season sequence (last timestep probabilities)
|
|
5. Export week_WW_YYYY.csv with probabilities
|
|
|
|
Output Columns:
|
|
- field: Field ID
|
|
- sub_field: Sub-field identifier
|
|
- imminent_prob: Probability field will be harvestable in next 28 days (0.0-1.0)
|
|
- detected_prob: Probability field is currently being harvested (0.0-1.0)
|
|
- week: ISO week number
|
|
- year: Year
|
|
- as_of_date: Latest date in dataset
|
|
- num_days: Number of days of history used
|
|
|
|
Use Cases:
|
|
- Alert when imminent_prob > 0.7 (prepare harvest operations)
|
|
- Alert when detected_prob > 0.6 (field is being harvested)
|
|
- Track trends over weeks to validate baseline predictions
|
|
- Feed into 09b script for weekly dashboard reports
|
|
|
|
Usage:
|
|
python python_app/31_harvest_imminent_weekly.py angata
|
|
|
|
Examples:
|
|
python python_app/31_harvest_imminent_weekly.py angata
|
|
python python_app/31_harvest_imminent_weekly.py esa
|
|
python python_app/31_harvest_imminent_weekly.py chemba
|
|
|
|
If no project specified, defaults to 'angata'
|
|
"""
|
|
|
|
import shutil
import subprocess
import sys
from datetime import datetime, timedelta
from pathlib import Path

import numpy as np
import pandas as pd
import torch

from harvest_date_pred_utils import (
    load_model_and_config,
    extract_features,
)
|
|
|
|
|
|
def load_harvest_dates(harvest_file):
    """Load the most recent harvest end date per field from harvest.xlsx.

    The workbook is produced by scripts 22+23 and holds one row per season
    with a ``season_end`` column (last harvest date of that season). The
    latest ``season_end`` per field marks the start of the current season.

    Parameters
    ----------
    harvest_file : str or Path
        Path to harvest.xlsx.

    Returns
    -------
    dict[str, pandas.Timestamp] or None
        Mapping of field id (stripped string) -> latest season_end date.
        None when the file is missing, unreadable, or empty.
    """
    print("[1/5] Loading harvest data for season boundaries...")

    if not Path(harvest_file).exists():
        print(f" ERROR: {harvest_file} not found")
        # No placeholders in this message, so a plain string suffices.
        print(" harvest.xlsx is required to determine current season boundaries")
        return None

    try:
        harvest_df = pd.read_excel(harvest_file)
        print(f" Loaded {len(harvest_df)} season records")

        # season_end contains the last harvest date for each season
        harvest_df['season_end'] = pd.to_datetime(harvest_df['season_end'])
        harvest_df['field'] = harvest_df['field'].astype(str).str.strip()

        # Latest season_end per field = most recent harvest = start of the
        # current season. groupby().max() replaces the manual dict loop.
        harvest_dates = harvest_df.groupby('field')['season_end'].max().to_dict()

        print(f" Successfully mapped {len(harvest_dates)} fields")
        # min()/max() raise ValueError on an empty mapping, which the
        # except below converts to a None return (same as before).
        print(f" Last harvest dates range: {min(harvest_dates.values()).date()} to {max(harvest_dates.values()).date()}")
        return harvest_dates
    except Exception as e:
        print(f" ERROR loading harvest.xlsx: {e}")
        return None
|
|
|
|
|
|
def run_rds_to_csv_conversion():
    """Run the R script that converts RDS CI data to daily CSV.

    Locates ``Rscript`` on PATH first and falls back to the known Windows
    install location, so the script is no longer Windows-only.

    Returns
    -------
    bool
        True when the R script ran and exited 0; False otherwise.
    """
    print("\n[2/5] Converting RDS to CSV (daily interpolation)...")
    r_script = Path("02b_convert_rds_to_csv.R")

    if not r_script.exists():
        print(f" ERROR: {r_script} not found")
        return False

    # Prefer Rscript on PATH; fall back to the default Windows install path.
    rscript_exe = shutil.which("Rscript") or r"C:\Program Files\R\R-4.4.3\bin\x64\Rscript.exe"

    try:
        result = subprocess.run(
            [rscript_exe, str(r_script)],
            capture_output=True,
            text=True,
            timeout=300
        )
    except (OSError, subprocess.SubprocessError) as e:
        # OSError: missing/unexecutable Rscript; SubprocessError covers
        # TimeoutExpired. Narrower than the old bare `except Exception`.
        print(f" ERROR: {e}")
        return False

    if result.returncode != 0:
        print(f" ERROR running R script:\n{result.stderr}")
        return False

    # Show last few lines of output for operator feedback
    for line in result.stdout.strip().split('\n')[-5:]:
        if line.strip():
            print(f" {line}")

    return True
|
|
|
|
|
|
def load_ci_data(csv_file):
    """Read the daily CI CSV, parse its Date column, and report coverage.

    Returns the loaded DataFrame, or None when the file does not exist.
    """
    print("\n[3/5] Loading CI data...")

    csv_path = Path(csv_file)
    if not csv_path.exists():
        print(f" ERROR: {csv_file} not found")
        return None

    frame = pd.read_csv(csv_file)
    frame['Date'] = pd.to_datetime(frame['Date'])

    first_day = frame['Date'].min().date()
    last_day = frame['Date'].max().date()
    print(f" Loaded {len(frame)} daily rows for {frame['field'].nunique()} fields")
    print(f" Date range: {first_day} to {last_day}")

    return frame
|
|
|
|
|
|
def extract_seasonal_data(field_id, harvest_date, ci_data):
    """Return one field's CI rows from harvest_date onward, sorted by date.

    Produces a DataFrame sorted ascending by 'Date', or None when the field
    has no rows at all or fewer than 30 rows on/after harvest_date.
    """
    # field_id is int, ci_data['field'] is also int
    subset = ci_data.loc[ci_data['field'] == field_id].copy()

    if subset.empty:
        return None

    # Restrict to the current season: everything from the last harvest on.
    season = subset.loc[subset['Date'] >= harvest_date].sort_values('Date')

    # A minimum of 30 days of history is required for meaningful inference.
    if len(season) < 30:
        return None

    return season
|
|
|
|
|
|
def run_inference_on_season(field_data, model, config, scalers, device, ci_column='FitData'):
    """
    Run Model 307 inference on recent field CI history.
    Predicts probability that field will be ready to harvest in next 28 days.
    Uses last timestep from the provided data sequence.

    Parameters:
        field_data: DataFrame of one field's daily CI rows, sorted by date.
        model: Trained PyTorch model returning two per-timestep outputs
               (imminent, detected) for a (1, T, F) input.
        config: Dict with a 'features' entry consumed by extract_features.
        scalers: List of fitted per-feature scalers (one per feature column),
                 or None/other to skip scaling.
        device: torch.device to run inference on.
        ci_column: Name of the CI value column (default 'FitData').

    Returns:
        (imminent_prob, detected_prob) rounded to 4 decimals,
        or (None, None) on insufficient data or any error.
    """
    try:
        # Use last 300 days of data for inference (enough history for meaningful patterns,
        # avoids training data seasonality mismatch)
        if len(field_data) > 300:
            field_data = field_data.iloc[-300:]

        # Extract features; result is a (timesteps, features) array.
        features_array = extract_features(field_data, config['features'], ci_column)

        # Too few timesteps to produce a meaningful prediction.
        if features_array.shape[0] < 10:
            return None, None

        # Scale features using per-feature scalers (CRITICAL: same as Phase 1 in harvest_date_pred_utils.py)
        # Scalers is a list of StandardScaler objects, one per feature
        if scalers and isinstance(scalers, list):
            for fi, scaler in enumerate(scalers):
                try:
                    features_array[:, fi] = scaler.transform(features_array[:, fi].reshape(-1, 1)).flatten()
                except Exception:
                    # NOTE(review): a failed transform silently leaves the
                    # feature unscaled — intentional best-effort, but worth
                    # confirming against training-time behavior.
                    pass

        # Run inference — unsqueeze adds the batch dimension: (1, T, F).
        with torch.no_grad():
            x_tensor = torch.tensor(features_array, dtype=torch.float32).unsqueeze(0).to(device)
            out_imm, out_det = model(x_tensor)

        # Get last timestep probabilities (the most recent day's prediction).
        imminent_prob = out_imm.squeeze(0)[-1].cpu().item()
        detected_prob = out_det.squeeze(0)[-1].cpu().item()

        return round(imminent_prob, 4), round(detected_prob, 4)

    except Exception as e:
        # Any failure (bad features, model error) degrades to "no prediction".
        return None, None
|
|
|
|
|
|
def main():
    """Weekly harvest-imminence pipeline.

    Loads season boundaries (harvest.xlsx) and daily CI data, runs Model 307
    per field on the current season, and exports per-field imminent/detected
    probabilities to a week-stamped CSV under reports/kpis/field_stats/.
    """
    # Get project name from command line or use default
    project_name = sys.argv[1] if len(sys.argv) > 1 else "angata"

    # Construct paths - work from either python_app/ or root smartcane/ directory
    # Try root first (laravel_app/...), then fall back to ../ (running from python_app/)
    if Path("laravel_app/storage/app").exists():
        base_storage = Path("laravel_app/storage/app") / project_name / "Data"
    else:
        base_storage = Path("../laravel_app/storage/app") / project_name / "Data"

    ci_data_dir = base_storage / "extracted_ci" / "ci_data_for_python"
    CI_DATA_FILE = ci_data_dir / "ci_data_for_python.csv"
    HARVEST_FILE = base_storage / "harvest.xlsx"  # Output from scripts 22+23

    # Determine week and year from current date for timestamped export
    # (%V = ISO week number)
    today = datetime.now()
    week_num = int(today.strftime('%V'))
    year_num = int(today.strftime('%Y'))

    # Output directory: reports/kpis/field_stats/
    reports_dir = base_storage.parent / "reports" / "kpis" / "field_stats"
    reports_dir.mkdir(parents=True, exist_ok=True)
    OUTPUT_CSV = reports_dir / f"{project_name}_harvest_imminent_week_{week_num:02d}_{year_num}.csv"

    print("="*80)
    print(f"HARVEST IMMINENT PROBABILITY - WEEKLY MONITORING ({project_name})")
    print("="*80)

    # [1] Load harvest dates (required to determine season boundaries)
    # Keys are stripped strings (see load_harvest_dates).
    harvest_dates = load_harvest_dates(HARVEST_FILE)
    if harvest_dates is None or len(harvest_dates) == 0:
        print(f"ERROR: Cannot run without harvest.xlsx - required to determine current season boundaries")
        print(f" Please run scripts 22 (baseline prediction) and 23 (format conversion) first")
        return

    # [2] Load CI data
    print(f"\n[2/5] Loading CI data...")
    print(f" From: {CI_DATA_FILE}")

    if not CI_DATA_FILE.exists():
        print(f" ERROR: {CI_DATA_FILE} not found")
        print(f" Expected at: {CI_DATA_FILE.resolve()}")
        print(f"\n Run 02b_convert_rds_to_csv.R first to generate this file:")
        print(f" Rscript r_app/02b_convert_ci_rds_to_csv.R {project_name}")
        return

    ci_data = load_ci_data(CI_DATA_FILE)
    if ci_data is None:
        print("ERROR: Could not load CI data")
        return

    # [3] Load model (from python_app directory)
    print("\n[3/5] Loading Model 307...")
    model_dir = Path("python_app")  # Model files located in python_app/ directory
    model, config, scalers = load_model_and_config(model_dir)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f" Device: {device}")

    # [4] Run inference per field
    print("\n[4/5] Running seasonal inference...")

    results_list = []
    ci_column = config['data']['ci_column']

    # Get field metadata: first sub_field label and latest observation
    # date per field.
    field_meta = ci_data.groupby('field').agg({
        'sub_field': 'first',
        'Date': 'max'
    }).reset_index()
    field_meta.columns = ['field', 'sub_field', 'latest_date']

    count = 0
    for field_id in ci_data['field'].unique():
        # Convert field_id to string for consistency with harvest_dates keys
        field_id_str = str(field_id).strip()

        # Get metadata
        meta = field_meta[field_meta['field'] == field_id]
        if len(meta) == 0:
            continue

        sub_field = meta['sub_field'].iloc[0]
        latest_date = meta['latest_date'].iloc[0]

        # Get last harvest date for this field (start of current season);
        # fields without a baseline harvest date are skipped.
        last_harvest = harvest_dates.get(field_id_str)
        if last_harvest is None:
            continue

        # Extract all CI data AFTER last harvest (complete current season).
        # NOTE(review): strictly '>' here, while extract_seasonal_data uses
        # '>=' — confirm which boundary is intended.
        field_data = ci_data[ci_data['field'] == field_id].copy()
        field_data = field_data.sort_values('Date')
        field_data = field_data[field_data['Date'] > last_harvest]  # After last harvest

        # Need at least 30 days of data since planting
        if len(field_data) < 30:
            continue

        # Run inference on full current season to predict next 28 days
        imminent_prob, detected_prob = run_inference_on_season(
            field_data, model, config, scalers, device, ci_column
        )

        if imminent_prob is None:
            continue

        # Week/year of the field's latest observation (not of today).
        week = int(latest_date.strftime('%V'))
        year = int(latest_date.strftime('%Y'))

        results_list.append({
            'field': field_id,
            'sub_field': sub_field,
            'imminent_prob': imminent_prob,
            'detected_prob': detected_prob,
            'week': week,
            'year': year,
            'as_of_date': latest_date,
            'num_days': len(field_data),
        })

        count += 1

    print(f" Completed inference for {count} fields")

    # [5] Build output DataFrame and export
    df = pd.DataFrame(results_list)
    df.to_csv(OUTPUT_CSV, index=False)

    print(f"\n[5/5] Exporting results...")
    print(f"✓ Exported {len(df)} fields to {OUTPUT_CSV}")
    print(f" Output location: {OUTPUT_CSV.resolve()}")

    if len(df) > 0:
        print(f"\nSample rows:")
        print(df[['field', 'sub_field', 'imminent_prob', 'detected_prob', 'num_days', 'week', 'year']].head(15).to_string(index=False))

        # Show alert summary (thresholds documented in the module docstring)
        high_imminent = len(df[df['imminent_prob'] > 0.7])
        high_detected = len(df[df['detected_prob'] > 0.6])
        print(f"\n⚠ ALERTS:")
        print(f" Fields with imminent_prob > 0.70: {high_imminent}")
        print(f" Fields with detected_prob > 0.60: {high_detected}")
    else:
        print(f" WARNING: No results exported - check CI data availability")

    # Recap of the expected storage layout for operators.
    print(f"\nStorage structure:")
    print(f" Input harvest: laravel_app/storage/app/{project_name}/Data/harvest.xlsx")
    print(f" Input CI: laravel_app/storage/app/{project_name}/Data/extracted_ci/ci_data_for_python/")
    print(f" Output: laravel_app/storage/app/{project_name}/reports/kpis/field_stats/")
    print(f" Filename: {project_name}_harvest_imminent_week_{week_num:02d}_{year_num}.csv")
    print(f"\nReady to load into 80_calculate_kpis.R")
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the weekly harvest-imminence pipeline.
    main()
|