"""
Clean empty field-tile TIFFs and orphaned RDS files
====================================================
Scans field_tiles/ and/or field_tiles_CI/ directories and identifies TIF files
where ALL pixels have RGBNIR == 0 (no satellite data collected).
Partially-covered tiles (some valid pixels present) are kept.

When deleting from field_tiles_CI/, also deletes the paired
daily_ci_vals/{FIELD}/{DATE}.rds file if it exists.

USAGE:
    # Dry run — list empty files (default, scans both dirs):
    & "C:\\Users\\timon\\anaconda3\\envs\\pytorch_gpu\\python.exe" python_app/clean_empty_tiles.py

    # Actually delete:
    & "...\\python.exe" python_app/clean_empty_tiles.py --delete

    # Only one directory type:
    & "...\\python.exe" python_app/clean_empty_tiles.py --dirs field_tiles
    & "...\\python.exe" python_app/clean_empty_tiles.py --dirs field_tiles_CI

    # Specific projects or fields:
    & "...\\python.exe" python_app/clean_empty_tiles.py --projects angata aura --delete
    & "...\\python.exe" python_app/clean_empty_tiles.py --fields 544 301 --delete
"""

import argparse
from pathlib import Path
from typing import Optional

import numpy as np
import rasterio

# Repository root: this script sits one directory below it.
ROOT = Path(__file__).resolve().parent.parent
DEFAULT_DIRS = ["field_tiles", "field_tiles_CI"]


def _display_path(path: Path) -> str:
    """Return *path* relative to ROOT for log output, or unchanged if outside ROOT."""
    try:
        return str(path.relative_to(ROOT))
    except ValueError:
        return str(path)


def is_empty_tif(path: Path) -> bool:
    """Return True if ALL pixels in RGBNIR bands are 0 or NaN (no satellite data).

    Cloud-masked pixels are stored as 0 in uint16 (NaN is not representable).
    A tile is considered empty only when every pixel across bands 1-4 is 0 or
    NaN, meaning no valid satellite data was captured for that field on that
    date. Partially-covered tiles (some pixels valid) return False and are
    left alone.

    Files with fewer than four bands, and files that cannot be opened or read,
    also return False so that they are never deleted by mistake; a warning is
    printed for unreadable files.
    """
    try:
        with rasterio.open(path) as src:
            if src.count < 4:
                return False  # unexpected band count — leave it alone
            rgbnir = src.read([1, 2, 3, 4])
            is_integer = np.issubdtype(rgbnir.dtype, np.integer)
            if not is_integer:
                # Same float32 view the check has always used for float data.
                rgbnir = rgbnir.astype(np.float32)
    except Exception as e:  # best effort: an unreadable tile is kept, not deleted
        print(f" WARNING: could not open {path.name}: {e}")
        return False

    if is_integer:
        # Integer rasters cannot hold NaN, so "all zero" is the whole test and
        # no float copy of the raster is needed.
        return not rgbnir.any()
    return bool(np.all((rgbnir == 0) | np.isnan(rgbnir)))


def scan_directory(storage_root: Path, dir_name: str, delete: bool,
                   fields: Optional[list] = None) -> dict:
    """Scan one tile directory within a project storage root.

    When dir_name == 'field_tiles_CI' and delete=True, also removes the paired
    daily_ci_vals/{FIELD}/{DATE}.rds file for each deleted TIF.

    Args:
        storage_root: Project storage directory containing ``dir_name``.
        dir_name: Tile sub-directory to scan (one folder per field inside it).
        delete: If True, delete the empty TIFs found; otherwise only list them.
        fields: Optional field IDs (folder names) to restrict the scan to.

    Returns:
        dict mapping field_id -> list of empty Path objects. The list reflects
        what was found by the scan, whether or not the files were deleted.
    """
    tiff_root = storage_root / dir_name
    # Paired RDS files only exist for field_tiles_CI output
    rds_root = storage_root / "daily_ci_vals" if dir_name == "field_tiles_CI" else None

    if not tiff_root.is_dir():
        print(f" [{dir_name}] Directory not found: {tiff_root}")
        return {}

    field_dirs = sorted(d for d in tiff_root.iterdir() if d.is_dir())
    if fields:
        # Folder names are strings; normalise so integer IDs match as well.
        wanted = {str(field) for field in fields}
        field_dirs = [d for d in field_dirs if d.name in wanted]

    print(f"\n [{dir_name}] Scanning {len(field_dirs)} fields ...")

    results = {}
    total_tifs = 0
    for field_dir in field_dirs:
        tif_files = sorted(field_dir.glob("*.tif"))
        # Count while scanning instead of globbing every folder a second time.
        total_tifs += len(tif_files)
        empty = [f for f in tif_files if is_empty_tif(f)]
        if empty:
            results[field_dir.name] = empty
            print(f" Field {field_dir.name:>6}: {len(empty)}/{len(tif_files)} empty"
                  f" ({', '.join(f.stem for f in empty)})")

    total_empty = sum(len(v) for v in results.values())
    print(f"\n [{dir_name}] Summary: {total_empty} empty / {total_tifs} total TIFs"
          f" across {len(results)} fields")

    if total_empty == 0:
        return results

    if not delete:
        print(f"\n [{dir_name}] Dry run — pass --delete to remove these files.")
        return results

    print(f"\n [{dir_name}] Deleting {total_empty} empty TIFs ...")
    rds_deleted = 0
    failures = 0
    for field_id, files in results.items():
        for f in files:
            try:
                f.unlink()
            except OSError as e:
                # A locked or vanished file must not abort the rest of the
                # batch; keep its paired RDS so the pair stays consistent.
                print(f" WARNING: could not delete {_display_path(f)}: {e}")
                failures += 1
                continue
            print(f" Deleted TIF: {_display_path(f)}")

            # Also remove the paired RDS from daily_ci_vals/ (Script 20 output)
            if rds_root is None:
                continue
            paired_rds = rds_root / field_id / f"{f.stem}.rds"
            try:
                paired_rds.unlink()
            except FileNotFoundError:
                pass  # no paired RDS for this tile — nothing to remove
            except OSError as e:
                print(f" WARNING: could not delete {_display_path(paired_rds)}: {e}")
                failures += 1
            else:
                print(f" Deleted RDS: {_display_path(paired_rds)}")
                rds_deleted += 1

    print(f" [{dir_name}] Done. ({rds_deleted} paired RDS files also removed)")
    if failures:
        print(f" [{dir_name}] WARNING: {failures} file(s) could not be deleted.")
    return results


def scan_project(project: str, delete: bool, fields: Optional[list] = None,
                 dirs: Optional[list] = None) -> None:
    """Scan the requested tile directories of one project.

    Args:
        project: Project folder name under laravel_app/storage/app/.
        delete: Passed through to scan_directory (False means dry run).
        fields: Optional field IDs to restrict the scan to.
        dirs: Tile sub-directories to scan; DEFAULT_DIRS when None or empty.
    """
    storage_root = ROOT / "laravel_app" / "storage" / "app" / project
    if not storage_root.exists():
        print(f"[{project}] Project directory not found: {storage_root}")
        return

    print(f"\n[{project}] ========================================")
    for dir_name in (dirs or DEFAULT_DIRS):
        scan_directory(storage_root, dir_name, delete, fields)


def main():
    """Parse command-line arguments and scan every requested project."""
    parser = argparse.ArgumentParser(
        description="Remove empty field-tile TIFFs and paired RDS files"
    )
    parser.add_argument(
        "--delete", action="store_true",
        help="Actually delete empty files (default: dry run)"
    )
    parser.add_argument(
        "--projects", nargs="+", default=["angata"],
        help="Project names to scan (default: angata)"
    )
    parser.add_argument(
        "--fields", nargs="+", default=None,
        help="Limit to specific field IDs, e.g. --fields 544 301"
    )
    parser.add_argument(
        "--dirs", nargs="+", default=None, choices=DEFAULT_DIRS,
        help=f"Which subdirs to scan (default: both {DEFAULT_DIRS})"
    )
    args = parser.parse_args()

    print("=== Mode: DELETE ===" if args.delete
          else "=== Mode: DRY RUN (use --delete to remove) ===")

    for project in args.projects:
        scan_project(project, args.delete, args.fields, args.dirs)


if __name__ == "__main__":
    main()