178 lines
6.1 KiB
Python
178 lines
6.1 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
CONVERT_ANGATA_HARVEST.PY
|
|
=========================
|
|
Converts Angata harvest data from its received format to the standardized SmartCane format.
|
|
|
|
Input format (as received from Angata):
|
|
Contract No | Field No | dop/doh
|
|
0001 | 1 | 01/06/2023
|
|
|
|
Output format (SmartCane standard, matching Aura):
|
|
field | sub_field | year | season_start | season_end | age | sub_area | tonnage_ha
|
|
|
|
The script:
|
|
1. Reads Angata harvest.xlsx
|
|
2. Extracts field numbers and dates
|
|
3. Creates field names from field numbers as plain strings (e.g., "1", "2", "10")
|
|
4. Extracts year from date
|
|
5. Uses dop/doh as season_start (season_end is left blank; age, sub_area and tonnage_ha are left empty)
|
|
6. Writes output to harvest.xlsx in SmartCane format
|
|
|
|
Usage:
|
|
python convert_angata_harvest.py
|
|
"""
|
|
|
|
import pandas as pd
|
|
import os
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
|
|
def convert_angata_harvest():
    """Convert Angata harvest data to the standardized SmartCane format.

    Reads every sheet of the Angata ``harvest.xlsx`` export, keeps rows
    that have a 'Field No' and a date column ('dop/doh' or 'doh/dop'),
    and writes a DataFrame with the SmartCane columns (field, sub_field,
    year, season_start, season_end, age, sub_area, tonnage_ha) back to
    Excel.

    Returns:
        pandas.DataFrame: the converted data, as written to disk.

    Raises:
        ValueError: if no sheet contains usable data, or a required
            column is missing after combining the sheets.
    """
    # Define paths.
    angata_dir = Path("laravel_app/storage/app/angata/Data")
    input_file = angata_dir / "harvest.xlsx"
    # NOTE(review): output path is identical to the input path, so the raw
    # Angata export is overwritten in place and the script is not re-runnable
    # against its own output. Confirm this is intentional.
    output_file = angata_dir / "harvest.xlsx"

    # Read all sheets from the input file.
    print(f"Reading Angata harvest data from: {input_file}")
    xls = pd.ExcelFile(input_file)
    print(f"Sheet names found: {xls.sheet_names}")

    # Collect usable rows from every sheet.
    all_data = []
    for sheet_name in xls.sheet_names:
        print(f"\nProcessing sheet: {sheet_name}")
        df = pd.read_excel(input_file, sheet_name=sheet_name)

        # Remove any completely empty rows.
        df = df.dropna(how='all')

        # Skip if no data.
        if len(df) == 0:
            print(f" Sheet {sheet_name} is empty, skipping")
            continue

        # Check if this sheet has the required Field No column.
        if 'Field No' not in df.columns:
            print(f" Sheet {sheet_name} does not have 'Field No' column, skipping")
            continue

        # The date column name varies between sheets (dop/doh or doh/dop).
        if 'dop/doh' in df.columns:
            date_col = 'dop/doh'
        elif 'doh/dop' in df.columns:
            date_col = 'doh/dop'
        else:
            print(f" Sheet {sheet_name} does not have date column (dop/doh or doh/dop), skipping")
            continue

        # Standardize date column name to 'dop/doh' for consistency.
        df = df.rename(columns={date_col: 'dop/doh'})

        # Drop rows whose field number is missing (source data may contain garbage).
        df = df[df['Field No'].notna()]

        print(f" Loaded {len(df)} records from {sheet_name}")
        all_data.append(df)

    # Combine all sheets.
    if not all_data:
        raise ValueError("No valid data found in any sheet")

    print(f"\nCombining data from {len(all_data)} sheets...")
    df = pd.concat(all_data, ignore_index=True)
    df = df.dropna(how='all')            # remove empty rows after concat
    df = df[df['Field No'].notna()]      # ensure no NaN field numbers

    print(f"Total records after combining: {len(df)}")

    # Validate input columns.
    required_cols = ['Field No', 'dop/doh']
    for col in required_cols:
        if col not in df.columns:
            raise ValueError(f"Missing required column: {col}")

    # Create conversion dataframe.
    converted = pd.DataFrame()

    # Field name is the field number as a string (e.g., "1", "2", "10");
    # sub_field mirrors field since Angata has no sub-field concept.
    converted['field'] = df['Field No'].astype(str)
    converted['sub_field'] = converted['field']

    # Parse dop/doh dates - expected format is DD/MM/YYYY.
    print("\nParsing dates...")
    dates = []
    years = []
    for idx, date_str in enumerate(df['dop/doh']):
        try:
            # Missing dates stay missing rather than aborting the run.
            if pd.isna(date_str):
                dates.append(pd.NaT)
                years.append(None)
            else:
                date_obj = pd.to_datetime(date_str, format='%d/%m/%Y')
                dates.append(date_obj)
                years.append(int(date_obj.year))
        except Exception as e:
            # Malformed dates are reported but do not stop the conversion.
            print(f"Warning: Could not parse date at row {idx}: {date_str} - {e}")
            dates.append(pd.NaT)
            years.append(None)

    converted['season_start'] = dates
    # Nullable integer dtype keeps missing years as <NA> instead of
    # coercing the column to float.
    converted['year'] = pd.array(years, dtype='Int64')

    # Fields not provided in the Angata data are left empty for
    # downstream scripts to fill in.
    converted['season_end'] = ""
    converted['age'] = None
    converted['sub_area'] = None
    converted['tonnage_ha'] = None

    # Reorder columns to match the Aura format.
    converted = converted[['field', 'sub_field', 'year', 'season_start',
                           'season_end', 'age', 'sub_area', 'tonnage_ha']]

    # Display summary.
    print("\nConversion summary:")
    print(f" Total records: {len(converted)}")
    print(f" Date range: {converted['season_start'].min()} to {converted['season_start'].max()}")
    print(f" Years: {sorted(converted['year'].dropna().unique())}")
    print(f"\nFirst 10 rows:")
    print(converted.head(10))

    # Save to Excel.
    print(f"\nSaving converted data to: {output_file}")
    converted.to_excel(output_file, index=False, sheet_name='Harvest')
    print("Conversion complete!")

    return converted
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the conversion, and on any failure print
    # the error plus a full traceback instead of dying silently.
    try:
        result = convert_angata_harvest()
    except Exception as exc:
        print(f"\nError during conversion: {exc}")
        import traceback
        traceback.print_exc()
    else:
        print("\nSuccess! Angata harvest data has been converted to SmartCane format.")
|