Add script 23: Convert harvest format from production to standard, remove diagnostic script

This commit is contained in:
Timon 2026-01-15 09:28:13 +01:00
parent 7b347ddba6
commit 4c7ca85d29

View file

@ -0,0 +1,225 @@
#!/usr/bin/env python3
"""
Script 23: Convert Harvest Format from Production to Standard
==============================================================
Converts harvest_production_export.xlsx (output from script 22) to the standard
harvest.xlsx format used by R scripts 24+.
INPUT:
- harvest_production_export.xlsx (from script 22)
Columns: field, season (numeric), season_start_date, season_end_date, phase2_harvest_date
Contains detected harvests only
OUTPUT:
- harvest.xlsx (standard format for R pipeline)
Columns: field, sub_field, year, season, season_start, season_end, age, sub_area, tonnage_ha
LOGIC:
1. For each field, group all detections chronologically
2. Create one row per completed season (has season_end date)
3. season_start = first CI date (2024-09-25) for first season, then previous harvest + 1 day
4. season_end = phase2_harvest_date (refined harvest date from script 22)
5. year = extracted from season_start date
6. season = "Data{year} : {field}" format
7. sub_field = field (same as field)
8. age, sub_area, tonnage_ha = left empty (filled by R scripts later or other data sources)
Date Format: YYYY-MM-DD
"""
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import os
import sys
from pathlib import Path
def get_ci_date_range(project_dir):
"""
Get the date range of CI data to establish season_start for first season.
Returns: (min_date, max_date) as datetime objects
"""
base_storage = Path("../laravel_app/storage/app") / project_dir / "Data"
ci_data_dir = base_storage / "extracted_ci" / "ci_data_for_python"
ci_csv_path = ci_data_dir / "ci_data_for_python.csv"
if not ci_csv_path.exists():
# Fallback: assume data starts 2024-09-25 (typical for projects)
print(f"[WARNING] CI data CSV not found at {ci_csv_path}, assuming CI starts 2024-09-25")
return datetime(2024, 9, 25), datetime.now()
try:
# Read only date column (first column usually has dates)
df = pd.read_csv(ci_csv_path, nrows=1)
columns = df.columns.tolist()
date_col = columns[0] # First column should be dates
df = pd.read_csv(ci_csv_path, usecols=[date_col])
df[date_col] = pd.to_datetime(df[date_col])
min_date = df[date_col].min()
max_date = df[date_col].max()
print(f"[INFO] CI data date range: {min_date.date()} to {max_date.date()}")
return min_date, max_date
except Exception as e:
print(f"[WARNING] Error reading CI date range: {e}, using fallback dates")
return datetime(2024, 9, 25), datetime.now()
def convert_harvest_format(project_dir="angata"):
"""
Convert harvest_production_export.xlsx to standard harvest.xlsx format.
Parameters:
-----------
project_dir : str
Project name (angata, esa, chemba, etc.)
"""
print(f"\n{'='*80}")
print(f"Script 23: Convert Harvest Format")
print(f"Project: {project_dir}")
print(f"{'='*80}\n")
# Get paths (same as script 22)
base_storage = Path("../laravel_app/storage/app") / project_dir / "Data"
harvest_data_dir = base_storage / "HarvestData"
source_file = harvest_data_dir / "harvest_production_export.xlsx"
output_file = base_storage / "harvest.xlsx"
# Check source file exists
if not source_file.exists():
print(f"[ERROR] Source file not found: {source_file}")
print(f"[ERROR] Please run script 22 first to generate harvest_production_export.xlsx")
return False
print(f"[INFO] Reading source file: {source_file}")
try:
# Read production format
df_source = pd.read_excel(source_file)
print(f"[INFO] Loaded {len(df_source)} harvest detections")
print(f"[INFO] Columns: {list(df_source.columns)}")
# Validate required columns
required_cols = ["field", "season_start_date", "season_end_date", "phase2_harvest_date"]
missing = [c for c in required_cols if c not in df_source.columns]
if missing:
print(f"[ERROR] Missing columns: {missing}")
return False
# Get CI date range for establishing first season start
ci_min_date, ci_max_date = get_ci_date_range(project_dir)
first_season_start = ci_min_date.strftime("%Y-%m-%d")
# Convert to datetime for processing
df_source["phase2_harvest_date"] = pd.to_datetime(df_source["phase2_harvest_date"])
df_source["field"] = df_source["field"].astype(str)
# Sort by field and harvest date
df_source = df_source.sort_values(["field", "phase2_harvest_date"]).reset_index(drop=True)
# Build output rows
output_rows = []
# Group by field
for field_id, group_df in df_source.groupby("field"):
# Get all harvest dates for this field, sorted chronologically
harvest_dates = group_df["phase2_harvest_date"].dt.strftime("%Y-%m-%d").tolist()
print(f"[INFO] Field {field_id}: {len(harvest_dates)} harvest detection(s)")
# First season always starts from CI beginning
current_season_start = first_season_start
for harvest_idx, harvest_date in enumerate(harvest_dates):
# Extract year from current season start
season_start_obj = pd.to_datetime(current_season_start)
year = season_start_obj.year
# Create season identifier
season_str = f"Data{year} : {field_id}"
# Create row for completed season
row = {
"field": field_id,
"sub_field": field_id, # Same as field
"year": year,
"season": season_str,
"season_start": current_season_start,
"season_end": harvest_date, # Filled because harvest detected
"age": "", # Empty - will be calculated in R
"sub_area": "", # Empty - will be populated from other data
"tonnage_ha": "" # Empty - will be populated from other data
}
output_rows.append(row)
# Next season starts day after this harvest
next_season_start = (pd.to_datetime(harvest_date) + timedelta(days=1)).strftime("%Y-%m-%d")
current_season_start = next_season_start
# If field has detections, check if we should add a final incomplete season
# Only if we're not at the end of the monitoring period
last_harvest = pd.to_datetime(harvest_dates[-1])
days_after_last = (ci_max_date - last_harvest).days
if days_after_last > 30: # More than 30 days of data after last harvest
# Add incomplete season row (season_end empty)
season_start_obj = pd.to_datetime(current_season_start)
year = season_start_obj.year
season_str = f"Data{year} : {field_id}"
row = {
"field": field_id,
"sub_field": field_id,
"year": year,
"season": season_str,
"season_start": current_season_start,
"season_end": "", # Empty - season still active
"age": "",
"sub_area": "",
"tonnage_ha": ""
}
output_rows.append(row)
print(f"[INFO] Added incomplete season starting {current_season_start}")
# Create output DataFrame
df_output = pd.DataFrame(output_rows)
# Reorder columns to match standard format
column_order = ["field", "sub_field", "year", "season", "season_start", "season_end",
"age", "sub_area", "tonnage_ha"]
df_output = df_output[column_order]
# Write to Excel
print(f"\n[INFO] Writing output file: {output_file}")
df_output.to_excel(output_file, index=False, sheet_name="Harvest")
# Print summary
print(f"\n[SUCCESS] Conversion complete!")
print(f"[INFO] Output rows: {len(df_output)}")
print(f"[INFO] Unique fields: {df_output['field'].nunique()}")
print(f"\n[INFO] Sample output:")
print(df_output.head(10).to_string(index=False))
return True
except Exception as e:
print(f"[ERROR] Conversion failed: {e}")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
# Get project from command line or use default
project = sys.argv[1] if len(sys.argv) > 1 else "angata"
success = convert_harvest_format(project)
sys.exit(0 if success else 1)