SmartCane/python_app/experiments/omnicloud/test_omnicloudmask_simple.py
2026-01-06 14:17:37 +01:00

270 lines
9.9 KiB
Python

"""
Simple OmniCloudMask test script for PlanetScope imagery
Based on: https://dpird-dma.github.io/blog/Cloud-Masking-for-PlanetScope-Imagery-Using-OmniCloudMask/
Tests OmniCloudMask on 2024-12-30 ESA image
"""
from omnicloudmask import predict_from_array, load_multiband
from functools import partial
from pathlib import Path
import rasterio as rio
import numpy as np
import geopandas as gpd
from rasterio.features import rasterize
from rasterio.transform import Affine
print("="*70)
print("OMNICLOUDMASK TEST - ESA PROJECT")
print("="*70)
# Configuration
project = 'esa'
test_date = '2024-12-03'
# Get absolute path to the project root (go up one level from python_app/)
project_root = Path(__file__).resolve().parent.parent
planetscope_image = project_root / "laravel_app" / "storage" / "app" / project / "cloud_test_merged_tif" / f"{test_date}.tif"
geojson_path = project_root / "laravel_app" / "storage" / "app" / project / "Data" / "pivot_2.geojson"
output_dir = project_root / "laravel_app" / "storage" / "app" / project / "omnicloudmask_results"
output_dir.mkdir(exist_ok=True, parents=True)
print(f"\nInput image: {planetscope_image}")
print(f"Field boundaries: {geojson_path}")
print(f"Output directory: {output_dir}")
# Check files exist
if not planetscope_image.exists():
print(f"\n❌ ERROR: Image not found: {planetscope_image}")
exit(1)
if not geojson_path.exists():
print(f"\n⚠️ WARNING: GeoJSON not found: {geojson_path}")
print(" Will process without field mask")
use_field_mask = False
else:
use_field_mask = True
print("\n" + "="*70)
print("STEP 1: Load PlanetScope Image")
print("="*70)
# First, check the image metadata
with rio.open(str(planetscope_image)) as src:
print(f"\nOriginal image info:")
print(f" Bands: {src.count}")
print(f" Size: {src.height} x {src.width}")
print(f" CRS: {src.crs}")
print(f" Bounds: {src.bounds}")
# PlanetScope 4-band order: Blue(1), Green(2), Red(3), NIR(4)
# OmniCloudMask needs: Red, Green, NIR
band_order = [3, 2, 4] # Red, Green, NIR
print(f"\nLoading bands in order: Red(3), Green(2), NIR(4)")
print(f"Note: Skipping resampling to preserve image data...")
# Load without resampling to avoid issues with EPSG:4326
try:
with rio.open(str(planetscope_image)) as src:
# Read the required bands (1-indexed for rasterio)
red = src.read(3)
green = src.read(2)
nir = src.read(4)
# Stack into array (bands, height, width)
rgn_data = np.stack([red, green, nir])
# Get profile for later use
profile = src.profile.copy()
profile.update(count=1) # We'll save single-band output
print(f"✓ Image loaded successfully")
print(f" Shape: {rgn_data.shape} (bands, height, width)")
print(f" Data type: {rgn_data.dtype}")
# Check if data is valid
if rgn_data.size == 0:
print(f"❌ ERROR: Image has no data!")
exit(1)
print(f" Value range: {rgn_data.min():.6f} to {rgn_data.max():.6f}")
# Check each band
print(f"\n Band statistics:")
print(f" Red (band 0): min={rgn_data[0].min():.6f}, max={rgn_data[0].max():.6f}, mean={rgn_data[0].mean():.6f}")
print(f" Green (band 1): min={rgn_data[1].min():.6f}, max={rgn_data[1].max():.6f}, mean={rgn_data[1].mean():.6f}")
print(f" NIR (band 2): min={rgn_data[2].min():.6f}, max={rgn_data[2].max():.6f}, mean={rgn_data[2].mean():.6f}")
except Exception as e:
print(f"❌ ERROR loading image: {e}")
import traceback
traceback.print_exc()
exit(1)
# Optional: Apply field mask
if use_field_mask:
print("\n" + "="*70)
print("STEP 2: Apply Field Mask (Optional)")
print("="*70)
try:
# Load field boundaries
fields_gdf = gpd.read_file(str(geojson_path))
print(f"✓ Loaded {len(fields_gdf)} field polygons")
# Create field mask
# profile['transform'] is already an Affine object from rasterio
transform = profile['transform']
field_mask = rasterize(
[(geom, 1) for geom in fields_gdf.geometry],
out_shape=(rgn_data.shape[1], rgn_data.shape[2]),
transform=transform,
fill=0,
dtype=np.uint8
)
field_pixels = np.sum(field_mask == 1)
total_pixels = field_mask.size
print(f"✓ Field mask created")
print(f" Field pixels: {field_pixels:,} ({field_pixels/total_pixels*100:.1f}%)")
print(f" Non-field pixels: {total_pixels - field_pixels:,}")
# Apply mask - set non-field pixels to 0
rgn_data_masked = rgn_data.copy()
for i in range(3): # For each band
rgn_data_masked[i][field_mask == 0] = 0
print(f"\n Masked data statistics (field pixels only):")
field_data = field_mask == 1
print(f" Red: {rgn_data_masked[0][field_data].min():.6f} to {rgn_data_masked[0][field_data].max():.6f} (mean: {rgn_data_masked[0][field_data].mean():.6f})")
print(f" Green: {rgn_data_masked[1][field_data].min():.6f} to {rgn_data_masked[1][field_data].max():.6f} (mean: {rgn_data_masked[1][field_data].mean():.6f})")
print(f" NIR: {rgn_data_masked[2][field_data].min():.6f} to {rgn_data_masked[2][field_data].max():.6f} (mean: {rgn_data_masked[2][field_data].mean():.6f})")
# Use masked data
rgn_data_to_process = rgn_data_masked
except Exception as e:
print(f"⚠️ WARNING: Could not apply field mask: {e}")
print(" Proceeding without field mask...")
use_field_mask = False
rgn_data_to_process = rgn_data
field_mask = None
else:
rgn_data_to_process = rgn_data
field_mask = None
print("\n" + "="*70)
print("STEP 3: Run OmniCloudMask")
print("="*70)
print(f"\nRunning OmniCloudMask inference...")
print(f"⏳ This may take a few minutes (especially on CPU)...")
try:
# Generate cloud and shadow mask
prediction = predict_from_array(
rgn_data_to_process,
no_data_value=0 if use_field_mask else None,
apply_no_data_mask=use_field_mask
)
print(f"✓ OmniCloudMask inference complete!")
print(f" Prediction shape: {prediction.shape}")
print(f" Unique values: {np.unique(prediction)}")
print(f" 0 = Clear, 1 = Thick Cloud, 2 = Thin Cloud, 3 = Shadow")
except Exception as e:
print(f"❌ ERROR during inference: {e}")
import traceback
traceback.print_exc()
exit(1)
print("\n" + "="*70)
print("STEP 4: Calculate Statistics")
print("="*70)
# Get classification from prediction (remove batch dimension if present)
if prediction.ndim == 3:
classification = prediction[0]
else:
classification = prediction
# Calculate statistics
if use_field_mask and field_mask is not None:
# Stats for field pixels only
field_pixels_mask = field_mask == 1
total_pixels = np.sum(field_pixels_mask)
clear_pixels = np.sum(classification[field_pixels_mask] == 0)
thick_cloud_pixels = np.sum(classification[field_pixels_mask] == 1)
thin_cloud_pixels = np.sum(classification[field_pixels_mask] == 2)
shadow_pixels = np.sum(classification[field_pixels_mask] == 3)
print(f"\n✅ Results for FIELD AREAS ONLY ({total_pixels:,} pixels):")
else:
# Stats for all pixels
total_pixels = classification.size
clear_pixels = np.sum(classification == 0)
thick_cloud_pixels = np.sum(classification == 1)
thin_cloud_pixels = np.sum(classification == 2)
shadow_pixels = np.sum(classification == 3)
print(f"\n✅ Results for ALL PIXELS ({total_pixels:,} pixels):")
print(f" Clear: {clear_pixels:>10,} ({clear_pixels/total_pixels*100:>5.1f}%)")
print(f" Thick Cloud: {thick_cloud_pixels:>10,} ({thick_cloud_pixels/total_pixels*100:>5.1f}%)")
print(f" Thin Cloud: {thin_cloud_pixels:>10,} ({thin_cloud_pixels/total_pixels*100:>5.1f}%)")
print(f" Shadow: {shadow_pixels:>10,} ({shadow_pixels/total_pixels*100:>5.1f}%)")
cloud_pixels = thick_cloud_pixels + thin_cloud_pixels
print(f"\n Total Clouds: {cloud_pixels:>9,} ({cloud_pixels/total_pixels*100:>5.1f}%)")
print(f" Total Unusable: {cloud_pixels + shadow_pixels:>7,} ({(cloud_pixels + shadow_pixels)/total_pixels*100:>5.1f}%)")
print("\n" + "="*70)
print("STEP 5: Save Results")
print("="*70)
# Save the cloud mask result
output_file = output_dir / f"omnicloudmask_{test_date}.tif"
try:
profile.update(count=1, dtype='uint8')
with rio.open(str(output_file), 'w', **profile) as dst:
dst.write(prediction.astype('uint8'))
print(f"✓ Cloud mask saved: {output_file}")
except Exception as e:
print(f"❌ ERROR saving result: {e}")
import traceback
traceback.print_exc()
# Also save a human-readable summary
summary_file = output_dir / f"omnicloudmask_{test_date}_summary.txt"
with open(summary_file, 'w') as f:
f.write(f"OmniCloudMask Results for {test_date}\n")
f.write(f"="*50 + "\n\n")
f.write(f"Input: {planetscope_image}\n")
f.write(f"Field mask applied: {use_field_mask}\n\n")
f.write(f"Classification Results:\n")
f.write(f" Total pixels analyzed: {total_pixels:,}\n")
f.write(f" Clear: {clear_pixels:>10,} ({clear_pixels/total_pixels*100:>5.1f}%)\n")
f.write(f" Thick Cloud: {thick_cloud_pixels:>10,} ({thick_cloud_pixels/total_pixels*100:>5.1f}%)\n")
f.write(f" Thin Cloud: {thin_cloud_pixels:>10,} ({thin_cloud_pixels/total_pixels*100:>5.1f}%)\n")
f.write(f" Shadow: {shadow_pixels:>10,} ({shadow_pixels/total_pixels*100:>5.1f}%)\n")
f.write(f"\n Total Unusable: {cloud_pixels + shadow_pixels:>7,} ({(cloud_pixels + shadow_pixels)/total_pixels*100:>5.1f}%)\n")
print(f"✓ Summary saved: {summary_file}")
print("\n" + "="*70)
print("✅ COMPLETE!")
print("="*70)
print(f"\nOutputs:")
print(f" Cloud mask: {output_file}")
print(f" Summary: {summary_file}")
print(f"\nYou can open the cloud mask in QGIS or other GIS software.")
print(f"Values: 0=Clear, 1=Thick Cloud, 2=Thin Cloud, 3=Shadow")