import ast import datetime import json import os import re from collections.abc import AsyncIterator from contextlib import asynccontextmanager from typing import Any, Dict, Optional import ee import gradio as gr from fastapi import FastAPI, HTTPException, Request from pydantic import BaseModel from geemap.ee_tile_layers import _get_tile_url_format, _validate_palette from starlette.middleware.cors import CORSMiddleware from starlette.middleware.trustedhost import TrustedHostMiddleware from starlette.responses import JSONResponse MAX_REQUEST_BYTES = int(os.environ.get("MAX_REQUEST_BYTES", "1048576")) MAX_ASSET_ID_LENGTH = 256 MAX_VIS_PARAMS_BYTES = 4096 MIN_SCALE_METERS = 1 MAX_SCALE_METERS = 10000 MIN_DENOMINATOR = 1 MAX_DENOMINATOR = 1e12 MAX_BBOX_AREA_DEGREES = 2500 ASSET_ID_PATTERN = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_./:-]{0,255}$") SAFE_EE_CONSTRUCTORS = { "FeatureCollection": ee.FeatureCollection, "Image": ee.Image, "ImageCollection": ee.ImageCollection, } DEFAULT_ALLOWED_ORIGINS = ( "https://ee.opengeos.org,http://localhost:7865,http://127.0.0.1:7865" ) DEFAULT_ALLOWED_HOSTS = "ee.opengeos.org,localhost,127.0.0.1,0.0.0.0" # # Earth Engine auth # if "EARTHENGINE_TOKEN" not in os.environ: # raise RuntimeError("EARTHENGINE_TOKEN environment variable not found") # try: # geemap.ee_initialize() # except Exception as e: # raise RuntimeError(f"Earth Engine authentication failed: {e}") def get_env_var(key: str) -> Optional[str]: """Retrieves an environment variable or Colab secret for the given key. Colab secrets have precedence over environment variables. Args: key (str): The key that's used to fetch the environment variable. Returns: Optional[str]: The retrieved key, or None if no environment variable was found. """ if not key: return None return os.environ.get(key) def ee_initialize( token_name: str = "EARTHENGINE_TOKEN", auth_mode: Optional[str] = None, auth_args: Optional[Dict[str, Any]] = None, project: Optional[str] = None, **kwargs: Any, ) -> None: """Authenticates Earth Engine and initialize an Earth Engine session. Args: token_name (str, optional): The name of the Earth Engine token. Defaults to "EARTHENGINE_TOKEN". In Colab, you can also set a secret named "EE_PROJECT_ID" to initialize Earth Engine. auth_mode (str, optional): The authentication mode, can be one of colab, notebook, localhost, or gcloud. See https://developers.google.com/earth-engine/guides/auth for more details. Defaults to None. auth_args (dict, optional): Additional authentication parameters for aa.Authenticate(). Defaults to {}. user_agent_prefix (str, optional): If set, the prefix (version-less) value used for setting the user-agent string. Defaults to "geemap". project (str, optional): The Google cloud project ID for Earth Engine. Defaults to None. kwargs (dict, optional): Additional parameters for ee.Initialize(). For example, opt_url='https://earthengine-highvolume.googleapis.com' to use the Earth Engine High-Volume platform. Defaults to {}. """ import google.oauth2.credentials # pylint: disable-next=protected-access if ee.data._get_state().credentials is not None: return if get_env_var("EE_SERVICE_ACCOUNT") is not None: key_data = get_env_var("EE_SERVICE_ACCOUNT") try: email = json.loads(key_data)["client_email"] except json.JSONDecodeError as e: raise ValueError(f"Invalid JSON for key_data: {e}") except KeyError: raise ValueError("key_data JSON does not contain 'client_email'") credentials = ee.ServiceAccountCredentials(email=email, key_data=key_data) ee.Initialize(credentials) return ee_token = get_env_var(token_name) if ee_token is not None: stored = json.loads(ee_token) credentials = google.oauth2.credentials.Credentials( None, token_uri="https://oauth2.googleapis.com/token", client_id=stored["client_id"], client_secret=stored["client_secret"], refresh_token=stored["refresh_token"], quota_project_id=stored["project"], ) ee.Initialize(credentials=credentials, **kwargs) return if auth_mode is None: raise RuntimeError( "Earth Engine credentials are not configured. Set EE_SERVICE_ACCOUNT " "or EARTHENGINE_TOKEN." ) if auth_args is None: auth_args = {} kwargs["project"] = project or get_env_var("EE_PROJECT_ID") auth_args["auth_mode"] = auth_mode ee.Authenticate(**auth_args) ee.Initialize(**kwargs) # ---- Shared Tile Logic ---- def parse_allowed_origins() -> list[str]: """Parses CORS origins from the ALLOWED_ORIGINS environment variable. Returns: list[str]: Allowed origins for browser clients. """ origins = os.environ.get("ALLOWED_ORIGINS", DEFAULT_ALLOWED_ORIGINS) return [origin.strip() for origin in origins.split(",") if origin.strip()] def parse_allowed_hosts() -> list[str]: """Parses trusted hosts from the ALLOWED_HOSTS environment variable. Returns: list[str]: Host names accepted by the application. """ hosts = os.environ.get("ALLOWED_HOSTS", DEFAULT_ALLOWED_HOSTS) return [host.strip() for host in hosts.split(",") if host.strip()] def validate_date_range( start_date: Optional[str], end_date: Optional[str] ) -> tuple[Optional[str], Optional[str]]: """Validates optional date strings and chronological order. Args: start_date: Optional start date in YYYY-MM-DD format. end_date: Optional end date in YYYY-MM-DD format. Returns: tuple[Optional[str], Optional[str]]: The normalized start and end dates. Raises: ValueError: If a date is malformed or the range is reversed. """ parsed_start = _parse_date(start_date, "start_date") if start_date else None parsed_end = _parse_date(end_date, "end_date") if end_date else None if parsed_start and parsed_end and parsed_start > parsed_end: raise ValueError("start_date must be on or before end_date") return ( parsed_start.isoformat() if parsed_start else None, parsed_end.isoformat() if parsed_end else None, ) def _parse_date(value: str, field_name: str) -> datetime.date: """Parses a strict ISO date. Args: value: Date string to parse. field_name: Name of the field being parsed. Returns: datetime.date: Parsed date. Raises: ValueError: If the date is not formatted as YYYY-MM-DD. """ try: return datetime.date.fromisoformat(value) except ValueError as exc: raise ValueError(f"{field_name} must use YYYY-MM-DD format") from exc def validate_bbox(bbox: Optional[list[float]]) -> Optional[list[float]]: """Validates a WGS84 bounding box and limits request size. Args: bbox: Optional bounding box as [west, south, east, north]. Returns: Optional[list[float]]: Validated bounding box. Raises: ValueError: If the bounding box is malformed or too large. """ if bbox is None: return None if len(bbox) != 4: raise ValueError("bbox must be a list of 4 values: [west, south, east, north]") west, south, east, north = bbox if not (-180 <= west < east <= 180): raise ValueError( "bbox longitude bounds must satisfy -180 <= west < east <= 180" ) if not (-90 <= south < north <= 90): raise ValueError("bbox latitude bounds must satisfy -90 <= south < north <= 90") area_degrees = (east - west) * (north - south) if area_degrees > MAX_BBOX_AREA_DEGREES: raise ValueError("bbox is too large for this service") return bbox def validate_asset_id(asset_id: str) -> str: """Validates that an Earth Engine asset ID is a safe literal path. Args: asset_id: Earth Engine asset ID. Returns: str: Validated asset ID. Raises: ValueError: If the asset ID is empty, too long, or contains unsafe syntax. """ asset_id = asset_id.strip() if not asset_id: raise ValueError("asset_id is required") if len(asset_id) > MAX_ASSET_ID_LENGTH: raise ValueError("asset_id is too long") if asset_id.startswith("ee.") or not ASSET_ID_PATTERN.fullmatch(asset_id): raise ValueError("asset_id must be a literal Earth Engine asset ID") return asset_id def parse_safe_ee_expression(expression: str) -> ee.ComputedObject: """Parses a supported Earth Engine constructor expression. Args: expression: Earth Engine expression such as ee.Image("USGS/SRTMGL1_003"). Returns: ee.ComputedObject: Constructed Earth Engine object. Raises: ValueError: If the expression uses unsupported or unsafe syntax. """ try: tree = ast.parse(expression, mode="eval") except SyntaxError as exc: raise ValueError("asset_id contains an unsupported ee expression") from exc call = tree.body if not isinstance(call, ast.Call): raise ValueError("asset_id contains an unsupported ee expression") if call.keywords or len(call.args) != 1: raise ValueError("ee expressions must use one literal asset ID argument") if not isinstance(call.func, ast.Attribute): raise ValueError("asset_id contains an unsupported ee expression") if not isinstance(call.func.value, ast.Name) or call.func.value.id != "ee": raise ValueError("asset_id contains an unsupported ee expression") if call.func.attr not in SAFE_EE_CONSTRUCTORS: raise ValueError( "Only ee.Image, ee.ImageCollection, and ee.FeatureCollection are supported" ) arg = call.args[0] if not isinstance(arg, ast.Constant) or not isinstance(arg.value, str): raise ValueError("ee expressions must use a literal asset ID string") asset_id = validate_asset_id(arg.value) return SAFE_EE_CONSTRUCTORS[call.func.attr](asset_id) def get_ee_object(asset_id: str) -> ee.ComputedObject: """Gets an Earth Engine object from a literal asset ID or safe expression. Args: asset_id: Literal asset ID or supported ee constructor expression. Returns: ee.ComputedObject: Earth Engine object for the request. Raises: ValueError: If the asset ID or expression is unsupported. """ asset_id = asset_id.strip() if asset_id.startswith("ee."): return parse_safe_ee_expression(asset_id) asset_id = validate_asset_id(asset_id) data_dict = ee.data.getAsset(asset_id) data_type = data_dict["type"] if data_type == "IMAGE": return ee.Image(asset_id) if data_type == "IMAGE_COLLECTION": return ee.ImageCollection(asset_id) if data_type in ["TABLE", "TABLE_COLLECTION"]: return ee.FeatureCollection(asset_id) raise ValueError(f"Unsupported data type: {data_type}") def validate_vis_params(vis_params: Any) -> dict[str, Any]: """Normalizes and validates visualization parameters. Args: vis_params: Visualization parameters as a dict or JSON object string. Returns: dict[str, Any]: Validated visualization parameters. Raises: ValueError: If the parameters are malformed or too large. """ if vis_params is None: return {} if isinstance(vis_params, str): if len(vis_params.encode("utf-8")) > MAX_VIS_PARAMS_BYTES: raise ValueError("vis_params is too large") vis_params = vis_params.strip() or "{}" try: vis_params = json.loads(vis_params) except json.JSONDecodeError as exc: raise ValueError("vis_params must be valid JSON") from exc if not isinstance(vis_params, dict): raise ValueError("vis_params must be a JSON object") if len(json.dumps(vis_params).encode("utf-8")) > MAX_VIS_PARAMS_BYTES: raise ValueError("vis_params is too large") if "palette" in vis_params: vis_params["palette"] = _validate_palette(vis_params["palette"]) return vis_params def validate_jrc_request(req: "JRCWaterStatsRequest") -> tuple[list[float], str]: """Validates JRC water statistics parameters. Args: req: Request model for the JRC endpoint. Returns: tuple[list[float], str]: Validated bounding box and end date. Raises: ValueError: If any parameter is outside the allowed range. """ bbox = validate_bbox(req.bbox) start_date, end_date = validate_date_range( req.start_date, req.end_date or datetime.date.today().isoformat() ) if start_date is None or end_date is None: raise ValueError("start_date and end_date are required") if bbox is None: raise ValueError("bbox is required") if req.frequency not in ("month", "year"): raise ValueError("frequency must be 'month' or 'year'") if not (MIN_SCALE_METERS <= req.scale <= MAX_SCALE_METERS): raise ValueError( f"scale must be between {MIN_SCALE_METERS} and {MAX_SCALE_METERS}" ) if not (1 <= req.start_month <= 12 and 1 <= req.end_month <= 12): raise ValueError("start_month and end_month must be between 1 and 12") if req.start_month > req.end_month: raise ValueError("start_month must be less than or equal to end_month") if not (MIN_DENOMINATOR <= req.denominator <= MAX_DENOMINATOR): raise ValueError("denominator is outside the allowed range") return bbox, end_date def get_tile( asset_id: str, vis_params: Any = None, start_date: Optional[str] = None, end_date: Optional[str] = None, bbox: Optional[list[float]] = None, ) -> str: """Gets an Earth Engine tile URL for a validated asset request. Args: asset_id: Earth Engine asset ID. vis_params: Visualization parameters. start_date: Optional start date for ImageCollection filtering. end_date: Optional end date for ImageCollection filtering. bbox: Optional [west, south, east, north] bounding box. Returns: str: Tile URL or an Error-prefixed message for UI callers. """ try: start_date, end_date = validate_date_range(start_date, end_date) bbox = validate_bbox(bbox) vis_params = validate_vis_params(vis_params) ee_object = get_ee_object(asset_id) # Apply date range filtering for ImageCollections if start_date or end_date: if isinstance(ee_object, ee.ImageCollection): if start_date and end_date: ee_object = ee_object.filterDate(start_date, end_date) elif start_date: ee_object = ee_object.filterDate(start_date, "2100-01-01") elif end_date: ee_object = ee_object.filterDate("1970-01-01", end_date) else: raise ValueError( "Date filtering is only supported for ImageCollections" ) # Apply bounding box filtering if bbox: geometry = ee.Geometry.BBox(*bbox) if isinstance(ee_object, ee.ImageCollection): ee_object = ee_object.filterBounds(geometry) elif isinstance(ee_object, ee.FeatureCollection): ee_object = ee_object.filterBounds(geometry) elif isinstance(ee_object, ee.Image): ee_object = ee_object.clip(geometry) else: raise ValueError( f"Bounding box filtering not supported for {type(ee_object)}" ) url = _get_tile_url_format(ee_object, vis_params) return url except Exception as e: return f"Error: {str(e)}" @asynccontextmanager async def lifespan(_app: FastAPI) -> AsyncIterator[None]: """Initializes Earth Engine during the API server lifespan. Args: _app: FastAPI application instance. Yields: None: Control to the running ASGI application. """ ee_initialize() yield app = FastAPI(lifespan=lifespan) @app.middleware("http") async def enforce_request_size(request: Request, call_next): """Rejects requests with a body larger than MAX_REQUEST_BYTES. Args: request: Incoming HTTP request. call_next: Next handler in the ASGI middleware chain. Returns: Response: The next response or a 413 error. """ content_length = request.headers.get("content-length") if content_length is not None: try: request_bytes = int(content_length) except ValueError: return JSONResponse({"detail": "Invalid Content-Length"}, status_code=400) if request_bytes > MAX_REQUEST_BYTES: return JSONResponse( {"detail": "Request body is too large"}, status_code=413 ) return await call_next(request) app.add_middleware( CORSMiddleware, allow_origins=parse_allowed_origins(), allow_methods=["GET", "POST", "OPTIONS"], allow_headers=["Authorization", "Content-Type"], ) app.add_middleware(TrustedHostMiddleware, allowed_hosts=parse_allowed_hosts()) class TileRequest(BaseModel): """Request model for the tile endpoint.""" asset_id: str vis_params: dict | str | None = None start_date: str | None = None end_date: str | None = None bbox: list[float] | None = None # [west, south, east, north] class JRCWaterStatsRequest(BaseModel): """Request model for JRC water statistics endpoint.""" bbox: list[float] # [west, south, east, north] scale: float = 30 start_date: str = "1984-03-16" end_date: str | None = None # defaults to today start_month: int = 1 end_month: int = 12 frequency: str = "year" # "month" or "year" denominator: float = 10000.0 # m² to hectares @app.get("/healthz") def healthz() -> dict[str, str]: """Returns a lightweight health check response. Returns: dict[str, str]: Health check status. """ return {"status": "ok"} @app.post("/tile") def get_tile_api(req: TileRequest) -> dict[str, str]: """Returns a tile URL for a supported Earth Engine asset. Args: req: Tile URL request. Returns: dict[str, str]: Tile URL response. """ result = get_tile( req.asset_id, req.vis_params, req.start_date, req.end_date, req.bbox ) if isinstance(result, str) and result.startswith("Error"): raise HTTPException(status_code=400, detail=result) return {"tile_url": result} @app.post("/jrc-water-stats") def get_jrc_water_stats(req: JRCWaterStatsRequest) -> dict[str, Any]: """Compute JRC monthly water history and water occurrence statistics. Args: req: Request with bbox, scale, date range, month range, and frequency. Returns: dict: Monthly history data and water occurrence statistics. """ try: bbox, end_date = validate_jrc_request(req) region = ee.Geometry.BBox(*bbox) # Compute monthly water history from JRC MonthlyHistory collection = ee.ImageCollection("JRC/GSW1_4/MonthlyHistory") images = ( collection.filterDate(req.start_date, end_date) .filter(ee.Filter.calendarRange(req.start_month, req.end_month, "month")) .map(lambda img: img.eq(2).selfMask()) ) def cal_area(img): """Calculates water area for a JRC monthly image. Args: img: Earth Engine image. Returns: ee.Image: Image with computed area metadata. """ pixel_area = img.multiply(ee.Image.pixelArea()).divide(req.denominator) img_area = pixel_area.reduceRegion( geometry=region, reducer=ee.Reducer.sum(), scale=req.scale, maxPixels=1e12, bestEffort=True, ) return img.set({"area": img_area}) areas = images.map(cal_area) stats_list = areas.aggregate_array("area").getInfo() values = [item["water"] for item in stats_list] labels = areas.aggregate_array("system:index").getInfo() if req.frequency == "month": history_data = [ {"Month": label, "Area": area} for label, area in zip(labels, values) ] else: # Group by year and compute mean year_areas = {} for label, area in zip(labels, values): year = label[:4] year_areas.setdefault(year, []).append(area) history_data = [ {"Year": year, "Area": sum(areas) / len(areas)} for year, areas in sorted(year_areas.items()) ] # Compute water occurrence statistics occurrence = ee.Image("JRC/GSW1_4/GlobalSurfaceWater").select("occurrence") stats = occurrence.reduceRegion( reducer=ee.Reducer.mean() .combine(ee.Reducer.min(), sharedInputs=True) .combine(ee.Reducer.max(), sharedInputs=True) .combine(ee.Reducer.stdDev(), sharedInputs=True), geometry=region, scale=req.scale, maxPixels=1e12, bestEffort=True, ).getInfo() # Compute occurrence histogram (10 bins from 0 to 100) hist_result = occurrence.reduceRegion( reducer=ee.Reducer.fixedHistogram(0, 100, 10), geometry=region, scale=req.scale, maxPixels=1e12, bestEffort=True, ).getInfo() # Parse histogram result histogram = {"bin_edges": [], "counts": []} if hist_result and "occurrence" in hist_result: hist_list = hist_result["occurrence"] bin_edges = [row[0] for row in hist_list] bin_edges.append(hist_list[-1][0] + 10) # add right edge counts = [row[1] for row in hist_list] histogram = {"bin_edges": bin_edges, "counts": counts} return { "monthly_history": { "frequency": req.frequency, "unit": "hectares", "data": history_data, }, "water_occurrence": { "stats": { "mean": stats.get("occurrence_mean"), "min": stats.get("occurrence_min"), "max": stats.get("occurrence_max"), "stdDev": stats.get("occurrence_stdDev"), }, "histogram": histogram, }, "parameters": { "bbox": bbox, "scale": req.scale, "start_date": req.start_date, "end_date": end_date, "start_month": req.start_month, "end_month": req.end_month, "frequency": req.frequency, }, } except Exception as e: raise HTTPException(status_code=400, detail=str(e)) # ---- Gradio UI ---- def get_tile_gradio(asset_id, vis_params, start_date, end_date, bbox_str): """Wrapper for Gradio that converts string inputs to proper types. Args: asset_id: Earth Engine asset ID from the UI. vis_params: Visualization parameters as JSON. start_date: Optional start date. end_date: Optional end date. bbox_str: Optional comma-separated bounding box. Returns: str: Tile URL or validation error for the UI. """ # Convert empty strings to None start_date = start_date.strip() if start_date and start_date.strip() else None end_date = end_date.strip() if end_date and end_date.strip() else None # Convert bbox string to list bbox = None if bbox_str and bbox_str.strip(): try: bbox = [float(x.strip()) for x in bbox_str.split(",")] except ValueError: return "Error: bbox must be comma-separated numbers (west,south,east,north)" return get_tile(asset_id, vis_params, start_date, end_date, bbox) gradio_ui = gr.Interface( fn=get_tile_gradio, inputs=[ gr.Textbox(label="Earth Engine Asset ID", placeholder="e.g., USGS/SRTMGL1_003"), gr.Textbox( label="Visualization Parameters (JSON)", placeholder='{"min":0,"max":5000,"palette":"terrain"}', ), gr.Textbox( label="Start Date (Optional)", placeholder="e.g., 2023-01-01", value="", ), gr.Textbox( label="End Date (Optional)", placeholder="e.g., 2023-12-31", value="", ), gr.Textbox( label="Bounding Box (Optional)", placeholder="e.g., -122.5,37.5,-122.0,38.0 (west,south,east,north)", value="", ), ], outputs="text", title="Earth Engine Tile URL Generator", description="Supports ee.Image, ee.ImageCollection, ee.FeatureCollection with optional date range and bbox filtering. Tile URL is suitable for basemap usage.", ) app = gr.mount_gradio_app(app, gradio_ui, path="/")