SmartCane/python_app/harvest_detection_experiments/tests/test_script22_debug.py
Timon fabbf3214d Enhance harvest detection logic and testing framework
- Updated `detect_mosaic_mode` function to check for grid-size subdirectories in addition to tile-named files.
- Added comprehensive tests for DOY reset logic in `test_doy_logic.py`.
- Implemented feature extraction tests in `test_feature_extraction.py`.
- Created tests for growing window method in `test_growing_window_only.py`.
- Developed a complete model inference test in `test_model_inference.py`.
- Added a debug script for testing two-step refinement logic in `test_script22_debug.py`.
2026-01-15 14:30:54 +01:00

179 lines
6.2 KiB
Python

#!/usr/bin/env python3
"""
Debug script: Test if script 22 logic is working
Tests the two-step refinement on a single known field
"""
import sys
import time
import pandas as pd
import numpy as np
import torch
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from harvest_date_pred_utils import (
load_model_and_config,
extract_features,
run_phase1_growing_window,
)
project_name = "angata"

# Locate the workspace root: climb from this script's directory until a
# folder containing "laravel_app" appears (or the filesystem root is hit,
# in which case `root` ends up as the filesystem root).
script_dir = Path(__file__).parent
root = script_dir
while not (root / "laravel_app").exists() and root != root.parent:
    root = root.parent

# Project storage layout (mirrors the Laravel app's storage tree).
base_storage = root / "laravel_app" / "storage" / "app" / project_name / "Data"
CI_DATA_FILE = base_storage / "extracted_ci" / "ci_data_for_python" / "ci_data_for_python.csv"
MODEL_DIR = root / "python_app"

banner = "=" * 80
print(banner)
print("DEBUG: Script 22 Two-Step Refinement Logic")
print(banner)
# --- [1] Load the trained model, its config, and the per-feature scalers ---
print("\n[1] Loading model...")
model, config, scalers = load_model_and_config(MODEL_DIR)
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f" Device: {device}")
print(f" Model features: {config['features']}")

# --- [2] Load the CI time-series CSV (one row per field per date) ---
print("\n[2] Loading CI data...")
ci_data = pd.read_csv(CI_DATA_FILE, dtype={'field': str})  # keep field IDs as strings
ci_data['Date'] = pd.to_datetime(ci_data['Date'])
print(f" Total rows: {len(ci_data)}")
print(f" Fields: {ci_data['field'].nunique()}")
print(f" Date range: {ci_data['Date'].min().date()} to {ci_data['Date'].max().date()}")
# --- [3] Select a single known test field (field 779 from previous tests) ---
test_field = "779"
field_data = ci_data[ci_data['field'] == test_field].sort_values('Date').reset_index(drop=True)
print(f"\n[3] Testing on field {test_field}...")
print(f" Data points: {len(field_data)}")
# BUGFIX: bail out BEFORE printing the date range.  On an empty frame
# Date.min() returns NaT and NaT.date() raises, so the original crashed
# here before its emptiness guard ever ran.
if len(field_data) == 0:
    print(f" ERROR: No data for field {test_field}")
    sys.exit(1)
print(f" Date range: {field_data['Date'].min().date()} to {field_data['Date'].max().date()}")
# --- [4] Build model-input features for the selected field ---
print(f"\n[4] Extracting features for field {test_field}...")
try:
    features = extract_features(
        field_data.reset_index(drop=True),
        config['features'],
        ci_column='value',
    )
    print(f" Features shape: {features.shape}")
    print(f" Features dtype: {features.dtype}")
except Exception as e:
    # Broad catch is deliberate in this debug script: report and stop.
    print(f" ERROR: Could not extract features: {e}")
    sys.exit(1)
# --- [5] Run the Phase 1 growing-window method with instrumentation ---
print("\n[5] Running Phase 1 GROWING WINDOW method (threshold=0.5, consecutive=3)...")
print(" This simulates real production: expanding windows, checking each day")
print(" Expected: ~477 model runs for 477 days (SLOW)")
# `time` is imported at the top of the file; the duplicate inline
# `import time` and the unused `original_run` alias were removed.
start_time = time.time()
def instrumented_run(field_data, model, config, scalers, ci_column, device,
                     threshold=0.3, consecutive_days=2):
    """Growing-window Phase 1 harvest detection, instrumented with a run count.

    Mirrors run_phase1_growing_window but counts model forward passes.
    From each start position the window grows one observation at a time;
    the model's "detected" probability for the newest day is compared to
    ``threshold``, and a harvest is declared once ``consecutive_days``
    consecutive days exceed it.

    Args:
        field_data: one field's observations sorted by 'Date' (DataFrame).
        model: torch model returning (imminent_probs, detected_probs).
        config: dict whose 'features' entry lists the model input features.
        scalers: one fitted scaler per feature column, in feature order.
        ci_column: name of the CI value column in ``field_data``.
        device: torch device for inference.
        threshold: detection probability threshold (default 0.3).
        consecutive_days: consecutive days required above threshold (default 2).

    Returns:
        list of (harvest_date, absolute_row_index) tuples.
    """
    harvest_dates = []
    current_pos = 0
    model_runs = 0
    print(f" Starting growing window loop...")
    while current_pos < len(field_data):
        consecutive_above_threshold = 0
        # NOTE: window_end is an ABSOLUTE end index into field_data.
        for window_end in range(current_pos + 1, len(field_data) + 1):
            window_data = field_data.iloc[current_pos:window_end].copy().reset_index(drop=True)
            try:
                features = extract_features(window_data, config['features'], ci_column=ci_column)
                features_scaled = features.copy().astype(float)
                for fi, scaler in enumerate(scalers):
                    try:
                        features_scaled[:, fi] = scaler.transform(
                            features[:, fi].reshape(-1, 1)
                        ).flatten()
                    except Exception as e:
                        raise ValueError(f"Scaler {fi} failed: {e}")
                with torch.no_grad():
                    x_tensor = torch.tensor(features_scaled, dtype=torch.float32).unsqueeze(0).to(device)
                    imminent_probs, detected_probs = model(x_tensor)
                    model_runs += 1
                    last_prob = detected_probs[0, -1].item()
                if last_prob > threshold:
                    consecutive_above_threshold += 1
                else:
                    consecutive_above_threshold = 0
                if consecutive_above_threshold >= consecutive_days:
                    # BUGFIX: window_end is already absolute, so do NOT add
                    # current_pos again — the original double-counted it,
                    # mis-indexing every harvest after the first one.
                    harvest_idx = window_end - consecutive_days
                    harvest_dates.append((field_data.iloc[harvest_idx]['Date'], harvest_idx))
                    current_pos = harvest_idx + 1
                    break
            except Exception:
                # Best-effort: very short windows can make feature extraction
                # or scaling fail; skip the window rather than abort the scan.
                pass
        else:
            # Window grew to the end of the series without a detection.
            break
    print(f" Model runs performed: {model_runs}")
    return harvest_dates
# Execute the instrumented Phase 1 pass on the selected field and time it.
phase1_results = instrumented_run(
    field_data.reset_index(drop=True),
    model,
    config,
    scalers,
    'value',
    device,
    threshold=0.5,
    consecutive_days=3,
)
elapsed = time.time() - start_time
print(f"\n Time elapsed: {elapsed:.2f}s")
# Report results.  For display, rerun the model ONCE over the full field
# series so each detected index can be annotated with its probability.
if not phase1_results:
    print(f" ✗ Phase 1: No harvest detected")
else:
    print(f" ✓ Phase 1 detected {len(phase1_results)} harvest(s):")
    with torch.no_grad():
        full_x = features.reshape(1, -1, len(config['features']))
        full_x_norm = np.zeros_like(full_x)
        for fi, scaler in enumerate(scalers):
            full_x_norm[0, :, fi] = scaler.transform(full_x[0, :, fi].reshape(-1, 1)).flatten()
        x_tensor = torch.from_numpy(full_x_norm).float().to(device)
        _, detected_probs = model(x_tensor)
        detected_np = detected_probs[0].cpu().numpy()
    for harvest_date, harvest_idx in phase1_results:
        # Guard against an index past the probability vector's end.
        prob = detected_np[harvest_idx] if harvest_idx < len(detected_np) else 0.0
        print(f" {harvest_date.date()}: index {harvest_idx}, probability={prob:.4f}")