# SmartCane/python_app/download_planet_missing_dates.py
"""
Script: download_planet_missing_dates.py
Purpose: Download Planet satellite data for missing dates only (skip existing files).
Can be called from batch scripts or other Python scripts.
Usage:
python download_planet_missing_dates.py --start 2026-01-17 --end 2026-12-20 --project angata
python download_planet_missing_dates.py --start 2023-06-01 --end 2023-06-30 --project angata --dry-run
Environment variables (alternative to CLI args):
DAYS: Number of days to download (default: 365)
DATE: End date in YYYY-MM-DD format (default: today)
PROJECT_DIR: Project name (default: angata)
"""
import os
import sys
import json
import datetime
import argparse
import subprocess
from pathlib import Path
from osgeo import gdal
import time
import shutil
import warnings
import numpy as np
import geopandas as gpd
from shapely.geometry import MultiPolygon, Polygon, MultiLineString, box
from shapely.ops import unary_union
# Suppress GDAL TIFF metadata warnings (9-band files trigger false positives)
warnings.filterwarnings('ignore', message='.*TIFFReadDirectory.*SamplesPerPixel.*')
from sentinelhub import (
MimeType, CRS, BBox, SentinelHubRequest, SentinelHubDownloadClient,
DataCollection, bbox_to_dimensions, SHConfig, BBoxSplitter, Geometry, SentinelHubCatalog
)
# ============================================================================
# CONFIGURATION
# ============================================================================
def get_config():
    """Build the run configuration from CLI arguments and environment variables.

    Returns:
        dict with keys 'start_date'/'end_date' (datetime.date), 'project' (str),
        'resolution' (int, meters) and 'dry_run' (bool).
    """
    parser = argparse.ArgumentParser(description="Download Planet satellite data for missing dates")
    parser.add_argument('--start', type=str, help='Start date (YYYY-MM-DD)', default=None)
    parser.add_argument('--end', type=str, help='End date (YYYY-MM-DD)', default=None)
    parser.add_argument('--project', type=str, default=os.getenv('PROJECT_DIR', 'angata'),
                        help='Project name (default: angata)')
    parser.add_argument('--resolution', type=int, default=3, help='Resolution in meters')
    parser.add_argument('--days', type=int, default=365, help='Days to download (if --start not specified)')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be downloaded without downloading')
    args = parser.parse_args()

    # End of range: explicit --end wins, otherwise today.
    end_date = (datetime.datetime.strptime(args.end, "%Y-%m-%d").date()
                if args.end else datetime.date.today())
    # Start of range: explicit --start wins, otherwise --days back from the
    # end (the range is inclusive on both sides, hence "- 1").
    start_date = (datetime.datetime.strptime(args.start, "%Y-%m-%d").date()
                  if args.start else end_date - datetime.timedelta(days=args.days - 1))

    return {
        'start_date': start_date,
        'end_date': end_date,
        'project': args.project,
        'resolution': args.resolution,
        'dry_run': args.dry_run,
    }
# ============================================================================
# SETUP
# ============================================================================
# SentinelHub client credentials.
# SECURITY: these were previously hard-coded in the repository. Prefer
# supplying them via the SH_CLIENT_ID / SH_CLIENT_SECRET environment
# variables; the hard-coded values remain only as a backward-compatible
# fallback and should be rotated and removed.
config = SHConfig()
config.sh_client_id = os.getenv('SH_CLIENT_ID', '1a72d811-4f0e-4447-8282-df09608cff44')
config.sh_client_secret = os.getenv('SH_CLIENT_SECRET', 'FcBlRL29i9ZmTzhmKTv1etSMFs5PxSos')
catalog = SentinelHubCatalog(config=config)

# BYOC (Bring Your Own Collection) holding the 8-band Planet data.
collection_id = '4e56d0cb-c402-40ff-97bb-c2b9e6bfcf2a'
byoc = DataCollection.define_byoc(
    collection_id,
    name='planet_data_8b',
    is_timeless=True
)
# ============================================================================
# FUNCTIONS
# ============================================================================
def setup_paths(project):
    """Build the per-project folder layout and ensure output folders exist.

    Returns:
        dict with 'base' (Path), 'single_images' (Path), 'merged_tifs' (str),
        'virtual_raster' (str) and 'geojson' (Path) entries.
    """
    base = Path('../laravel_app/storage/app') / project
    single_images = Path(base / 'single_images')
    merged_tifs = str(base / 'merged_tif')
    virtual_raster = str(base / 'merged_virtual')
    geojson_file = Path(base / 'Data' / 'pivot.geojson')

    # Output folders must exist before downloads/merges write into them.
    for folder in (single_images, merged_tifs, virtual_raster):
        Path(folder).mkdir(parents=True, exist_ok=True)

    return {
        'base': base,
        'single_images': single_images,
        'merged_tifs': merged_tifs,
        'virtual_raster': virtual_raster,
        'geojson': geojson_file,
    }
def get_existing_dates(merged_tifs_folder):
    """Return the set of date strings (YYYY-MM-DD) that already have merged TIFs.

    Scans *.tif files whose stem parses as a date; any other filename is
    silently ignored.  A missing folder yields an empty set.
    """
    folder = Path(merged_tifs_folder)
    if not folder.exists():
        return set()

    found = set()
    for tif in folder.glob('*.tif'):
        stem = tif.stem  # expected filename format: YYYY-MM-DD.tif
        try:
            datetime.datetime.strptime(stem, "%Y-%m-%d")
        except ValueError:
            continue  # not a date-named file
        found.add(stem)
    return found
def get_missing_dates(start_date, end_date, existing_dates):
    """Return date strings in [start_date, end_date] absent from existing_dates.

    Both endpoints are inclusive; an empty list is returned when the range is
    empty (end before start).
    """
    missing = []
    for offset in range((end_date - start_date).days + 1):
        stamp = (start_date + datetime.timedelta(days=offset)).strftime('%Y-%m-%d')
        if stamp not in existing_dates:
            missing.append(stamp)
    return missing
def setup_bbox_list_clustered(geojson_file, resolution=3, max_pixels=2500):
    """
    Load field geometries and build a BBox list covering field clusters only.

    Rather than gridding the whole area uniformly, bboxes are created around
    proximity clusters of fields, so no processing units are spent on the
    empty space between scattered fields.

    Args:
        geojson_file: Path to pivot.geojson
        resolution: Resolution in meters
        max_pixels: Max image dimension (SentinelHub limit)
    Returns:
        List of BBox objects covering field clusters, or None when the
        GeoJSON cannot be loaded.
    """
    try:
        frame = gpd.read_file(str(geojson_file))
    except Exception as e:
        print(f"ERROR: Failed to load GeoJSON: {e}")
        return None
    geoms = frame.geometry.tolist()

    # Group fields that sit within ~1 km of each other (tight threshold for
    # small, efficient clusters).
    clusters = cluster_fields_by_proximity(geoms, threshold_km=1)
    print(f"\n✓ Detected {len(clusters)} field cluster(s)")

    # One bbox per cluster; oversized clusters are split into a sub-grid.
    # NOTE(review): 111320 m/deg is the equatorial approximation; away from
    # the equator it overstates east-west extent, which only makes the
    # splitting more conservative — confirm this is acceptable.
    result = []
    size_limit_m = max_pixels * resolution
    for idx, members in enumerate(clusters, 1):
        minx, miny, maxx, maxy = unary_union(members).bounds
        width_m = (maxx - minx) * 111320
        height_m = (maxy - miny) * 111320
        if width_m <= size_limit_m and height_m <= size_limit_m:
            # Cluster fits inside one request tile.
            result.append(BBox(bbox=[minx, miny, maxx, maxy], crs=CRS.WGS84))
            print(f" Cluster {idx}: {len(members)} field(s) → 1 bbox ({width_m:.0f}m × {height_m:.0f}m)")
        else:
            # Cluster exceeds the tile limit: split it over a dynamic grid.
            grid = calculate_dynamic_grid(members, resolution=resolution)
            splitter = BBoxSplitter(members, CRS.WGS84, grid, reduce_bbox_sizes=True)
            pieces = splitter.get_bbox_list()
            result.extend(pieces)
            print(f" Cluster {idx}: {len(members)} field(s) → {len(pieces)} bbox(es) (large cluster split)")
    return result
def cluster_fields_by_proximity(geometries, threshold_km=3.0):
    """
    Cluster field geometries by centroid proximity.

    Greedy, seed-based grouping: the first ungrouped field starts a cluster,
    and every still-ungrouped field whose centroid lies within `threshold_km`
    of the *seed's* centroid joins it.  (Membership is measured from the seed
    only — clusters are not expanded transitively.)

    Args:
        geometries: List of Shapely geometries (anything exposing .centroid)
        threshold_km: Distance threshold in kilometers
    Returns:
        List of clusters, where each cluster is a list of geometries
    """
    # (Fixed: a previously imported scipy.spatial.distance.cdist was never
    # used; the inner distance loop is now vectorized with NumPy instead.)
    centroids = np.array([geom.centroid.coords[0] for geom in geometries])
    # Rough degrees-per-km conversion (1 deg ≈ 111 km) — adequate for a
    # kilometre-scale grouping threshold.
    threshold_deg = threshold_km / 111.0

    clusters = []
    used = set()
    for i, seed in enumerate(centroids):
        if i in used:
            continue
        used.add(i)
        # Euclidean (degree-space) distance from the seed to every centroid,
        # computed in one vectorized pass.
        dists = np.sqrt(((centroids - seed) ** 2).sum(axis=1))
        member_indices = [i] + [int(j) for j in np.nonzero(dists < threshold_deg)[0]
                                if j not in used]
        used.update(member_indices)
        clusters.append([geometries[idx] for idx in member_indices])
    return clusters
def setup_bbox_list(geojson_file, resolution=3):
    """Backward-compatible entry point; delegates to the clustered bbox builder."""
    return setup_bbox_list_clustered(geojson_file, resolution=resolution)
def calculate_dynamic_grid(shapely_geometries, resolution=3, max_pixels=2500):
    """Return the (nx, ny) grid so each cell stays within the pixel limit.

    Overall bounds of the (flattened) geometries are measured in metres with
    the equatorial 111320 m/deg approximation, then divided by the largest
    allowed tile edge (max_pixels * resolution).
    """
    # Flatten MultiPolygons into their component polygons.
    parts = []
    for geom in shapely_geometries:
        if isinstance(geom, MultiPolygon):
            parts.extend(geom.geoms)
        else:
            parts.append(geom)

    bounds = parts[0].bounds if len(parts) == 1 else MultiPolygon(parts).bounds
    minx, miny, maxx, maxy = bounds

    cell_edge_m = max_pixels * resolution
    span_x_m = (maxx - minx) * 111320
    span_y_m = (maxy - miny) * 111320
    nx = max(1, int(np.ceil(span_x_m / cell_edge_m)))
    ny = max(1, int(np.ceil(span_y_m / cell_edge_m)))
    return (nx, ny)
def is_image_available(slot, bbox_list, collection_id):
    """Probe the catalog for Planet imagery on the given date (slot = 'YYYY-MM-DD').

    Only the first bbox is checked as a representative sample.  Returns True
    optimistically when the bbox list is empty or the catalog query fails, so
    a download attempt is still made.
    """
    try:
        probe_bbox = bbox_list[0] if bbox_list else None
        if probe_bbox is None:
            return True
        # NOTE(review): this re-defines the BYOC collection on every call;
        # sentinelhub-py may object to repeated definitions of the same
        # collection — consider reusing the module-level `byoc`. Verify.
        search_results = catalog.search(
            collection=DataCollection.define_byoc(collection_id),
            bbox=probe_bbox,
            time=(slot, slot),
            filter=None
        )
        found = len(list(search_results)) > 0
        if found:
            print(f" ✓ Imagery available for {slot}")
        else:
            print(f" ✗ No imagery found for {slot}")
        return found
    except Exception as e:
        print(f" ⚠ Error checking availability for {slot}: {e}")
        return True
def download_function(slot, bbox, size, base_path_single_images, dry_run=False):
    """Download one Planet image for date `slot` and area `bbox`.

    The result is written under <base_path_single_images>/<slot>.  Errors are
    reported but not raised, so the calling loop can continue with the
    remaining dates/bboxes.
    """
    if dry_run:
        print(f" [DRY-RUN] Would download {slot}")
        return
    try:
        request = SentinelHubRequest(
            evalscript=get_evalscript(),
            input_data=[
                SentinelHubRequest.input_data(
                    data_collection=byoc,
                    time_interval=(slot, slot)
                )
            ],
            responses=[SentinelHubRequest.output_response('default', MimeType.TIFF)],
            bbox=bbox,
            size=size,
            config=config,
            data_folder=str(base_path_single_images / slot),
        )
        # Single-threaded download to respect SentinelHub rate limits.
        SentinelHubDownloadClient(config=config).download(
            [request.download_list[0]], max_threads=1
        )
        print(f' ✓ Downloaded image for {slot}')
        # Brief pause between requests to stay under the rate limit.
        time.sleep(1.0)
    except Exception as e:
        print(f' ✗ Error downloading {slot}: {e}')
def merge_files(slot, base_path_single_images, merged_tifs_folder, virtual_raster_folder, dry_run=False):
    """Merge downloaded tiles for a specific date.

    Collects every response.tiff under <base_path_single_images>/<slot>,
    builds a GDAL VRT mosaic, translates it to a compressed tiled GeoTIFF at
    <merged_tifs_folder>/<slot>.tif, then removes the per-date single-image
    folder.  Returns True on success, False otherwise.
    """
    slot_dir = Path(base_path_single_images / slot)
    # Tiles land in nested request folders, hence the recursive glob.
    file_list = [str(p) for p in slot_dir.rglob('response.tiff') if p.is_file()]
    if not file_list:
        print(f" ✗ No response.tiff files found for {slot}")
        return False
    if dry_run:
        print(f" [DRY-RUN] Would merge {len(file_list)} tiles for {slot}")
        return True
    merged_tif_path = str(Path(merged_tifs_folder) / f"{slot}.tif")
    merged_vrt_path = str(Path(virtual_raster_folder) / f"merged{slot}.vrt")
    try:
        vrt_all = gdal.BuildVRT(merged_vrt_path, file_list)
        if vrt_all is None:
            print(f" ✗ Failed to create VRT for {slot}")
            return False
        # Dereference to close/flush the VRT dataset before gdal.Translate
        # re-opens it from disk (GDAL datasets are closed by releasing the
        # last reference).
        vrt_all = None
        options = gdal.TranslateOptions(
            outputType=gdal.GDT_Float32,
            creationOptions=[
                'COMPRESS=LZW',
                'TILED=YES',
                'BLOCKXSIZE=256',
                'BLOCKYSIZE=256',
                'NUM_THREADS=ALL_CPUS'
            ]
        )
        result = gdal.Translate(merged_tif_path, merged_vrt_path, options=options)
        if result is None:
            print(f" ✗ Failed to translate VRT to TIFF for {slot}")
            return False
        # Release the output dataset handle so the GeoTIFF is fully flushed.
        result = None
        print(f" ✓ Merged {len(file_list)} tiles for {slot}")
        # Clean up single images folder for this date.  The merged TIFF is the
        # kept artifact; the .vrt remains in virtual_raster_folder.
        try:
            shutil.rmtree(slot_dir)
            print(f" ✓ Cleaned up single images for {slot}")
        except Exception as e:
            # Best-effort cleanup: a leftover folder is not a merge failure.
            print(f" ⚠ Could not clean up {slot_dir}: {e}")
        return True
    except Exception as e:
        print(f" ✗ Exception while processing {slot}: {e}")
        return False
def get_evalscript():
    """Return the SentinelHub evalscript (JavaScript) for Planet Scope data.

    Requests 8 spectral bands plus the UDM1 usable-data mask and outputs
    9 FLOAT32 bands: the spectral bands scaled by 2.5/10000 (reflectance
    stretch) and UDM1 passed through unscaled.
    """
    return """
//VERSION=3
function setup() {
return {
input: [{
bands: ["coastal_blue", "blue", "green_i", "green", "yellow", "red", "rededge", "nir", "udm1"],
units: "DN"
}],
output: {
bands: 9,
sampleType: "FLOAT32"
}
};
}
function evaluatePixel(sample) {
var scaledCoastalBlue = 2.5 * sample.coastal_blue / 10000;
var scaledBlue = 2.5 * sample.blue / 10000;
var scaledGreenI = 2.5 * sample.green_i / 10000;
var scaledGreen = 2.5 * sample.green / 10000;
var scaledYellow = 2.5 * sample.yellow / 10000;
var scaledRed = 2.5 * sample.red / 10000;
var scaledRedEdge = 2.5 * sample.rededge / 10000;
var scaledNIR = 2.5 * sample.nir / 10000;
var udm1 = sample.udm1;
return [scaledCoastalBlue, scaledBlue, scaledGreenI, scaledGreen,
scaledYellow, scaledRed, scaledRedEdge, scaledNIR, udm1];
}
"""
# ============================================================================
# MAIN
# ============================================================================
def main():
    """Entry point: find dates lacking a merged TIFF and download each one.

    For every missing date the external script
    00_download_8band_pu_optimized.py is invoked as a subprocess; failures
    are reported and skipped so the remaining dates still run.

    Returns:
        0 when every date succeeded (or nothing was missing), 1 otherwise.
    """
    print("="*80)
    print("PLANET SATELLITE DATA DOWNLOADER - MISSING DATES ONLY")
    print("Wrapper for 00_download_8band_pu_optimized.py")
    print("="*80)

    config_dict = get_config()
    print("\nConfiguration:")
    print(f" Start date: {config_dict['start_date']}")
    print(f" End date: {config_dict['end_date']}")
    print(f" Project: {config_dict['project']}")
    print(f" Resolution: {config_dict['resolution']}m")
    if config_dict['dry_run']:
        print(" Mode: DRY-RUN (no actual downloads)")

    # Set up folder layout and verify the field-boundary GeoJSON exists.
    paths = setup_paths(config_dict['project'])
    print("\nPaths:")
    print(f" Merged TIFs: {paths['merged_tifs']}")
    print(f" GeoJSON: {paths['geojson']}")
    if not paths['geojson'].exists():
        print(f"\nERROR: GeoJSON not found at {paths['geojson']}")
        return 1

    # Determine which dates still need downloading.
    print("\nScanning existing dates...")
    existing_dates = get_existing_dates(paths['merged_tifs'])
    print(f" Found {len(existing_dates)} existing dates")
    print("\nFinding missing dates...")
    missing_dates = get_missing_dates(
        config_dict['start_date'],
        config_dict['end_date'],
        existing_dates
    )
    print(f" {len(missing_dates)} dates to download")
    if not missing_dates:
        print("\n✓ All dates already downloaded!")
        return 0

    # Show the missing range, abbreviated when there are more than 10 dates.
    # (Fixed: a redundant second `if missing_dates:` check was removed — the
    # early return above already guarantees the list is non-empty.)
    print(f"\n Date range: {missing_dates[0]} to {missing_dates[-1]}")
    if len(missing_dates) <= 10:
        for date in missing_dates:
            print(f" - {date}")
    else:
        for date in missing_dates[:3]:
            print(f" - {date}")
        print(f" ... ({len(missing_dates) - 6} more) ...")
        for date in missing_dates[-3:]:
            print(f" - {date}")

    if config_dict['dry_run']:
        print("\n[DRY-RUN] Would download above dates using 00_download_8band_pu_optimized.py")
        return 0

    # Download each missing date via the optimized downloader script.
    print(f"\n{'='*80}")
    print("Downloading missing dates using optimized script...")
    print(f"{'='*80}")
    success_count = 0
    for i, date_str in enumerate(missing_dates, 1):
        print(f"\n[{i}/{len(missing_dates)}] Downloading {date_str}...")
        cmd = [
            sys.executable,
            "00_download_8band_pu_optimized.py",
            config_dict['project'],
            "--date", date_str,
            "--resolution", str(config_dict['resolution']),
            "--cleanup"
        ]
        try:
            # check=True raises CalledProcessError on a non-zero exit code;
            # the return value itself is not needed.
            subprocess.run(cmd, check=True, capture_output=False)
        except subprocess.CalledProcessError as e:
            print(f" ✗ Failed to download {date_str}: {e}")
            continue  # keep going with the remaining dates
        success_count += 1
        print(f" ✓ Successfully downloaded {date_str}")

    # Summary
    print(f"\n{'='*80}")
    print("SUMMARY:")
    print(f" Successfully processed: {success_count}/{len(missing_dates)} dates")
    print(f" Output folder: {paths['merged_tifs']}")
    print(f"{'='*80}")
    return 0 if success_count == len(missing_dates) else 1
if __name__ == "__main__":
    # Propagate main()'s status code to the shell (0 = success, 1 = failure).
    sys.exit(main())