""" Simple OmniCloudMask test script for PlanetScope imagery Based on: https://dpird-dma.github.io/blog/Cloud-Masking-for-PlanetScope-Imagery-Using-OmniCloudMask/ Tests OmniCloudMask on 2024-12-30 ESA image """ from omnicloudmask import predict_from_array, load_multiband from functools import partial from pathlib import Path import rasterio as rio import numpy as np import geopandas as gpd from rasterio.features import rasterize from rasterio.transform import Affine print("="*70) print("OMNICLOUDMASK TEST - ESA PROJECT") print("="*70) # Configuration project = 'esa' test_date = '2024-12-03' # Get absolute path to the project root (go up one level from python_app/) project_root = Path(__file__).resolve().parent.parent planetscope_image = project_root / "laravel_app" / "storage" / "app" / project / "cloud_test_merged_tif" / f"{test_date}.tif" geojson_path = project_root / "laravel_app" / "storage" / "app" / project / "Data" / "pivot_2.geojson" output_dir = project_root / "laravel_app" / "storage" / "app" / project / "omnicloudmask_results" output_dir.mkdir(exist_ok=True, parents=True) print(f"\nInput image: {planetscope_image}") print(f"Field boundaries: {geojson_path}") print(f"Output directory: {output_dir}") # Check files exist if not planetscope_image.exists(): print(f"\n❌ ERROR: Image not found: {planetscope_image}") exit(1) if not geojson_path.exists(): print(f"\n⚠️ WARNING: GeoJSON not found: {geojson_path}") print(" Will process without field mask") use_field_mask = False else: use_field_mask = True print("\n" + "="*70) print("STEP 1: Load PlanetScope Image") print("="*70) # First, check the image metadata with rio.open(str(planetscope_image)) as src: print(f"\nOriginal image info:") print(f" Bands: {src.count}") print(f" Size: {src.height} x {src.width}") print(f" CRS: {src.crs}") print(f" Bounds: {src.bounds}") # PlanetScope 4-band order: Blue(1), Green(2), Red(3), NIR(4) # OmniCloudMask needs: Red, Green, NIR band_order = [3, 2, 4] # Red, Green, NIR print(f"\nLoading bands in order: Red(3), Green(2), NIR(4)") print(f"Note: Skipping resampling to preserve image data...") # Load without resampling to avoid issues with EPSG:4326 try: with rio.open(str(planetscope_image)) as src: # Read the required bands (1-indexed for rasterio) red = src.read(3) green = src.read(2) nir = src.read(4) # Stack into array (bands, height, width) rgn_data = np.stack([red, green, nir]) # Get profile for later use profile = src.profile.copy() profile.update(count=1) # We'll save single-band output print(f"✓ Image loaded successfully") print(f" Shape: {rgn_data.shape} (bands, height, width)") print(f" Data type: {rgn_data.dtype}") # Check if data is valid if rgn_data.size == 0: print(f"❌ ERROR: Image has no data!") exit(1) print(f" Value range: {rgn_data.min():.6f} to {rgn_data.max():.6f}") # Check each band print(f"\n Band statistics:") print(f" Red (band 0): min={rgn_data[0].min():.6f}, max={rgn_data[0].max():.6f}, mean={rgn_data[0].mean():.6f}") print(f" Green (band 1): min={rgn_data[1].min():.6f}, max={rgn_data[1].max():.6f}, mean={rgn_data[1].mean():.6f}") print(f" NIR (band 2): min={rgn_data[2].min():.6f}, max={rgn_data[2].max():.6f}, mean={rgn_data[2].mean():.6f}") except Exception as e: print(f"❌ ERROR loading image: {e}") import traceback traceback.print_exc() exit(1) # Optional: Apply field mask if use_field_mask: print("\n" + "="*70) print("STEP 2: Apply Field Mask (Optional)") print("="*70) try: # Load field boundaries fields_gdf = gpd.read_file(str(geojson_path)) print(f"✓ Loaded {len(fields_gdf)} field polygons") # Create field mask # profile['transform'] is already an Affine object from rasterio transform = profile['transform'] field_mask = rasterize( [(geom, 1) for geom in fields_gdf.geometry], out_shape=(rgn_data.shape[1], rgn_data.shape[2]), transform=transform, fill=0, dtype=np.uint8 ) field_pixels = np.sum(field_mask == 1) total_pixels = field_mask.size print(f"✓ Field mask created") print(f" Field pixels: {field_pixels:,} ({field_pixels/total_pixels*100:.1f}%)") print(f" Non-field pixels: {total_pixels - field_pixels:,}") # Apply mask - set non-field pixels to 0 rgn_data_masked = rgn_data.copy() for i in range(3): # For each band rgn_data_masked[i][field_mask == 0] = 0 print(f"\n Masked data statistics (field pixels only):") field_data = field_mask == 1 print(f" Red: {rgn_data_masked[0][field_data].min():.6f} to {rgn_data_masked[0][field_data].max():.6f} (mean: {rgn_data_masked[0][field_data].mean():.6f})") print(f" Green: {rgn_data_masked[1][field_data].min():.6f} to {rgn_data_masked[1][field_data].max():.6f} (mean: {rgn_data_masked[1][field_data].mean():.6f})") print(f" NIR: {rgn_data_masked[2][field_data].min():.6f} to {rgn_data_masked[2][field_data].max():.6f} (mean: {rgn_data_masked[2][field_data].mean():.6f})") # Use masked data rgn_data_to_process = rgn_data_masked except Exception as e: print(f"⚠️ WARNING: Could not apply field mask: {e}") print(" Proceeding without field mask...") use_field_mask = False rgn_data_to_process = rgn_data field_mask = None else: rgn_data_to_process = rgn_data field_mask = None print("\n" + "="*70) print("STEP 3: Run OmniCloudMask") print("="*70) print(f"\nRunning OmniCloudMask inference...") print(f"⏳ This may take a few minutes (especially on CPU)...") try: # Generate cloud and shadow mask prediction = predict_from_array( rgn_data_to_process, no_data_value=0 if use_field_mask else None, apply_no_data_mask=use_field_mask ) print(f"✓ OmniCloudMask inference complete!") print(f" Prediction shape: {prediction.shape}") print(f" Unique values: {np.unique(prediction)}") print(f" 0 = Clear, 1 = Thick Cloud, 2 = Thin Cloud, 3 = Shadow") except Exception as e: print(f"❌ ERROR during inference: {e}") import traceback traceback.print_exc() exit(1) print("\n" + "="*70) print("STEP 4: Calculate Statistics") print("="*70) # Get classification from prediction (remove batch dimension if present) if prediction.ndim == 3: classification = prediction[0] else: classification = prediction # Calculate statistics if use_field_mask and field_mask is not None: # Stats for field pixels only field_pixels_mask = field_mask == 1 total_pixels = np.sum(field_pixels_mask) clear_pixels = np.sum(classification[field_pixels_mask] == 0) thick_cloud_pixels = np.sum(classification[field_pixels_mask] == 1) thin_cloud_pixels = np.sum(classification[field_pixels_mask] == 2) shadow_pixels = np.sum(classification[field_pixels_mask] == 3) print(f"\n✅ Results for FIELD AREAS ONLY ({total_pixels:,} pixels):") else: # Stats for all pixels total_pixels = classification.size clear_pixels = np.sum(classification == 0) thick_cloud_pixels = np.sum(classification == 1) thin_cloud_pixels = np.sum(classification == 2) shadow_pixels = np.sum(classification == 3) print(f"\n✅ Results for ALL PIXELS ({total_pixels:,} pixels):") print(f" Clear: {clear_pixels:>10,} ({clear_pixels/total_pixels*100:>5.1f}%)") print(f" Thick Cloud: {thick_cloud_pixels:>10,} ({thick_cloud_pixels/total_pixels*100:>5.1f}%)") print(f" Thin Cloud: {thin_cloud_pixels:>10,} ({thin_cloud_pixels/total_pixels*100:>5.1f}%)") print(f" Shadow: {shadow_pixels:>10,} ({shadow_pixels/total_pixels*100:>5.1f}%)") cloud_pixels = thick_cloud_pixels + thin_cloud_pixels print(f"\n Total Clouds: {cloud_pixels:>9,} ({cloud_pixels/total_pixels*100:>5.1f}%)") print(f" Total Unusable: {cloud_pixels + shadow_pixels:>7,} ({(cloud_pixels + shadow_pixels)/total_pixels*100:>5.1f}%)") print("\n" + "="*70) print("STEP 5: Save Results") print("="*70) # Save the cloud mask result output_file = output_dir / f"omnicloudmask_{test_date}.tif" try: profile.update(count=1, dtype='uint8') with rio.open(str(output_file), 'w', **profile) as dst: dst.write(prediction.astype('uint8')) print(f"✓ Cloud mask saved: {output_file}") except Exception as e: print(f"❌ ERROR saving result: {e}") import traceback traceback.print_exc() # Also save a human-readable summary summary_file = output_dir / f"omnicloudmask_{test_date}_summary.txt" with open(summary_file, 'w') as f: f.write(f"OmniCloudMask Results for {test_date}\n") f.write(f"="*50 + "\n\n") f.write(f"Input: {planetscope_image}\n") f.write(f"Field mask applied: {use_field_mask}\n\n") f.write(f"Classification Results:\n") f.write(f" Total pixels analyzed: {total_pixels:,}\n") f.write(f" Clear: {clear_pixels:>10,} ({clear_pixels/total_pixels*100:>5.1f}%)\n") f.write(f" Thick Cloud: {thick_cloud_pixels:>10,} ({thick_cloud_pixels/total_pixels*100:>5.1f}%)\n") f.write(f" Thin Cloud: {thin_cloud_pixels:>10,} ({thin_cloud_pixels/total_pixels*100:>5.1f}%)\n") f.write(f" Shadow: {shadow_pixels:>10,} ({shadow_pixels/total_pixels*100:>5.1f}%)\n") f.write(f"\n Total Unusable: {cloud_pixels + shadow_pixels:>7,} ({(cloud_pixels + shadow_pixels)/total_pixels*100:>5.1f}%)\n") print(f"✓ Summary saved: {summary_file}") print("\n" + "="*70) print("✅ COMPLETE!") print("="*70) print(f"\nOutputs:") print(f" Cloud mask: {output_file}") print(f" Summary: {summary_file}") print(f"\nYou can open the cloud mask in QGIS or other GIS software.") print(f"Values: 0=Clear, 1=Thick Cloud, 2=Thin Cloud, 3=Shadow")