""" spectral_features.py — Extract per-field spectral indices from 4-band GeoTIFFs ================================================================================ Part of SC-161: Extract spectral indices from 4-band TIFFs (BSI, NDVI, NDWI) Band order in TIFFs: [R, G, B, NIR] (uint16) Spectral Indices: NDVI = (NIR - R) / (NIR + R) — vegetation density BSI = ((R + NIR) - (G + B)) / ((R + NIR) + (G + B)) — bare soil detection NDWI = (G - NIR) / (G + NIR) — moisture content CI_green = NIR / Green - 1 — chlorophyll index (= SmartCane CI) CI_red = NIR / Red - 1 — chlorophyll index (red-based) GNDVI = (NIR - G) / (NIR + G) — green NDVI (less saturation) SAVI = 1.5 * (NIR - R) / (NIR + R + 0.5) — soil-adjusted vegetation EVI2 = 2.5 * (NIR - R) / (NIR + 2.4*R + 1) — enhanced vegetation (2-band) """ import numpy as np import pandas as pd import geopandas as gpd import rasterio from rasterio.mask import mask as rio_mask from pathlib import Path from typing import Optional # ============================================================================= # SPECTRAL INDEX CALCULATIONS # ============================================================================= def _safe_normalized_diff(a: np.ndarray, b: np.ndarray) -> np.ndarray: """Compute (a - b) / (a + b) with safe division, masking invalid pixels.""" with np.errstate(divide="ignore", invalid="ignore"): result = (a - b) / (a + b) result[~np.isfinite(result)] = np.nan return result def compute_ndvi(red: np.ndarray, nir: np.ndarray) -> np.ndarray: """NDVI = (NIR - R) / (NIR + R)""" return _safe_normalized_diff(nir, red) def compute_bsi(red: np.ndarray, green: np.ndarray, blue: np.ndarray, nir: np.ndarray) -> np.ndarray: """BSI = ((R + NIR) - (G + B)) / ((R + NIR) + (G + B))""" numerator = (red + nir) - (green + blue) denominator = (red + nir) + (green + blue) with np.errstate(divide="ignore", invalid="ignore"): result = numerator / denominator result[~np.isfinite(result)] = np.nan return result def compute_ndwi(green: np.ndarray, nir: np.ndarray) -> np.ndarray: """NDWI = (G - NIR) / (G + NIR)""" return _safe_normalized_diff(green, nir) def compute_ci_green(green: np.ndarray, nir: np.ndarray) -> np.ndarray: """CI_green = NIR / Green - 1 (= SmartCane pipeline CI)""" with np.errstate(divide="ignore", invalid="ignore"): result = nir / green - 1.0 result[~np.isfinite(result)] = np.nan return result def compute_ci_red(red: np.ndarray, nir: np.ndarray) -> np.ndarray: """CI_red = NIR / Red - 1 (more sensitive at low chlorophyll)""" with np.errstate(divide="ignore", invalid="ignore"): result = nir / red - 1.0 result[~np.isfinite(result)] = np.nan return result def compute_gndvi(green: np.ndarray, nir: np.ndarray) -> np.ndarray: """GNDVI = (NIR - G) / (NIR + G)""" return _safe_normalized_diff(nir, green) def compute_savi(red: np.ndarray, nir: np.ndarray, L: float = 0.5) -> np.ndarray: """SAVI = (1 + L) * (NIR - R) / (NIR + R + L), L=0.5 typical""" with np.errstate(divide="ignore", invalid="ignore"): result = (1.0 + L) * (nir - red) / (nir + red + L) result[~np.isfinite(result)] = np.nan return result def compute_evi2(red: np.ndarray, nir: np.ndarray) -> np.ndarray: """EVI2 = 2.5 * (NIR - R) / (NIR + 2.4*R + 1)""" with np.errstate(divide="ignore", invalid="ignore"): result = 2.5 * (nir - red) / (nir + 2.4 * red + 1.0) result[~np.isfinite(result)] = np.nan return result # ============================================================================= # PER-FIELD EXTRACTION # ============================================================================= def extract_field_spectral_indices( tif_path: str | Path, field_boundaries_gdf: gpd.GeoDataFrame, field_id_col: str = "field", nodata_threshold: float = 0.0, min_valid_fraction: float = 0.1, ) -> pd.DataFrame: """ Extract mean/std spectral indices per field from a single 4-band TIF. Parameters ---------- tif_path : path to 4-band GeoTIFF (R, G, B, NIR — uint16) field_boundaries_gdf : GeoDataFrame with field polygons field_id_col : column name for field identifier nodata_threshold : pixels with ALL bands <= this value are treated as nodata min_valid_fraction : minimum fraction of valid pixels required (else NaN) Returns ------- DataFrame with columns: field, mean_ndvi, mean_bsi, mean_ndwi, mean_ci_green, mean_ci_red, mean_gndvi, mean_savi, mean_evi2, std_ndvi, std_bsi, std_ndwi, std_ci_green, std_ci_red, std_gndvi, std_savi, std_evi2, valid_pixel_count, total_pixel_count """ tif_path = Path(tif_path) rows = [] with rasterio.open(tif_path) as src: tif_crs = src.crs # Reproject field boundaries to match TIF CRS if needed gdf = field_boundaries_gdf.copy() if gdf.crs is not None and not gdf.crs.equals(tif_crs): gdf = gdf.to_crs(tif_crs) elif gdf.crs is None: # Assume EPSG:4326 if not set gdf = gdf.set_crs("EPSG:4326").to_crs(tif_crs) for _, row_geom in gdf.iterrows(): field_id = row_geom[field_id_col] geom = [row_geom.geometry.__geo_interface__] try: out_image, _ = rio_mask(src, geom, crop=True, all_touched=True, nodata=0) except Exception: # Field doesn't overlap with this TIF rows.append(_make_nan_row(field_id)) continue # out_image shape: (bands, height, width) — convert to float bands = out_image.astype(np.float64) red = bands[0] green = bands[1] blue = bands[2] nir = bands[3] # Mask: valid pixels have at least one band > nodata_threshold valid_mask = np.any(bands > nodata_threshold, axis=0) total_pixels = valid_mask.size valid_pixels = int(valid_mask.sum()) if valid_pixels < max(1, int(total_pixels * min_valid_fraction)): rows.append(_make_nan_row(field_id, valid_pixels, total_pixels)) continue # Compute indices (only on valid pixels) ndvi = compute_ndvi(red, nir) bsi = compute_bsi(red, green, blue, nir) ndwi = compute_ndwi(green, nir) ci_green = compute_ci_green(green, nir) ci_red = compute_ci_red(red, nir) gndvi = compute_gndvi(green, nir) savi = compute_savi(red, nir) evi2 = compute_evi2(red, nir) # Apply mask ndvi_valid = ndvi[valid_mask] bsi_valid = bsi[valid_mask] ndwi_valid = ndwi[valid_mask] ci_green_valid = ci_green[valid_mask] ci_red_valid = ci_red[valid_mask] gndvi_valid = gndvi[valid_mask] savi_valid = savi[valid_mask] evi2_valid = evi2[valid_mask] rows.append({ "field": field_id, "mean_ndvi": np.nanmean(ndvi_valid), "mean_bsi": np.nanmean(bsi_valid), "mean_ndwi": np.nanmean(ndwi_valid), "mean_ci_green": np.nanmean(ci_green_valid), "mean_ci_red": np.nanmean(ci_red_valid), "mean_gndvi": np.nanmean(gndvi_valid), "mean_savi": np.nanmean(savi_valid), "mean_evi2": np.nanmean(evi2_valid), "std_ndvi": np.nanstd(ndvi_valid), "std_bsi": np.nanstd(bsi_valid), "std_ndwi": np.nanstd(ndwi_valid), "std_ci_green": np.nanstd(ci_green_valid), "std_ci_red": np.nanstd(ci_red_valid), "std_gndvi": np.nanstd(gndvi_valid), "std_savi": np.nanstd(savi_valid), "std_evi2": np.nanstd(evi2_valid), "valid_pixel_count": valid_pixels, "total_pixel_count": total_pixels, }) return pd.DataFrame(rows) def _make_nan_row( field_id: str, valid_pixels: int = 0, total_pixels: int = 0, ) -> dict: """Return a row with NaN values for a field that couldn't be processed.""" return { "field": field_id, "mean_ndvi": np.nan, "mean_bsi": np.nan, "mean_ndwi": np.nan, "mean_ci_green": np.nan, "mean_ci_red": np.nan, "mean_gndvi": np.nan, "mean_savi": np.nan, "mean_evi2": np.nan, "std_ndvi": np.nan, "std_bsi": np.nan, "std_ndwi": np.nan, "std_ci_green": np.nan, "std_ci_red": np.nan, "std_gndvi": np.nan, "std_savi": np.nan, "std_evi2": np.nan, "valid_pixel_count": valid_pixels, "total_pixel_count": total_pixels, }