r""" Generate Interactive CI Dashboard for SmartCane ESA Project This script creates an interactive HTML dashboard with Folium/Leaflet showing: 1. Current RGB composite map 2. Current CI (Chlorophyll Index) map 3. Previous week CI map 4. Week-over-week CI change map The dashboard supports layer toggling, zooming, panning, and hover tooltips. Usage: python generate_interactive_ci_dashboard.py [estate_name] [current_week] [output_dir] Example: cd "c:\Users\timon\Resilience BV\4020 SCane ESA DEMO - Documenten\General\4020 SCDEMO Team\4020 TechnicalData\WP3\smartcane_v2\smartcane" ; python python_scripts/generate_interactive_ci_dashboard.py esa --current-week 43 --previous-week 42 --output-dir output """ import os import sys import json import argparse from pathlib import Path from datetime import datetime, timedelta from typing import Optional, Tuple import warnings try: import numpy as np import rasterio from rasterio.plot import show from rasterio.features import rasterize from rasterio.transform import from_bounds import folium from folium import plugins import geopandas as gpd from shapely.geometry import shape import matplotlib.pyplot as plt import matplotlib.cm as cm from matplotlib.colors import Normalize except ImportError as e: print(f"Error: Required package not found. {e}") print("Install required packages with:") print(" pip install numpy rasterio folium geopandas matplotlib") sys.exit(1) warnings.filterwarnings('ignore') class InteractiveCIDashboard: """Generate interactive CI analysis dashboard using Folium.""" def __init__(self, estate_name: str, data_dir: str, output_dir: str): """ Initialize dashboard generator. Args: estate_name: Name of estate (e.g., 'esa', 'aura', 'simba') data_dir: Base directory containing weekly mosaic data output_dir: Directory to save generated HTML dashboard """ self.estate_name = estate_name.lower() self.data_dir = Path(data_dir) self.output_dir = Path(output_dir) self.output_dir.mkdir(parents=True, exist_ok=True) # Data paths self.weekly_mosaic_dir = self.data_dir / "weekly_mosaic" # Try multiple paths for field boundaries (in order of preference) self.field_boundaries_path = None possible_paths = [ self.data_dir / "Data" / "pivot.geojson", # laravel_app/storage/app/esa/Data/pivot.geojson self.data_dir / "pivot.geojson", # laravel_app/storage/app/esa/pivot.geojson self.data_dir / ".." / ".." / "pivot.geojson", # Up from esa/weekly_mosaic to smartcane/ Path(__file__).parent.parent / "r_app" / "experiments" / "pivot.geojson", # r_app/experiments/pivot.geojson Path(__file__).parent.parent.parent / "r_app" / "experiments" / "pivot.geojson", # One more level up ] print(f" Looking for pivot.geojson...") for path in possible_paths: resolved = path.resolve() print(f" Checking: {resolved}") if resolved.exists(): self.field_boundaries_path = resolved print(f" ✓ Found at: {resolved}") break if self.field_boundaries_path is None: print(f" ⚠ WARNING: pivot.geojson not found in any expected location") # Validate directories if not self.weekly_mosaic_dir.exists(): raise FileNotFoundError(f"Weekly mosaic directory not found: {self.weekly_mosaic_dir}") self.field_boundaries = None self.ci_data = {} self.rgb_data = {} self.bounds = None print(f"✓ Dashboard initialized for {self.estate_name}") print(f" Data directory: {self.data_dir}") print(f" Output directory: {self.output_dir}") def load_field_boundaries(self) -> gpd.GeoDataFrame: """Load field boundaries from GeoJSON.""" if self.field_boundaries_path is None: print(f"✗ Field boundaries path not set - file not found in initialization") return None if not self.field_boundaries_path.exists(): print(f"✗ Field boundaries file not found at {self.field_boundaries_path}") return None try: print(f"Loading field boundaries from: {self.field_boundaries_path}") gdf = gpd.read_file(str(self.field_boundaries_path)) print(f"✓ Loaded field boundaries: {len(gdf)} features") print(f" CRS: {gdf.crs}") print(f" Columns: {list(gdf.columns)}") if len(gdf) > 0: print(f" First feature: {gdf.iloc[0]['field']} / {gdf.iloc[0].get('sub_field', 'N/A')}") return gdf except Exception as e: print(f"✗ Error loading field boundaries: {e}") import traceback traceback.print_exc() return None def find_week_file(self, week: int, year: int = 2025) -> Optional[Path]: """Find the weekly mosaic file for a given week.""" filename = f"week_{week}_{year}.tif" filepath = self.weekly_mosaic_dir / filename if filepath.exists(): return filepath else: print(f"⚠ Week file not found: {filename}") return None def load_raster_bands(self, filepath: Path, bands: list) -> dict: """ Load specific bands from a raster file. Args: filepath: Path to raster file bands: List of band names to extract (e.g., ['Red', 'Green', 'Blue', 'CI']) Returns: Dictionary with band names as keys and numpy arrays as values """ try: with rasterio.open(filepath) as src: # Get band indices based on names band_data = {} # Try to get band names from rasterio all_bands = [src.descriptions[i] if src.descriptions[i] else str(i+1) for i in range(src.count)] print(f" Available bands: {all_bands}") for band_name in bands: # Try to find band by name try: if band_name in all_bands: idx = all_bands.index(band_name) + 1 else: # Try by index if name not found idx = int(band_name) if band_name.isdigit() else None if idx is None: print(f" ⚠ Band '{band_name}' not found, skipping") continue band_data[band_name] = src.read(idx) except Exception as e: print(f" ⚠ Error reading band '{band_name}': {e}") # Store metadata self.bounds = src.bounds self.crs = src.crs self.transform = src.transform return band_data except Exception as e: print(f"✗ Error loading raster {filepath}: {e}") return {} def normalize_raster(self, data: np.ndarray, vmin: Optional[float] = None, vmax: Optional[float] = None, mask_invalid: bool = True) -> np.ndarray: """ Normalize raster data to 0-255 range for visualization. Args: data: Numpy array vmin: Minimum value for normalization vmax: Maximum value for normalization mask_invalid: If True, set invalid values to 0 (will be transparent in image) Returns: Normalized array (0-255) """ # Create mask for invalid values invalid_mask = ~np.isfinite(data) # Remove invalid values for statistics valid_data = data[~invalid_mask] if len(valid_data) == 0: return np.zeros_like(data, dtype=np.uint8) if vmin is None: vmin = np.percentile(valid_data, 2) if vmax is None: vmax = np.percentile(valid_data, 98) # Normalize normalized = np.clip((data - vmin) / (vmax - vmin + 1e-8) * 255, 0, 255) # Set invalid values to 0 (will be transparent when converted to RGBA) if mask_invalid: normalized[invalid_mask] = 0 return normalized.astype(np.uint8) def create_rgb_composite(self, r: np.ndarray, g: np.ndarray, b: np.ndarray) -> np.ndarray: """Create RGB composite from individual bands with transparency for NA values.""" # Create mask for invalid values (NA values in any band) invalid_mask = ~(np.isfinite(r) & np.isfinite(g) & np.isfinite(b)) # Normalize each band r_norm = self.normalize_raster(r, vmin=10, vmax=150, mask_invalid=False) g_norm = self.normalize_raster(g, vmin=10, vmax=130, mask_invalid=False) b_norm = self.normalize_raster(b, vmin=3, vmax=100, mask_invalid=False) # Stack bands and add alpha channel rgb = np.dstack([r_norm, g_norm, b_norm]) # Create RGBA with transparency for NA values rgba = np.dstack([rgb, np.ones((*rgb.shape[:2], 1), dtype=np.uint8) * 255]) rgba[invalid_mask, 3] = 0 # Set alpha to 0 (transparent) for NA values return rgba.astype(np.uint8) def raster_to_image(self, data: np.ndarray, colormap: str = 'viridis') -> np.ndarray: """ Convert single-band raster to RGBA using colormap with transparency for NA values. Args: data: Single-band numpy array colormap: Matplotlib colormap name Returns: RGBA image (H x W x 4) with alpha channel for NA masking """ # Create mask for invalid values invalid_mask = ~np.isfinite(data) # Get statistics from valid data only valid_data = data[~invalid_mask] if len(valid_data) == 0: return np.ones((*data.shape, 4), dtype=np.uint8) * 255 vmin = np.percentile(valid_data, 2) vmax = np.percentile(valid_data, 98) # Normalize to 0-1 range normalized = np.clip((data - vmin) / (vmax - vmin + 1e-8), 0, 1) # Apply colormap cmap = cm.get_cmap(colormap) colored = cmap(normalized) # Convert to 8-bit RGBA rgba = (colored * 255).astype(np.uint8) # Set alpha to 0 (transparent) for NA values rgba[invalid_mask, 3] = 0 return rgba def create_raster_image_url(self, data: np.ndarray, colormap: str = 'viridis', fmt: str = 'png') -> str: """ Convert numpy array to base64 encoded image URL for Folium overlay. Handles RGBA with transparency for NA masking. Args: data: Raster data colormap: Colormap name fmt: Image format ('png', 'jpeg') Returns: Base64 encoded data URL """ import io import base64 from PIL import Image # Convert to RGBA if data.ndim == 3: if data.shape[2] == 4: # Already RGBA rgba_data = data elif data.shape[2] == 3: # RGB - add alpha channel alpha = np.ones((*data.shape[:2], 1), dtype=np.uint8) * 255 rgba_data = np.dstack([data, alpha]) else: # Single band - apply colormap rgba_data = self.raster_to_image(data, colormap) else: # Single band - apply colormap rgba_data = self.raster_to_image(data, colormap) # Convert to PIL Image with RGBA mode img = Image.fromarray(rgba_data, mode='RGBA') # Encode to base64 buffer = io.BytesIO() img.save(buffer, format='PNG') # Always use PNG for transparency support buffer.seek(0) img_base64 = base64.b64encode(buffer.read()).decode() data_url = f"data:image/png;base64,{img_base64}" return data_url def load_ci_and_rgb(self, week: int, week_label: str = "current") -> bool: """ Load CI and RGB data for a given week. Args: week: Week number week_label: Label for storing data Returns: True if successful, False otherwise """ filepath = self.find_week_file(week) if filepath is None: return False print(f"Loading week {week} data...") # Load all bands bands_data = self.load_raster_bands(filepath, ['Red', 'Green', 'Blue', 'NIR', 'CI']) if not bands_data: return False # Store RGB composite if 'Red' in bands_data and 'Green' in bands_data and 'Blue' in bands_data: self.rgb_data[week_label] = self.create_rgb_composite( bands_data['Red'], bands_data['Green'], bands_data['Blue'] ) print(f" ✓ RGB composite created for {week_label}") # Store CI data if 'CI' in bands_data: self.ci_data[week_label] = bands_data['CI'] print(f" ✓ CI data loaded for {week_label}") return True def create_base_map(self, center_lat: float = -26.75, center_lon: float = 31.78, zoom_level: int = 14) -> folium.Map: """Create base Folium map with multiple tile options.""" map_obj = folium.Map( location=[center_lat, center_lon], zoom_start=zoom_level, tiles=None # Don't add default tile ) # Add multiple base layers folium.TileLayer( tiles='OpenStreetMap', name='OpenStreetMap', overlay=False, control=True ).add_to(map_obj) # Add Google Maps Satellite layer folium.TileLayer( tiles='https://mt1.google.com/vt/lyrs=s&x={x}&y={y}&z={z}', attr='Google Satellite', name='Google Satellite', overlay=False, control=True ).add_to(map_obj) # Expose Leaflet map instance for custom JS (Leaflet Draw) map_obj.get_root().html.add_child( folium.Element(''' ''') ) return map_obj def add_raster_layer(self, map_obj: folium.Map, data: np.ndarray, bounds: Tuple, name: str, colormap: str = 'viridis', opacity: float = 0.8) -> folium.Map: """ Add raster data as overlay layer on Folium map. Args: map_obj: Folium map object data: Raster data (numpy array) bounds: Raster bounds (west, south, east, north) name: Layer name colormap: Colormap for visualization opacity: Layer opacity Returns: Updated map object """ try: # Convert raster to image if data.ndim == 3 and data.shape[2] == 3: # Already RGB image_url = self.create_raster_image_url(data, fmt='png') else: # Single band - apply colormap image_url = self.create_raster_image_url(data, colormap=colormap, fmt='png') # Add as image overlay folium.raster_layers.ImageOverlay( image=image_url, bounds=[[bounds.bottom, bounds.left], [bounds.top, bounds.right]], name=name, opacity=opacity, show=True ).add_to(map_obj) print(f" ✓ Added layer: {name}") return map_obj except Exception as e: print(f" ✗ Error adding raster layer '{name}': {e}") return map_obj def add_field_boundaries(self, map_obj: folium.Map, gdf: gpd.GeoDataFrame, name: str = "Field Boundaries") -> folium.Map: """Add all field boundaries as a single toggle-able layer group with field labels on hover.""" try: print(f" Creating field boundaries layer group with {len(gdf)} fields...") # Create a feature group for all field boundaries field_group = folium.FeatureGroup(name="Field Boundaries", show=True) for idx, row in gdf.iterrows(): # Get field name field_name = row.get('field', f"Field {idx}") sub_field = row.get('sub_field', '') label = f"{field_name} - {sub_field}" if sub_field else field_name # Convert geometry to GeoJSON geojson_data = json.loads(gpd.GeoSeries(row.geometry).to_json()) # Create style function with proper closure def get_style(x, field_name=field_name): return { 'color': '#333333', 'weight': 2, 'opacity': 0.8, 'fill': True, 'fillColor': '#ffffff', 'fillOpacity': 0.0, # Invisible fill, but makes hover area larger 'dashArray': '5, 5' } # Add field boundary to the feature group with better hover folium.GeoJson( geojson_data, style_function=get_style, highlight_function=lambda x: { 'fillColor': '#ffff00', 'fillOpacity': 0.1, 'weight': 3, 'color': '#ff6600' }, tooltip=folium.Tooltip(label, sticky=False), popup=folium.Popup(f'{label}', max_width=250) ).add_to(field_group) # Add the feature group to the map field_group.add_to(map_obj) print(f" ✓ Added {len(gdf)} field boundaries with hover interaction in single layer group") except Exception as e: print(f" ✗ Error adding field boundaries: {e}") import traceback traceback.print_exc() return map_obj def calculate_ci_change(self, ci_current: np.ndarray, ci_previous: np.ndarray) -> np.ndarray: """ Calculate week-over-week CI change. Only calculate change where BOTH current and previous have valid data. """ # Create mask for valid data in both weeks valid_mask = np.isfinite(ci_current) & np.isfinite(ci_previous) # Initialize result with NaN change = np.full_like(ci_current, np.nan, dtype=np.float32) # Calculate change only where both are valid change[valid_mask] = ci_current[valid_mask] - ci_previous[valid_mask] return change def add_legend_and_descriptions(self, map_obj: folium.Map, current_week: int, previous_week: int) -> folium.Map: """Add legend and layer descriptions to the map with collapsible sections.""" # Create legend HTML with collapsible sections legend_html = '''
This interactive map shows Chlorophyll Index (CI) data for sugarcane fields. CI is a vegetation index derived from satellite imagery that indicates plant health and vigor.
Use the layer selector (top right) to toggle between different map layers. Transparent areas = no data available.
White areas or holes: These are clouds or cloud shadows that could not be reliably processed. Planet satellite imagery (optical) cannot see through clouds.
Current filtering: Basic cloud filtering is applied, but some clouds may remain.
Future improvement: Advanced cloud detection (OmniCloudMask) will be integrated to improve data quality.
• Resolution: 3m pixel size (accurate to 3m × 3m area on ground)
• Frequency: Weekly composites from Planet satellite data
• Temporal lag: May be 1-2 days behind current date due to processing
• NA values: Fields outside boundary or areas with data gaps appear transparent
✅ Normal crop growth (young to mature stage)
✅ Adequate water availability (good irrigation)
✅ Sufficient nutrient availability (N, P, K)
✅ Favorable weather conditions
🔴 Factors That Decrease CI
❌ Drought stress or irrigation failure
❌ Nutrient deficiency (especially nitrogen)
❌ Disease or pest damage (rust, smut, borers)
❌ Weed competition in young fields
❌ Lodging (crop falling over)
⚡ Rapid Changes