- Implemented `run_spectral_extraction.py` to batch extract spectral indices (NDVI, BSI, NDWI) from 4-band TIFF files, saving results in a structured CSV format. - Created `spectral_features.py` for calculating various spectral indices, including NDVI, BSI, NDWI, CI_green, CI_red, GNDVI, SAVI, and EVI2. - Added Jupyter notebook `02_season_normalization_analysis.ipynb` for analyzing season-length normalization, comparing ESA and Angata spectral indices, and visualizing growth patterns. - Included functions for computing season age, plotting trajectories, and performing peak and amplitude analysis.
242 lines
9 KiB
Python
242 lines
9 KiB
Python
"""
|
|
spectral_features.py — Extract per-field spectral indices from 4-band GeoTIFFs
|
|
================================================================================
|
|
Part of SC-161: Extract spectral indices from 4-band TIFFs (BSI, NDVI, NDWI)
|
|
|
|
Band order in TIFFs: [R, G, B, NIR] (uint16)
|
|
|
|
Spectral Indices:
|
|
NDVI = (NIR - R) / (NIR + R) — vegetation density
|
|
BSI = ((R + NIR) - (G + B)) / ((R + NIR) + (G + B)) — bare soil detection
|
|
NDWI = (G - NIR) / (G + NIR) — moisture content
|
|
CI_green = NIR / Green - 1 — chlorophyll index (= SmartCane CI)
|
|
CI_red = NIR / Red - 1 — chlorophyll index (red-based)
|
|
GNDVI = (NIR - G) / (NIR + G) — green NDVI (less saturation)
|
|
SAVI = 1.5 * (NIR - R) / (NIR + R + 0.5) — soil-adjusted vegetation
|
|
EVI2 = 2.5 * (NIR - R) / (NIR + 2.4*R + 1) — enhanced vegetation (2-band)
|
|
"""
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
import geopandas as gpd
|
|
import rasterio
|
|
from rasterio.mask import mask as rio_mask
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
|
|
# =============================================================================
|
|
# SPECTRAL INDEX CALCULATIONS
|
|
# =============================================================================
|
|
|
|
def _safe_normalized_diff(a: np.ndarray, b: np.ndarray) -> np.ndarray:
|
|
"""Compute (a - b) / (a + b) with safe division, masking invalid pixels."""
|
|
with np.errstate(divide="ignore", invalid="ignore"):
|
|
result = (a - b) / (a + b)
|
|
result[~np.isfinite(result)] = np.nan
|
|
return result
|
|
|
|
|
|
def compute_ndvi(red: np.ndarray, nir: np.ndarray) -> np.ndarray:
|
|
"""NDVI = (NIR - R) / (NIR + R)"""
|
|
return _safe_normalized_diff(nir, red)
|
|
|
|
|
|
def compute_bsi(red: np.ndarray, green: np.ndarray, blue: np.ndarray, nir: np.ndarray) -> np.ndarray:
|
|
"""BSI = ((R + NIR) - (G + B)) / ((R + NIR) + (G + B))"""
|
|
numerator = (red + nir) - (green + blue)
|
|
denominator = (red + nir) + (green + blue)
|
|
with np.errstate(divide="ignore", invalid="ignore"):
|
|
result = numerator / denominator
|
|
result[~np.isfinite(result)] = np.nan
|
|
return result
|
|
|
|
|
|
def compute_ndwi(green: np.ndarray, nir: np.ndarray) -> np.ndarray:
|
|
"""NDWI = (G - NIR) / (G + NIR)"""
|
|
return _safe_normalized_diff(green, nir)
|
|
|
|
|
|
def compute_ci_green(green: np.ndarray, nir: np.ndarray) -> np.ndarray:
|
|
"""CI_green = NIR / Green - 1 (= SmartCane pipeline CI)"""
|
|
with np.errstate(divide="ignore", invalid="ignore"):
|
|
result = nir / green - 1.0
|
|
result[~np.isfinite(result)] = np.nan
|
|
return result
|
|
|
|
|
|
def compute_ci_red(red: np.ndarray, nir: np.ndarray) -> np.ndarray:
|
|
"""CI_red = NIR / Red - 1 (more sensitive at low chlorophyll)"""
|
|
with np.errstate(divide="ignore", invalid="ignore"):
|
|
result = nir / red - 1.0
|
|
result[~np.isfinite(result)] = np.nan
|
|
return result
|
|
|
|
|
|
def compute_gndvi(green: np.ndarray, nir: np.ndarray) -> np.ndarray:
|
|
"""GNDVI = (NIR - G) / (NIR + G)"""
|
|
return _safe_normalized_diff(nir, green)
|
|
|
|
|
|
def compute_savi(red: np.ndarray, nir: np.ndarray, L: float = 0.5) -> np.ndarray:
|
|
"""SAVI = (1 + L) * (NIR - R) / (NIR + R + L), L=0.5 typical"""
|
|
with np.errstate(divide="ignore", invalid="ignore"):
|
|
result = (1.0 + L) * (nir - red) / (nir + red + L)
|
|
result[~np.isfinite(result)] = np.nan
|
|
return result
|
|
|
|
|
|
def compute_evi2(red: np.ndarray, nir: np.ndarray) -> np.ndarray:
|
|
"""EVI2 = 2.5 * (NIR - R) / (NIR + 2.4*R + 1)"""
|
|
with np.errstate(divide="ignore", invalid="ignore"):
|
|
result = 2.5 * (nir - red) / (nir + 2.4 * red + 1.0)
|
|
result[~np.isfinite(result)] = np.nan
|
|
return result
|
|
|
|
|
|
# =============================================================================
|
|
# PER-FIELD EXTRACTION
|
|
# =============================================================================
|
|
|
|
def extract_field_spectral_indices(
|
|
tif_path: str | Path,
|
|
field_boundaries_gdf: gpd.GeoDataFrame,
|
|
field_id_col: str = "field",
|
|
nodata_threshold: float = 0.0,
|
|
min_valid_fraction: float = 0.1,
|
|
) -> pd.DataFrame:
|
|
"""
|
|
Extract mean/std spectral indices per field from a single 4-band TIF.
|
|
|
|
Parameters
|
|
----------
|
|
tif_path : path to 4-band GeoTIFF (R, G, B, NIR — uint16)
|
|
field_boundaries_gdf : GeoDataFrame with field polygons
|
|
field_id_col : column name for field identifier
|
|
nodata_threshold : pixels with ALL bands <= this value are treated as nodata
|
|
min_valid_fraction : minimum fraction of valid pixels required (else NaN)
|
|
|
|
Returns
|
|
-------
|
|
DataFrame with columns:
|
|
field, mean_ndvi, mean_bsi, mean_ndwi, mean_ci_green, mean_ci_red,
|
|
mean_gndvi, mean_savi, mean_evi2,
|
|
std_ndvi, std_bsi, std_ndwi, std_ci_green, std_ci_red,
|
|
std_gndvi, std_savi, std_evi2,
|
|
valid_pixel_count, total_pixel_count
|
|
"""
|
|
tif_path = Path(tif_path)
|
|
rows = []
|
|
|
|
with rasterio.open(tif_path) as src:
|
|
tif_crs = src.crs
|
|
|
|
# Reproject field boundaries to match TIF CRS if needed
|
|
gdf = field_boundaries_gdf.copy()
|
|
if gdf.crs is not None and not gdf.crs.equals(tif_crs):
|
|
gdf = gdf.to_crs(tif_crs)
|
|
elif gdf.crs is None:
|
|
# Assume EPSG:4326 if not set
|
|
gdf = gdf.set_crs("EPSG:4326").to_crs(tif_crs)
|
|
|
|
for _, row_geom in gdf.iterrows():
|
|
field_id = row_geom[field_id_col]
|
|
geom = [row_geom.geometry.__geo_interface__]
|
|
|
|
try:
|
|
out_image, _ = rio_mask(src, geom, crop=True, all_touched=True, nodata=0)
|
|
except Exception:
|
|
# Field doesn't overlap with this TIF
|
|
rows.append(_make_nan_row(field_id))
|
|
continue
|
|
|
|
# out_image shape: (bands, height, width) — convert to float
|
|
bands = out_image.astype(np.float64)
|
|
|
|
red = bands[0]
|
|
green = bands[1]
|
|
blue = bands[2]
|
|
nir = bands[3]
|
|
|
|
# Mask: valid pixels have at least one band > nodata_threshold
|
|
valid_mask = np.any(bands > nodata_threshold, axis=0)
|
|
total_pixels = valid_mask.size
|
|
valid_pixels = int(valid_mask.sum())
|
|
|
|
if valid_pixels < max(1, int(total_pixels * min_valid_fraction)):
|
|
rows.append(_make_nan_row(field_id, valid_pixels, total_pixels))
|
|
continue
|
|
|
|
# Compute indices (only on valid pixels)
|
|
ndvi = compute_ndvi(red, nir)
|
|
bsi = compute_bsi(red, green, blue, nir)
|
|
ndwi = compute_ndwi(green, nir)
|
|
ci_green = compute_ci_green(green, nir)
|
|
ci_red = compute_ci_red(red, nir)
|
|
gndvi = compute_gndvi(green, nir)
|
|
savi = compute_savi(red, nir)
|
|
evi2 = compute_evi2(red, nir)
|
|
|
|
# Apply mask
|
|
ndvi_valid = ndvi[valid_mask]
|
|
bsi_valid = bsi[valid_mask]
|
|
ndwi_valid = ndwi[valid_mask]
|
|
ci_green_valid = ci_green[valid_mask]
|
|
ci_red_valid = ci_red[valid_mask]
|
|
gndvi_valid = gndvi[valid_mask]
|
|
savi_valid = savi[valid_mask]
|
|
evi2_valid = evi2[valid_mask]
|
|
|
|
rows.append({
|
|
"field": field_id,
|
|
"mean_ndvi": np.nanmean(ndvi_valid),
|
|
"mean_bsi": np.nanmean(bsi_valid),
|
|
"mean_ndwi": np.nanmean(ndwi_valid),
|
|
"mean_ci_green": np.nanmean(ci_green_valid),
|
|
"mean_ci_red": np.nanmean(ci_red_valid),
|
|
"mean_gndvi": np.nanmean(gndvi_valid),
|
|
"mean_savi": np.nanmean(savi_valid),
|
|
"mean_evi2": np.nanmean(evi2_valid),
|
|
"std_ndvi": np.nanstd(ndvi_valid),
|
|
"std_bsi": np.nanstd(bsi_valid),
|
|
"std_ndwi": np.nanstd(ndwi_valid),
|
|
"std_ci_green": np.nanstd(ci_green_valid),
|
|
"std_ci_red": np.nanstd(ci_red_valid),
|
|
"std_gndvi": np.nanstd(gndvi_valid),
|
|
"std_savi": np.nanstd(savi_valid),
|
|
"std_evi2": np.nanstd(evi2_valid),
|
|
"valid_pixel_count": valid_pixels,
|
|
"total_pixel_count": total_pixels,
|
|
})
|
|
|
|
return pd.DataFrame(rows)
|
|
|
|
|
|
def _make_nan_row(
|
|
field_id: str,
|
|
valid_pixels: int = 0,
|
|
total_pixels: int = 0,
|
|
) -> dict:
|
|
"""Return a row with NaN values for a field that couldn't be processed."""
|
|
return {
|
|
"field": field_id,
|
|
"mean_ndvi": np.nan,
|
|
"mean_bsi": np.nan,
|
|
"mean_ndwi": np.nan,
|
|
"mean_ci_green": np.nan,
|
|
"mean_ci_red": np.nan,
|
|
"mean_gndvi": np.nan,
|
|
"mean_savi": np.nan,
|
|
"mean_evi2": np.nan,
|
|
"std_ndvi": np.nan,
|
|
"std_bsi": np.nan,
|
|
"std_ndwi": np.nan,
|
|
"std_ci_green": np.nan,
|
|
"std_ci_red": np.nan,
|
|
"std_gndvi": np.nan,
|
|
"std_savi": np.nan,
|
|
"std_evi2": np.nan,
|
|
"valid_pixel_count": valid_pixels,
|
|
"total_pixel_count": total_pixels,
|
|
}
|