# SmartCane/python_app/00_download_8band_pu_optimized.py
# NOTE: GitHub web-UI scrape artifacts ("696 lines", "Raw Blame History",
# ambiguous-Unicode warnings) removed from the top of this file so it parses
# as Python; the script proper starts below.
#!/usr/bin/env python3
"""
Planet 4-Band Download Script - PU-Optimized (RGB+NIR, Cloud-Masked, uint16)
============================================================================
Strategy: Minimize Processing Units using three techniques:
1. 4-band output (RGB+NIR) with cloud masking on server (uint16, not FLOAT32)
→ Cuts data transfer by ~60% (4 bands uint16 vs 9 bands FLOAT32)
2. Dynamically reduced bounding boxes (reduce_bbox_sizes=True)
→ Shrinks tiles to fit field geometry boundaries, reducing wasted pixels
3. Date availability filtering + geometry-aware grid
→ Skips empty dates and non-field areas
Usage:
python download_8band_pu_optimized.py [PROJECT] [OPTIONS]
Arguments:
PROJECT Project name (angata, chemba, xinavane, etc.) [required]
Options:
--date DATE Date to download (YYYY-MM-DD). Default: today
--resolution RES Resolution in meters (default: 3)
--skip-merge Skip merge step (download only, keep individual tiles)
--cleanup Delete intermediate single_images folder after merge
--clear-singles Clear single_images folder before download
--clear-merged Clear merged_tif folder before download
--clear-all Clear all output folders (singles, merged) before download
Examples:
python download_8band_pu_optimized.py xinavane --clear-singles --cleanup
cd python_app
python download_planet_missing_dates.py --start 2025-11-01 --end 2025-12-24 --project angata
Cost Model:
- 4-band uint16 with cloud masking: ~50% lower cost than 9-band FLOAT32
- Reduced bbox sizes: ~10-20% lower cost due to smaller average tile size
- Total expected PU: ~1,500-2,000 per date (vs 5,865 with 9-band approach)
- Requests: Slightly higher (~50-60 tiles) but within 700k budget
Expected result: ~75% PU savings with dynamic geometry-fitted grid
Example running it in powershell:
$startDate = [DateTime]::ParseExact("2025-11-01", "yyyy-MM-dd", $null)
$endDate = [DateTime]::ParseExact("2025-12-24", "yyyy-MM-dd", $null)
$current = $startDate
while ($current -le $endDate) {
$dateStr = $current.ToString("yyyy-MM-dd")
Write-Host "Downloading $dateStr..."
python download_8band_pu_optimized.py angata --date $dateStr
$current = $current.AddDays(1)
}
"""
import os
import sys
import json
import datetime
import argparse
from pathlib import Path
from typing import List, Tuple, Optional
import warnings
import numpy as np
import geopandas as gpd
from shapely.geometry import MultiPolygon, Polygon, box
from shapely.ops import unary_union
from osgeo import gdal
# Suppress GDAL TIFF metadata warnings
warnings.filterwarnings('ignore', category=RuntimeWarning, module='osgeo.gdal')
from sentinelhub import (
MimeType, CRS, BBox, SentinelHubRequest, SentinelHubDownloadClient,
DataCollection, bbox_to_dimensions, SHConfig, Geometry, SentinelHubCatalog, BBoxSplitter
)
import time
# ============================================================================
# CONFIGURATION
# ============================================================================
def setup_config():
    """Setup SentinelHub configuration and paths.

    Returns:
        (config, catalog): an ``SHConfig`` carrying OAuth client credentials
        and a ``SentinelHubCatalog`` bound to that config.
    """
    config = SHConfig()
    # SECURITY NOTE(review): the hardcoded fallback credentials below are
    # committed to source control and should be rotated; prefer setting the
    # SH_CLIENT_ID / SH_CLIENT_SECRET environment variables and removing the
    # literal defaults entirely.
    config.sh_client_id = os.environ.get('SH_CLIENT_ID', '1a72d811-4f0e-4447-8282-df09608cff44')
    config.sh_client_secret = os.environ.get('SH_CLIENT_SECRET', 'FcBlRL29i9ZmTzhmKTv1etSMFs5PxSos')
    catalog = SentinelHubCatalog(config=config)
    return config, catalog
def detect_collection(date_str: str, bbox_list: List[BBox], catalog, date_range_days: int = 7) -> Tuple:
    """
    Auto-detect which Planet collection is available for this project.

    Checks a week of dates (backwards from date_str) to ensure robust
    detection. If ANY date has data in the new 8-band collection, use that;
    otherwise fall back to the legacy 4-band collection. If neither has data,
    default to the 8-band collection and let the download attempt proceed.

    Args:
        date_str: Reference date (YYYY-MM-DD).
        bbox_list: List of bounding boxes; only the first is used for probing.
        catalog: SentinelHubCatalog instance.
        date_range_days: Number of days to check backwards (default: 7).

    Returns:
        (byoc, collection_info_dict) where byoc is a DataCollection and the
        dict contains collection metadata (id, name, bands, output folders).
    """
    new_id = '4e56d0cb-c402-40ff-97bb-c2b9e6bfcf2a'  # 8-band (new)
    old_id = 'c691479f-358c-46b1-b0f0-e12b70a9856c'  # 4-band (legacy)
    test_bbox = bbox_list[0]

    # Generate date range (backwards from date_str).
    try:
        ref_date = datetime.datetime.strptime(date_str, '%Y-%m-%d')
    except ValueError:
        print(f"⚠️ Invalid date format: {date_str}. Using today.")
        ref_date = datetime.datetime.now()
    date_range = [
        (ref_date - datetime.timedelta(days=i)).strftime('%Y-%m-%d')
        for i in range(date_range_days)
    ]

    def _collection_info(collection_id: str, name: str) -> dict:
        # Both collections are downloaded as 4 output bands (RGB+NIR) into
        # the same folder layout.
        return {
            'collection_id': collection_id,
            'name': name,
            'bands': 4,
            'output_folder': 'merged_tif',
            'singles_folder': 'single_images'
        }

    def _has_data(byoc, label: str) -> bool:
        # Probe each date in the range; stop at the first date with tiles.
        for test_date in date_range:
            try:
                search = catalog.search(
                    collection=byoc,
                    bbox=test_bbox,
                    time=(test_date, test_date),
                    filter=None
                )
                tiles = list(search)
                if len(tiles) > 0:
                    print(f" ✓ Found data on {test_date} ({len(tiles)} tiles)")
                    print(f" ✓ Using {label}")
                    return True
            except Exception as e:
                print(f" ⚠️ {test_date}: {str(e)[:60]}")
        return False

    print(f"\nAuto-detecting Planet collection (checking {date_range_days} days)...")
    print(f" Test range: {date_range[-1]} to {date_range[0]}")

    # Try new collection first.
    print(f"\n Trying 8-band collection: {new_id}")
    byoc_new = DataCollection.define_byoc(new_id, name='planet_data_8b', is_timeless=True)
    if _has_data(byoc_new, '8-band collection'):
        return byoc_new, _collection_info(new_id, 'planet_data_8b')

    # No data in new collection, try legacy.
    print(f"\n ✗ No data found in 8-band collection")
    print(f" Trying legacy 4-band collection: {old_id}")
    byoc_old = DataCollection.define_byoc(old_id, name='planet_data', is_timeless=True)
    if _has_data(byoc_old, 'legacy 4-band collection'):
        return byoc_old, _collection_info(old_id, 'planet_data')

    # Neither collection has data — default optimistically to 8-band.
    print(f"\n ⚠️ No data found in either collection for {date_range_days} days")
    print(f" Defaulting to 8-band collection (will attempt download anyway)")
    return byoc_new, _collection_info(new_id, 'planet_data_8b')
# ============================================================================
# EVALSCRIPT: 4 bands (RGB + NIR) with cloud masking, uint16 output
# ============================================================================
EVALSCRIPT_8BAND = """
//VERSION=3
function setup() {
return {
input: [{
bands: ["red", "green", "blue", "nir", "udm1"],
units: "DN"
}],
output: {
bands: 4
}
};
}
function evaluatePixel(sample) {
// Cloud masking: return NaN for cloudy/bad pixels (udm1 != 0)
if (sample.udm1 == 0) {
var scaledRed = 2.5 * sample.red / 10000;
var scaledGreen = 2.5 * sample.green / 10000;
var scaledBlue = 2.5 * sample.blue / 10000;
var scaledNIR = 2.5 * sample.nir / 10000;
return [scaledRed, scaledGreen, scaledBlue, scaledNIR];
} else {
return [NaN, NaN, NaN, NaN];
}
}
"""
EVALSCRIPT_4BAND_LEGACY = """
//VERSION=3
function setup() {
return {
input: [{
bands: ["red", "green", "blue", "nir", "udm1"],
units: "DN"
}],
output: {
bands: 4
}
};
}
function evaluatePixel(sample) {
// Cloud masking for legacy collection (same band names as new 8-band)
// udm1 = 0 means clear, non-zero means cloud/shadow/etc
if (sample.udm1 == 0) {
var scaledRed = 2.5 * sample.red / 10000;
var scaledGreen = 2.5 * sample.green / 10000;
var scaledBlue = 2.5 * sample.blue / 10000;
var scaledNIR = 2.5 * sample.nir / 10000;
return [scaledRed, scaledGreen, scaledBlue, scaledNIR];
} else {
return [NaN, NaN, NaN, NaN];
}
}
"""
# ============================================================================
# GEOMETRY & GRID FUNCTIONS
# ============================================================================
def load_and_validate_geojson(geojson_path: Path) -> gpd.GeoDataFrame:
    """Read a GeoJSON file and return it with a guaranteed WGS84 CRS.

    A missing CRS is assumed to already be WGS84; any other CRS is
    reprojected to EPSG:4326.
    """
    fields = gpd.read_file(str(geojson_path))
    print(f"✓ Loaded {len(fields)} field(s)")
    print(f" CRS: {fields.crs}")
    print(f" Bounds (WGS84): {fields.total_bounds}")
    # Normalize the CRS to EPSG:4326 before any grid math.
    if fields.crs is None:
        print(" ⚠️ No CRS defined. Assuming WGS84.")
        return fields.set_crs('EPSG:4326')
    if fields.crs != 'EPSG:4326':
        print(f" Converting to WGS84...")
        return fields.to_crs('EPSG:4326')
    return fields
def create_optimal_grid_with_filtering(
    gdf: gpd.GeoDataFrame,
    resolution: int = 3,
    max_pixels: int = 2500,
    grid_multiplier: int = 5
) -> Tuple[List[BBox], List[Polygon]]:
    """
    Create a fine grid of bounding boxes using BBoxSplitter with
    reduce_bbox_sizes=True.

    Strategy: use a FINER grid (not coarser) with reduce_bbox_sizes=True to
    get many smaller tiles that hug field boundaries tightly. This reduces
    wasted pixel area while still respecting the max pixel limit per tile.

    Note: comments in an earlier revision described a "6x" multiplier while
    the code used 5x; the multiplier is now an explicit parameter (default 5,
    matching the previous actual behavior). 7x was observed to create tiles
    that shrink to 0 pixels.

    Args:
        gdf: Field geometries in WGS84.
        resolution: Target resolution in meters per pixel.
        max_pixels: Maximum pixels per tile edge for the base grid.
        grid_multiplier: How much finer than the base grid to split
            (default 5 — backward compatible with the previous hard-coded value).

    Returns:
        (bbox_list, geometry_list) where geometry_list contains the field
        geometry intersections for each bbox (for reference only, not used
        to mask the download).
    """
    union_geom = gdf.geometry.union_all()
    minx, miny, maxx, maxy = gdf.total_bounds  # [minx, miny, maxx, maxy]
    # Rough degrees→meters conversion (~111,320 m per degree). Accurate only
    # near the equator — TODO confirm acceptable for project latitudes.
    width_m = (maxx - minx) * 111320
    height_m = (maxy - miny) * 111320
    max_size_m = max_pixels * resolution  # Max bbox size in meters

    # BASE grid: smallest uniform grid where every tile fits max_size_m.
    nx_base = max(1, int(np.ceil(width_m / max_size_m)))
    ny_base = max(1, int(np.ceil(height_m / max_size_m)))
    # FINE grid: multiplied so reduce_bbox_sizes can shrink tiles to the
    # field geometry boundaries.
    nx_fine = nx_base * grid_multiplier
    ny_fine = ny_base * grid_multiplier

    print(f"\nGrid Calculation (fine grid with reduce_bbox_sizes=True):")
    print(f" Area extent: {width_m:.0f}m × {height_m:.0f}m")
    print(f" Max bbox size: {max_size_m:.0f}m ({max_pixels}px @ {resolution}m)")
    print(f" Base grid: {nx_base}×{ny_base} = {nx_base*ny_base} tiles")
    print(f" Fine grid ({grid_multiplier}x): {nx_fine}×{ny_fine} = {nx_fine*ny_fine} theoretical tiles")

    # BBoxSplitter shrinks each grid cell to the geometry it intersects.
    shapely_geoms = [geom for geom in gdf.geometry]
    bbox_splitter = BBoxSplitter(
        shapely_geoms,
        CRS.WGS84,
        (nx_fine, ny_fine),
        reduce_bbox_sizes=True  # Shrink tiles to fit geometry - creates many smaller tiles
    )
    bbox_list = bbox_splitter.get_bbox_list()
    print(f" BBoxSplitter returned: {len(bbox_list)} bbox(es) (after reduce_bbox_sizes)")

    # Show sample bbox dimensions to verify tiles are smaller.
    if bbox_list:
        sizes = []
        for bbox in bbox_list[:min(5, len(bbox_list))]:
            bbox_width = (bbox.max_x - bbox.min_x) * 111320
            bbox_height = (bbox.max_y - bbox.min_y) * 111320
            sizes.append((bbox_width, bbox_height))
        avg_width = np.mean([s[0] for s in sizes])
        avg_height = np.mean([s[1] for s in sizes])
        print(f" Sample tiles (avg): {avg_width:.0f}m × {avg_height:.0f}m")

    # Keep only tiles that actually intersect field geometries.
    geometry_list = []
    filtered_bbox_list = []
    for bbox in bbox_list:
        tile_poly = box(
            bbox.min_x, bbox.min_y,
            bbox.max_x, bbox.max_y
        )
        intersection = tile_poly.intersection(union_geom)
        if not intersection.is_empty:
            filtered_bbox_list.append(bbox)
            geometry_list.append(intersection)
    print(f" ✓ Final active tiles: {len(filtered_bbox_list)}")
    return filtered_bbox_list, geometry_list
# ============================================================================
# DATA AVAILABILITY CHECK
# ============================================================================
def check_date_has_data(date_str: str, test_bbox: BBox, catalog, byoc) -> bool:
    """Return True if Planet imagery exists for the given date.

    A failed catalog query is treated optimistically (returns True) so a
    transient API error does not skip a downloadable date; a successful but
    empty query returns False to avoid wasted downloads.
    """
    try:
        matches = list(catalog.search(
            collection=byoc,
            bbox=test_bbox,
            time=(date_str, date_str),
            filter=None
        ))
    except Exception as e:
        print(f" ⚠️ Date {date_str}: Check failed ({e}) — assuming data exists")
        return True  # Optimistic default
    if matches:
        print(f" ✓ Date {date_str}: Found {len(matches)} image tile(s)")
        return True
    print(f" ✗ Date {date_str}: No imagery available")
    return False
# ============================================================================
# DOWNLOAD FUNCTIONS
# ============================================================================
def download_tile(
    date_str: str,
    bbox: BBox,
    output_dir: Path,
    config,
    byoc,
    evalscript: str,
    resolution: int = 3
) -> bool:
    """Download one full tile (no geometry masking = lower PU).

    Retries up to 3 times with exponential backoff (1s, 2s, 4s) when the
    error message looks like a rate limit; any other failure returns False
    immediately.
    """
    attempts_allowed = 3
    backoff_s = 1.0
    for attempt_no in range(1, attempts_allowed + 1):
        try:
            tile_size = bbox_to_dimensions(bbox, resolution=resolution)
            # Build the request with the evalscript chosen for this collection.
            sh_request = SentinelHubRequest(
                evalscript=evalscript,
                input_data=[
                    SentinelHubRequest.input_data(
                        data_collection=byoc,
                        time_interval=(date_str, date_str)
                    )
                ],
                responses=[
                    SentinelHubRequest.output_response('default', MimeType.TIFF)
                ],
                bbox=bbox,
                size=tile_size,
                config=config,
                data_folder=str(output_dir),
            )
            pending = sh_request.download_list
            if not pending:
                print(f" ✗ No download requests generated for bbox {bbox}")
                return False
            # Sequential download (max_threads=1) to keep PU tracking simple.
            SentinelHubDownloadClient(config=config).download(pending, max_threads=1)
            return True
        except Exception as exc:
            lowered = str(exc).lower()
            looks_rate_limited = any(
                marker in lowered for marker in ("rate", "429", "too many")
            )
            if looks_rate_limited and attempt_no < attempts_allowed:
                print(f" ⚠️ Rate limited, retrying in {backoff_s}s...")
                time.sleep(backoff_s)
                backoff_s *= 2  # Exponential backoff: 1s → 2s → 4s
            else:
                print(f" ✗ Download failed: {exc}")
                return False
    return False
def download_date(
    date_str: str,
    bbox_list: List[BBox],
    base_path: Path,
    config,
    byoc,
    evalscript: str,
    collection_info: dict,
    resolution: int = 3
) -> int:
    """
    Download all tiles for a single date, printing a textual progress bar.

    Args:
        date_str: Date to download (YYYY-MM-DD).
        bbox_list: Tiles to fetch.
        base_path: Project storage root; tiles land under
            <base_path>/<singles_folder>/<date_str>/.
        config / byoc / evalscript: Passed through to download_tile.
        collection_info: Collection metadata (provides 'singles_folder').
        resolution: Resolution in meters.

    Returns:
        Number of successfully downloaded tiles.
    """
    output_dir = base_path / collection_info['singles_folder'] / date_str
    output_dir.mkdir(parents=True, exist_ok=True)
    print(f"\nDownloading {len(bbox_list)} tiles for {date_str}...")
    successful = 0
    for idx, bbox in enumerate(bbox_list, 1):
        if download_tile(date_str, bbox, output_dir, config, byoc, evalscript, resolution):
            successful += 1
        percentage = (idx / len(bbox_list)) * 100
        bar_length = 40
        filled = int(bar_length * idx / len(bbox_list))
        # FIX: the fill characters had been stripped to empty strings
        # (rendering an always-empty bar); restore the block glyphs.
        bar = '█' * filled + '░' * (bar_length - filled)
        print(f"\r {percentage:3.0f}% |{bar}| {idx}/{len(bbox_list)}", end='', flush=True)
        # Small delay to avoid rate limiting (can be aggressive with small tiles).
        time.sleep(0.002)
    print()  # Newline after progress bar
    print(f" Result: {successful}/{len(bbox_list)} tiles downloaded")
    return successful
# ============================================================================
# MERGE FUNCTION
# ============================================================================
def merge_tiles(date_str: str, base_path: Path, collection_info: dict) -> bool:
    """Merge downloaded tiles into a single compressed GeoTIFF via GDAL.

    Builds a VRT over every response.tiff under the date's single_images
    folder, then translates it to an LZW-compressed, tiled Float32 GeoTIFF.
    Returns True on success, False on any failure.
    """
    tiles_dir = base_path / collection_info['singles_folder'] / date_str
    # Collect every downloaded tile (SentinelHub writes them as response.tiff).
    tile_paths = [str(p) for p in tiles_dir.rglob('response.tiff')]
    if not tile_paths:
        print(f" ✗ No tiles found to merge")
        return False

    out_dir = base_path / collection_info['output_folder']
    vrt_dir = base_path / f"{collection_info['output_folder'].replace('merged_tif', 'merged_virtual')}"
    for folder in (out_dir, vrt_dir):
        folder.mkdir(parents=True, exist_ok=True)
    tif_path = out_dir / f"{date_str}.tif"
    vrt_path = vrt_dir / f"merged_{date_str}.vrt"

    try:
        print(f" Building VRT from {len(tile_paths)} tiles...")
        vrt_dataset = gdal.BuildVRT(str(vrt_path), tile_paths)
        if vrt_dataset is None:
            print(f" ✗ Failed to create VRT")
            return False
        # Dropping the reference flushes/closes the VRT so Translate can
        # read the file from disk — keep this before the Translate call.
        vrt_dataset = None

        print(f" Converting to GeoTIFF...")
        translate_opts = gdal.TranslateOptions(
            outputType=gdal.GDT_Float32,
            creationOptions=[
                'COMPRESS=LZW',
                'TILED=YES',
                'BLOCKXSIZE=256',
                'BLOCKYSIZE=256',
                'NUM_THREADS=ALL_CPUS'
            ]
        )
        out_dataset = gdal.Translate(str(tif_path), str(vrt_path), options=translate_opts)
        if out_dataset is None:
            print(f" ✗ Failed to convert VRT to TIFF")
            return False
        out_dataset = None  # Close/flush the output dataset
        print(f" ✓ Merged to {tif_path.name}")
        return True
    except Exception as e:
        print(f" ✗ Merge failed: {e}")
        return False
# ============================================================================
# MAIN WORKFLOW
# ============================================================================
def main():
    """Main download and merge workflow.

    Parses CLI arguments, builds the tile grid, auto-detects the Planet
    collection, downloads all tiles for the requested date, then optionally
    merges and cleans up.
    """
    import shutil  # Used by --clear-* flags and --cleanup

    # Parse arguments
    parser = argparse.ArgumentParser(
        description='Download Planet imagery with PU optimization (auto-detects 8-band vs legacy 4-band)'
    )
    parser.add_argument('project', help='Project name (angata, chemba, xinavane, aura, etc.)')
    parser.add_argument('--date', default=None, help='Date to download (YYYY-MM-DD). Default: today')
    parser.add_argument('--resolution', type=int, default=3, help='Resolution in meters (default: 3)')
    parser.add_argument('--skip-merge', action='store_true', help='Skip merge step (download only)')
    parser.add_argument('--cleanup', action='store_true', help='Delete intermediate single_images after merge')
    # FIX: these flags were documented in the module docstring but never
    # registered with the parser — implement them.
    parser.add_argument('--clear-singles', action='store_true', help='Clear single_images folder before download')
    parser.add_argument('--clear-merged', action='store_true', help='Clear merged_tif folder before download')
    parser.add_argument('--clear-all', action='store_true', help='Clear all output folders (singles, merged) before download')
    args = parser.parse_args()

    # Setup paths
    base_path = Path('../laravel_app/storage/app') / args.project
    if not base_path.exists():
        print(f"✗ Project path not found: {base_path}")
        sys.exit(1)
    geojson_file = base_path / 'Data' / 'pivot.geojson'
    if not geojson_file.exists():
        print(f"✗ GeoJSON not found: {geojson_file}")
        sys.exit(1)

    # Determine date (default: today)
    if args.date:
        date_str = args.date
    else:
        date_str = datetime.date.today().strftime('%Y-%m-%d')

    print(f"{'='*70}")
    print(f"Planet Download - Auto-Detecting Collection (PU Optimized)")
    print(f"{'='*70}")
    print(f"Project: {args.project}")
    print(f"Date: {date_str}")
    print(f"Resolution: {args.resolution}m")

    # Setup SentinelHub
    print(f"\nSetting up SentinelHub...")
    config, catalog = setup_config()
    print(f"✓ SentinelHub configured")

    # Load geometries
    print(f"\nLoading field geometries...")
    gdf = load_and_validate_geojson(geojson_file)

    # Create optimal grid
    print(f"\nCreating optimal grid...")
    bbox_list, _ = create_optimal_grid_with_filtering(gdf, resolution=args.resolution)
    if not bbox_list:
        print(f"\n✗ No tiles intersect field geometries. Exiting.")
        sys.exit(1)

    # Auto-detect collection
    byoc, collection_info = detect_collection(date_str, bbox_list, catalog, date_range_days=7)

    # FIX: the previous selection was a dead conditional whose branches were
    # both EVALSCRIPT_8BAND (testing a 'note' key that is never set), then
    # immediately overridden. Select directly from the detected collection id.
    if collection_info['collection_id'] == '4e56d0cb-c402-40ff-97bb-c2b9e6bfcf2a':
        evalscript = EVALSCRIPT_8BAND
    else:
        evalscript = EVALSCRIPT_4BAND_LEGACY
    print(f"\n Collection: {collection_info['name']}")
    print(f" Output folder: {collection_info['output_folder']}/")

    # Optional pre-download clearing of output folders
    if args.clear_singles or args.clear_all:
        singles_root = base_path / collection_info['singles_folder']
        if singles_root.exists():
            shutil.rmtree(singles_root)
            print(f" ✓ Cleared {collection_info['singles_folder']}/")
    if args.clear_merged or args.clear_all:
        merged_root = base_path / collection_info['output_folder']
        if merged_root.exists():
            shutil.rmtree(merged_root)
            print(f" ✓ Cleared {collection_info['output_folder']}/")

    # Check date availability
    print(f"\nChecking data availability for {date_str}...")
    if not check_date_has_data(date_str, bbox_list[0], catalog, byoc):
        print(f"\n⚠️ No imagery found for {date_str}. Exiting without download.")
        sys.exit(0)

    # Download tiles
    print(f"\n{'='*70}")
    downloaded = download_date(date_str, bbox_list, base_path, config, byoc, evalscript, collection_info, args.resolution)
    if downloaded == 0:
        print(f"\n✗ No tiles downloaded. Exiting.")
        sys.exit(1)

    # Merge tiles
    if not args.skip_merge:
        print(f"\n{'='*70}")
        print(f"Merging tiles...")
        if merge_tiles(date_str, base_path, collection_info):
            print(f"✓ Merge complete")
            # Cleanup intermediate files
            if args.cleanup:
                print(f"\nCleaning up intermediate files...")
                single_images_dir = base_path / collection_info['singles_folder'] / date_str
                merged_vrt_dir = base_path / f"{collection_info['output_folder'].replace('merged_tif', 'merged_virtual')}"
                try:
                    if single_images_dir.exists():
                        shutil.rmtree(single_images_dir)
                        print(f" ✓ Deleted {collection_info['singles_folder']}/{date_str}")
                    # Clean this date's VRT file
                    for vrt_file in merged_vrt_dir.glob(f"merged_{date_str}.vrt"):
                        vrt_file.unlink()
                        print(f" ✓ Deleted {vrt_file.name}")
                except Exception as e:
                    print(f" ⚠️ Cleanup error: {e}")
        else:
            print(f"✗ Merge failed")
            sys.exit(1)

    print(f"\n{'='*70}")
    print(f"✓ Done!")
    print(f"Output: {base_path / collection_info['output_folder'] / f'{date_str}.tif'}")
    print(f"{'='*70}")


if __name__ == '__main__':
    main()