Spaces:
Running
Running
| import importlib | |
| import json | |
| import platform | |
| import sys | |
| import traceback | |
| import faulthandler | |
| from dataclasses import dataclass, field | |
| from datetime import datetime, timedelta | |
| print("[dmi-collector][bootstrap] importing third_party_modules", flush=True) | |
| import gradio as gr | |
| import requests | |
| from huggingface_hub import HfApi, hf_hub_download | |
| import schedule | |
| import time | |
| import threading | |
| import os | |
| from zoneinfo import ZoneInfo | |
| print("[dmi-collector][bootstrap] third_party_modules_imported", flush=True) | |
# =============================================================================
# CONFIGURATION
# =============================================================================

# Hugging Face dataset repositories used by this app.
DATASET_NAME = "Ciroc0/dmi-aarhus-weather-data"
PREDICTIONS_DATASET = "Ciroc0/dmi-aarhus-predictions"

# Coordinates sent to the weather API.
AARHUS_LAT = 56.1567
AARHUS_LON = 10.2108

# Token is read once at import time; None when the variable is unset.
HF_TOKEN = os.environ.get("HF_TOKEN")

FRONTEND_SNAPSHOT_FILE = "frontend_snapshot.json"
COPENHAGEN_TZ = ZoneInfo("Europe/Copenhagen")
APP_NAME = "dmi-collector"
HISTORICAL_BACKFILL_START = datetime(2025, 11, 1).date()
TRAINING_HOLDOUT_DAYS = 7
FUTURE_FORECAST_HOURS = 48

# Dump tracebacks on hard crashes (segfaults, fatal signals).
faulthandler.enable()

# One pickle file per prediction target, named "<target>_models.pkl".
MODEL_FILES = {
    target: f"{target}_models.pkl"
    for target in (
        "temperature",
        "wind_speed",
        "wind_gust",
        "rain_event",
        "rain_amount",
    )
}

# Extended feature set from PLAN.md
FORECAST_FEATURES = [
    "temperature_2m",
    "apparent_temperature",
    "relative_humidity_2m",
    "dew_point_2m",
    "pressure_msl",
    "cloud_cover",
    "cloud_cover_low",
    "cloud_cover_mid",
    "cloud_cover_high",
    "precipitation",
    "rain",
    "snowfall",
    "precipitation_probability",
    "windspeed_10m",
    "winddirection_10m",
    "windgusts_10m",
    "visibility",
    "shortwave_radiation",
    "direct_radiation",
    "weather_code",
    "cape",
]

# Hourly variables requested from the observation archive.
OBSERVATION_FEATURES = [
    "temperature_2m",
    "relative_humidity_2m",
    "pressure_msl",
    "precipitation",
    "rain",
    "windspeed_10m",
    "winddirection_10m",
    "windgusts_10m",
]

# Keys that identify a unique row in each stored table.
TRAINING_DEDUP_KEYS = ["reference_time", "target_timestamp"]
PREDICTION_DEDUP_KEYS = ["target_timestamp"]

OBSERVATION_CONTEXT_TIMESTAMP_COL = "observation_context_timestamp"

# Observation-derived columns that may be present in the training matrix.
OBSERVATION_SOURCE_COLUMNS = [
    "actual_temp",
    "actual_humidity",
    "actual_pressure",
    "actual_precipitation",
    "actual_rain",
    "actual_wind_speed",
    "actual_wind_direction",
    "actual_wind_gust",
    "actual_wind_u",
    "actual_wind_v",
    "rain_event",
    "rain_amount",
]

# Lagged / rolling observation features attached to each forecast row.
OBSERVATION_CONTEXT_COLUMNS = [
    "obs_temp_lag_1h",
    "obs_temp_mean_3h",
    "obs_temp_mean_6h",
    "obs_wind_lag_1h",
    "obs_wind_mean_3h",
    "obs_wind_mean_6h",
    "obs_wind_u_lag_1h",
    "obs_wind_u_mean_3h",
    "obs_wind_v_lag_1h",
    "obs_wind_v_mean_3h",
    "obs_pressure_lag_1h",
    "obs_pressure_mean_3h",
    "obs_humidity_lag_1h",
    "obs_humidity_mean_3h",
    "obs_precip_lag_1h",
    "obs_precip_sum_3h",
    "obs_precip_sum_6h",
    "obs_precip_sum_12h",
]
class LazyModule:
    """Proxy that postpones importing a module until it is first used.

    Attribute access on the proxy imports ``module_name`` (once) and forwards
    the lookup to the real module, so heavy packages are not loaded at
    process start.
    """

    def __init__(self, module_name):
        self.module_name = module_name
        self._module = None

    def _load(self):
        """Import the wrapped module on first call and return it."""
        if self._module is None:
            self._module = importlib.import_module(self.module_name)
        return self._module

    def __getattr__(self, item):
        # __getattr__ only runs when normal lookup fails. If the proxy's own
        # attributes are missing (an instance created without __init__, as
        # copy/pickle do), forwarding would call _load(), which looks up
        # _module again and recurses until RecursionError. Fail fast instead.
        if item in ("_module", "module_name"):
            raise AttributeError(item)
        return getattr(self._load(), item)


pd = LazyModule("pandas")
np = LazyModule("numpy")
joblib = LazyModule("joblib")
| class AppState: | |
| lock: threading.Lock = field(default_factory=threading.Lock) | |
| warming: bool = True | |
| ready: bool = False | |
| last_error: str | None = None | |
| cache_revision: str | None = None | |
| active_job: bool = False | |
| model_bundle_cache: dict = field(default_factory=dict) | |
| catch_up_started: bool = False | |
| catch_up_completed: bool = False | |
| APP_STATE = AppState() | |
def log_event(message, **fields):
    """Emit a structured log line that shows up clearly in HF runtime logs.

    The line has the form ``[<APP_NAME>] <UTC timestamp>Z <message> k=v ...``
    where the keyword fields are rendered with ``repr`` in sorted key order.
    Output is flushed immediately.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import timezone

    # datetime.utcnow() is deprecated; take an aware UTC "now" and strip the
    # tzinfo so the rendered text keeps the exact "...T12:34:56Z" shape.
    utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
    timestamp = utc_now.isoformat(timespec="seconds") + "Z"
    details = " ".join(f"{key}={fields[key]!r}" for key in sorted(fields))
    line = f"[{APP_NAME}] {timestamp} {message}"
    if details:
        line = f"{line} {details}"
    print(line, flush=True)
def log_exception(context, exc):
    """Log an exception with a full traceback.

    Args:
        context: Short label for the operation that failed.
        exc: The exception instance to report.
    """
    log_event(f"{context} failed", error=str(exc), error_type=type(exc).__name__)
    # Render the traceback from the exception object itself. format_exc()
    # only works while an exception is being handled and would print
    # "NoneType: None" if this helper were called later.
    rendered = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    print(rendered, flush=True)
def install_global_logging():
    """Replace ``sys.excepthook`` so uncaught exceptions are logged."""

    def handle_exception(exc_type, exc_value, exc_traceback):
        # Ctrl-C keeps the interpreter's default reporting.
        if not issubclass(exc_type, KeyboardInterrupt):
            log_event("uncaught_exception", error=str(exc_value), error_type=exc_type.__name__)
            rendered_lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
            print("".join(rendered_lines), flush=True)
            return
        sys.__excepthook__(exc_type, exc_value, exc_traceback)

    sys.excepthook = handle_exception
def log_startup():
    """Write one "startup" log line describing the runtime environment."""
    runtime_context = {
        "python": sys.version.split()[0],
        "platform": platform.platform(),
        "cwd": os.getcwd(),
        # Only whether a token exists is logged, never its value.
        "hf_token_present": bool(HF_TOKEN),
        "dataset": DATASET_NAME,
        "predictions_dataset": PREDICTIONS_DATASET,
    }
    log_event("startup", **runtime_context)
def run_logged(name, fn, *args, **kwargs):
    """Call ``fn(*args, **kwargs)`` with "started"/"completed" log lines.

    DataFrame results are summarised by row count and column names; anything
    else by its type name. Exceptions are logged and then re-raised.
    """
    log_event(f"{name} started")
    try:
        outcome = fn(*args, **kwargs)
        if isinstance(outcome, pd.DataFrame):
            summary = {"rows": len(outcome), "columns": list(outcome.columns)}
        else:
            summary = {"result_type": type(outcome).__name__}
        log_event(f"{name} completed", **summary)
        return outcome
    except Exception as exc:
        log_exception(name, exc)
        raise
| install_global_logging() | |
| log_event("bootstrap_begin") | |
def build_app_status_text():
    """Return the one-line status message for the current application state.

    A recorded error takes precedence; otherwise the text depends on the
    ``warming`` and ``active_job`` flags.
    """
    # Snapshot the flags under the lock, format afterwards.
    with APP_STATE.lock:
        error_text = APP_STATE.last_error
        is_warming = bool(APP_STATE.warming)
        job_running = bool(APP_STATE.active_job)

    if error_text:
        return f"Status: failed. {error_text}"

    messages = {
        (True, True): "Status: warming up. Post-start catch-up is running in the background.",
        (True, False): "Status: warming up. The UI is live; startup catch-up will run in the background.",
        (False, True): "Status: ready. Background maintenance is currently running.",
        (False, False): "Status: ready. Forecast, daily update, prediction, and verification actions run on demand or by schedule.",
    }
    return messages[(is_warming, job_running)]
def note_app_error(exc):
    """Store the text of ``exc`` as the application's last error."""
    error_text = str(exc)
    with APP_STATE.lock:
        APP_STATE.last_error = error_text
def clear_model_bundle_cache(cache_revision=None):
    """Empty the model cache unless it already belongs to ``cache_revision``.

    With no revision given the cache is always emptied. The stored revision
    is set to ``cache_revision`` in every case.
    """
    with APP_STATE.lock:
        already_current = (
            cache_revision is not None
            and APP_STATE.cache_revision == cache_revision
        )
        if not already_current:
            APP_STATE.model_bundle_cache = {}
        APP_STATE.cache_revision = cache_revision
def set_app_ready():
    """Flip the application state to ready and clear any recorded error."""
    ready_values = (
        ("warming", False),
        ("ready", True),
        ("last_error", None),
    )
    with APP_STATE.lock:
        for attribute, value in ready_values:
            setattr(APP_STATE, attribute, value)
def now_cph():
    """Return the current timezone-aware time in Europe/Copenhagen."""
    return datetime.now(tz=COPENHAGEN_TZ)
def get_lead_bucket(lead_hours):
    """Return the lead-time bucket label for ``lead_hours``.

    Buckets are checked by inclusive upper bound (6, 12, 24); anything above
    24 falls into "25-48".
    """
    inclusive_upper_bounds = ((6, "1-6"), (12, "7-12"), (24, "13-24"))
    for upper_bound, label in inclusive_upper_bounds:
        if lead_hours <= upper_bound:
            return label
    return "25-48"
def ensure_copenhagen_time(df, column_name):
    """Make ``df[column_name]`` timezone-aware in Europe/Copenhagen.

    Naive values are localized, aware values are converted, and unparseable
    values become NaT. The column is overwritten on ``df`` itself, which is
    also returned. A missing column leaves ``df`` untouched.
    """
    if column_name not in df.columns:
        return df

    parsed = pd.to_datetime(df[column_name], errors="coerce")
    is_naive = getattr(parsed.dt, "tz", None) is None
    df[column_name] = (
        parsed.dt.tz_localize(COPENHAGEN_TZ, ambiguous="infer", nonexistent="shift_forward")
        if is_naive
        else parsed.dt.tz_convert(COPENHAGEN_TZ)
    )
    return df
def dedupe_rows(df, dedup_keys, sort_keys=None, keep="last"):
    """Sort ``df`` and drop duplicates, ignoring keys that are not columns.

    Args:
        df: Frame to clean; ``None`` or an empty frame is returned as-is.
        dedup_keys: Columns that identify a duplicate row.
        sort_keys: Columns to sort by first; falls back to ``dedup_keys``.
        keep: Passed to ``DataFrame.drop_duplicates``.
    """
    if df is None or len(df) == 0:
        return df

    def only_existing(keys):
        return [key for key in keys if key in df.columns]

    order_by = only_existing(sort_keys or dedup_keys)
    if order_by:
        df = df.sort_values(order_by).reset_index(drop=True)

    unique_on = only_existing(dedup_keys)
    if unique_on:
        df = df.drop_duplicates(subset=unique_on, keep=keep).reset_index(drop=True)
    return df
def find_new_rows(candidate_df, existing_df, dedup_keys):
    """Return the rows of ``candidate_df`` whose keys are absent from ``existing_df``.

    Args:
        candidate_df: Frame with possibly-new rows. ``None`` or an empty frame
            is returned unchanged.
        existing_df: Frame already stored; ``None``/empty means every
            candidate row is new.
        dedup_keys: Key columns; only those present in both frames are used.
            If none are shared, ``candidate_df`` is returned unchanged.

    Returns:
        A frame with the candidate's columns, re-indexed from 0, or
        ``candidate_df`` itself in the pass-through cases above.
    """
    # Nothing to filter; also avoids touching .columns on None.
    if candidate_df is None or len(candidate_df) == 0:
        return candidate_df
    if existing_df is None or len(existing_df) == 0:
        return candidate_df

    keys = [key for key in dedup_keys if key in candidate_df.columns and key in existing_df.columns]
    if not keys:
        return candidate_df

    # Pick a marker column name that cannot clash with a candidate column.
    marker = "_existing"
    while marker in candidate_df.columns:
        marker = f"_{marker}"

    existing_keys = existing_df[keys].drop_duplicates()
    merged = candidate_df.merge(existing_keys, on=keys, how="left", indicator=marker)
    # "left_only" marks candidate rows with no match among the existing keys.
    new_rows = merged[merged[marker] == "left_only"].drop(columns=[marker])
    return new_rows.reset_index(drop=True)
def build_model_features(df):
    """Return a copy of ``df`` with temporal and run-delta features added.

    ``None`` or an empty frame is returned unchanged.
    """
    if df is None or len(df) == 0:
        return df
    with_temporal = add_temporal_features(df.copy())
    return add_run_delta_features(with_temporal)
def ensure_observation_context_columns(df):
    """Add any missing observation-context columns to ``df`` (in place).

    The timestamp column is filled with NaT and the feature columns with NaN.
    ``None`` is passed through.
    """
    if df is None:
        return df

    if OBSERVATION_CONTEXT_TIMESTAMP_COL not in df.columns:
        df[OBSERVATION_CONTEXT_TIMESTAMP_COL] = pd.NaT

    absent = [name for name in OBSERVATION_CONTEXT_COLUMNS if name not in df.columns]
    for name in absent:
        df[name] = np.nan
    return df
def get_optional_series(df, column_name):
    """Return ``df[column_name]``, or an all-NaN float series if it is missing.

    The fallback series shares ``df``'s index.
    """
    if column_name not in df.columns:
        return pd.Series(np.nan, index=df.index, dtype="float64")
    return df[column_name]
def datetime_series_to_epoch_ns(series):
    """Convert a datetime-like series to int64 nanoseconds since the epoch (UTC).

    Values are parsed as UTC (unparseable ones become NaT), stripped of their
    timezone, cast to nanosecond resolution and then to int64.
    """
    as_utc = pd.to_datetime(series, errors="coerce", utc=True)
    utc_wall_clock = as_utc.dt.tz_localize(None)
    nanosecond_stamps = utc_wall_clock.astype("datetime64[ns]")
    return nanosecond_stamps.astype("int64", copy=False)
def normalize_prediction_df(pred_df):
    """Bring a prediction-history frame to the current schema.

    Steps: rename a legacy ``timestamp`` column to ``target_timestamp``, make
    the time columns Copenhagen-aware, drop rows without a target timestamp,
    ensure a boolean ``verified`` column, and keep the last row per target
    timestamp. ``None`` or an empty frame is returned unchanged.
    """
    if pred_df is None or len(pred_df) == 0:
        return pred_df

    uses_legacy_name = "timestamp" in pred_df.columns
    if uses_legacy_name and "target_timestamp" not in pred_df.columns:
        pred_df = pred_df.rename(columns={"timestamp": "target_timestamp"})

    for time_column in ("target_timestamp", "reference_time", "prediction_made_at"):
        pred_df = ensure_copenhagen_time(pred_df, time_column)

    if "target_timestamp" in pred_df.columns:
        has_target = pred_df["target_timestamp"].notna()
        pred_df = pred_df[has_target].copy()

    if "verified" not in pred_df.columns:
        pred_df["verified"] = False
    verified_flags = pred_df["verified"].fillna(False)
    pred_df["verified"] = verified_flags.astype(bool)

    # "_merge_priority" only exists while two histories are being merged;
    # dedupe_rows skips sort keys that are not columns.
    ordering = ["target_timestamp", "_merge_priority", "prediction_made_at", "reference_time"]
    return dedupe_rows(pred_df, PREDICTION_DEDUP_KEYS, sort_keys=ordering, keep="last")
def merge_prediction_history(existing_df, new_df):
    """Combine stored and incoming predictions, one row per target timestamp.

    When both frames hold a row for the same target timestamp, the incoming
    row wins. If either side is ``None``/empty, the other side is returned
    normalized.
    """
    if existing_df is None or len(existing_df) == 0:
        return normalize_prediction_df(new_df)
    if new_df is None or len(new_df) == 0:
        return normalize_prediction_df(existing_df)

    stored = normalize_prediction_df(existing_df).copy()
    fresh = normalize_prediction_df(new_df).copy()
    # Higher priority sorts later, so keep="last" prefers incoming rows.
    stored["_merge_priority"] = 0
    fresh["_merge_priority"] = 1

    combined = normalize_prediction_df(
        pd.concat([stored, fresh], ignore_index=True, sort=False)
    )

    preferred_order = ("target_timestamp", "_merge_priority", "prediction_made_at", "reference_time")
    sort_keys = [key for key in preferred_order if key in combined.columns]
    if sort_keys:
        combined = combined.sort_values(sort_keys).reset_index(drop=True)
    if "target_timestamp" in combined.columns:
        combined = combined.drop_duplicates(subset=["target_timestamp"], keep="last")
        combined = combined.reset_index(drop=True)
    if "_merge_priority" in combined.columns:
        combined = combined.drop(columns=["_merge_priority"])
    return combined
| # ============================================================================= | |
| # FORECAST FETCHING | |
| # ============================================================================= | |
def _request_hourly_forecast(url, params):
    """Return the ``hourly`` section of one forecast response, or None.

    A non-200 answer is retried once without the ``models`` parameter.
    """
    resp = requests.get(url, params=params, timeout=30)
    if resp.status_code != 200:
        fallback_params = {key: value for key, value in params.items() if key != "models"}
        resp = requests.get(url, params=fallback_params, timeout=30)
    if resp.status_code != 200:
        return None
    data = resp.json()
    if 'hourly' not in data:
        return None
    return data['hourly']


def _forecast_rows_for_run(hourly, times, reference_time):
    """Build one row per target hour that lies 0 < lead <= 48 h after the run."""
    rows = []
    for i, target_time in enumerate(times):
        lead_hours = (target_time - reference_time).total_seconds() / 3600
        if not 0 < lead_hours <= 48:
            continue
        row = {
            'target_timestamp': target_time,
            'reference_time': reference_time,
            'lead_time_hours': int(lead_hours),
            'lead_bucket': get_lead_bucket(int(lead_hours)),
            'latitude': AARHUS_LAT,
            'longitude': AARHUS_LON,
        }
        # Add all forecast features with dmi_ prefix
        for feat in FORECAST_FEATURES:
            row[f"dmi_{feat}_pred"] = hourly.get(feat, [None] * len(times))[i]
        # Compute wind components; missing speed/direction count as 0.
        wind_speed = row.get('dmi_windspeed_10m_pred', 0) or 0
        wind_dir = row.get('dmi_winddirection_10m_pred', 0) or 0
        row['forecast_wind_u'] = -wind_speed * np.sin(np.radians(wind_dir))
        row['forecast_wind_v'] = -wind_speed * np.cos(np.radians(wind_dir))
        rows.append(row)
    return rows


def fetch_forecasts_for_period(start_date, end_date):
    """
    Fetch forecast rows for Aarhus from Open-Meteo for each day in the period.

    For every date, rows are produced for each 3-hourly reference time
    (00, 03, ..., 21 Copenhagen time) that is not in the future, covering
    target hours up to 48 h after that reference time.

    The HTTP request does not depend on the run hour, so it is issued once
    per date and the response is reused for all of that date's reference
    times (previously the identical request was sent once per run hour).
    A failure for one date is logged and that date is skipped.

    Args:
        start_date: First date (inclusive).
        end_date: Last date (inclusive).

    Returns:
        DataFrame with all features, de-duplicated on
        (reference_time, target_timestamp), or None when no rows were built.
    """
    log_event("fetch_forecasts_for_period", start_date=str(start_date), end_date=str(end_date))
    url = "https://api.open-meteo.com/v1/forecast"
    run_hours = [0, 3, 6, 9, 12, 15, 18, 21]
    all_forecasts = []
    cph_now = now_cph()

    current_date = start_date
    while current_date <= end_date:
        midnight = datetime.combine(current_date, datetime.min.time())
        candidate_runs = [
            (midnight + timedelta(hours=hour)).replace(tzinfo=COPENHAGEN_TZ)
            for hour in run_hours
        ]
        # Runs that have not happened yet cannot be fetched.
        past_runs = [run for run in candidate_runs if run <= cph_now]

        if past_runs:
            params = {
                "latitude": AARHUS_LAT,
                "longitude": AARHUS_LON,
                "start_date": current_date.strftime("%Y-%m-%d"),
                "end_date": (current_date + timedelta(days=2)).strftime("%Y-%m-%d"),
                "models": "dmi_harmonie",
                "hourly": FORECAST_FEATURES,
                "timezone": "Europe/Copenhagen"
            }
            try:
                hourly = _request_hourly_forecast(url, params)
                if hourly is not None:
                    times = pd.to_datetime(hourly['time'])
                    times = times.tz_localize('Europe/Copenhagen', ambiguous='infer')
                    for reference_time in past_runs:
                        all_forecasts.extend(_forecast_rows_for_run(hourly, times, reference_time))
            except Exception as e:
                # Best effort: report through the shared logger and move on.
                log_exception(f"fetch_forecasts_for_period date={current_date}", e)

        current_date += timedelta(days=1)
        time.sleep(0.1)  # brief pause between days

    if not all_forecasts:
        log_event("fetch_forecasts_for_period no_data", start_date=str(start_date), end_date=str(end_date))
        return None

    df = pd.DataFrame(all_forecasts)
    df = dedupe_rows(df, TRAINING_DEDUP_KEYS, sort_keys=["target_timestamp", "reference_time"], keep="last")
    log_event("fetch_forecasts_for_period done", rows=len(df))
    return df
def fetch_future_forecasts():
    """Fetch forecast rows for upcoming hours, up to 48 h past the latest run.

    The reference time is today's most recent 3-hourly run hour (Copenhagen
    time). Returns a DataFrame sorted by target timestamp, or None when the
    request fails, the response has no hourly data, or no rows qualify.
    """
    log_event("fetch_future_forecasts started")
    now = now_cph()
    today = now.date()
    run_hours = [0, 3, 6, 9, 12, 15, 18, 21]
    latest_run = max((h for h in run_hours if h <= now.hour), default=0)
    midnight = datetime.combine(today, datetime.min.time())
    reference_time = (midnight + timedelta(hours=latest_run)).replace(tzinfo=COPENHAGEN_TZ)

    url = "https://api.open-meteo.com/v1/forecast"
    params = {
        "latitude": AARHUS_LAT,
        "longitude": AARHUS_LON,
        "start_date": today.strftime("%Y-%m-%d"),
        "end_date": (today + timedelta(days=3)).strftime("%Y-%m-%d"),
        "models": "dmi_harmonie",
        "hourly": FORECAST_FEATURES,
        "timezone": "Europe/Copenhagen"
    }
    try:
        resp = requests.get(url, params=params, timeout=30)
        if resp.status_code != 200:
            # Retry once without pinning the model.
            del params['models']
            resp = requests.get(url, params=params, timeout=30)
        if resp.status_code != 200:
            return None

        data = resp.json()
        if 'hourly' not in data:
            return None
        hourly = data['hourly']

        times = pd.to_datetime(hourly['time']).tz_localize('Europe/Copenhagen', ambiguous='infer')
        forecasts = []
        for i, target_time in enumerate(times):
            if not target_time > now:
                continue
            lead_hours = (target_time - reference_time).total_seconds() / 3600
            if not 0 < lead_hours <= 48:
                continue
            row = {
                'target_timestamp': target_time,
                'reference_time': reference_time,
                'lead_time_hours': int(lead_hours),
                'lead_bucket': get_lead_bucket(int(lead_hours)),
            }
            # Add all forecast features
            row.update({
                f"dmi_{feat}_pred": hourly.get(feat, [None] * len(times))[i]
                for feat in FORECAST_FEATURES
            })
            # Compute wind components
            wind_speed = row.get('dmi_windspeed_10m_pred', 0) or 0
            wind_dir = row.get('dmi_winddirection_10m_pred', 0) or 0
            direction_rad = np.radians(wind_dir)
            row['forecast_wind_u'] = -wind_speed * np.sin(direction_rad)
            row['forecast_wind_v'] = -wind_speed * np.cos(direction_rad)
            forecasts.append(row)

        if not forecasts:
            log_event("fetch_future_forecasts no_data")
            return None

        df = pd.DataFrame(forecasts)
        df = df.drop_duplicates(subset=['target_timestamp'], keep='first')
        df = df.sort_values('target_timestamp').reset_index(drop=True)
        log_event("fetch_future_forecasts done", rows=len(df))
        return df
    except Exception as e:
        log_exception("fetch_future_forecasts", e)
        return None
| # ============================================================================= | |
| # OBSERVATION FETCHING | |
| # ============================================================================= | |
def fetch_observations_for_period(start_date, end_date):
    """
    Fetch hourly observations for Aarhus from the Open-Meteo archive endpoint.

    ``end_date`` is capped at today's Copenhagen date and rows later than the
    current hour are removed. Returns a DataFrame with ``actual_*`` columns
    plus derived wind components and rain targets, or None on a non-200
    response, a response without hourly data, or any exception.
    """
    log_event("fetch_observations_for_period", start_date=str(start_date), end_date=str(end_date))
    url = "https://archive-api.open-meteo.com/v1/archive"
    cph_today = now_cph().date()
    if end_date > cph_today:
        end_date = cph_today
    params = {
        "latitude": AARHUS_LAT,
        "longitude": AARHUS_LON,
        "start_date": start_date.strftime("%Y-%m-%d"),
        "end_date": end_date.strftime("%Y-%m-%d"),
        "hourly": OBSERVATION_FEATURES,
        "timezone": "Europe/Copenhagen"
    }
    # Output column -> field name in the API's hourly payload.
    column_sources = {
        'actual_temp': 'temperature_2m',
        'actual_humidity': 'relative_humidity_2m',
        'actual_pressure': 'pressure_msl',
        'actual_precipitation': 'precipitation',
        'actual_rain': 'rain',
        'actual_wind_speed': 'windspeed_10m',
        'actual_wind_direction': 'winddirection_10m',
        'actual_wind_gust': 'windgusts_10m',
    }
    try:
        resp = requests.get(url, params=params, timeout=60)
        if resp.status_code != 200:
            return None
        data = resp.json()
        if 'hourly' not in data:
            return None
        hourly = data['hourly']

        times = pd.to_datetime(hourly['time']).tz_localize('Europe/Copenhagen', ambiguous='infer')
        frame_data = {'target_timestamp': times}
        for column_name, api_field in column_sources.items():
            frame_data[column_name] = hourly.get(api_field)
        df = pd.DataFrame(frame_data)

        # Derived targets
        direction_rad = np.radians(df['actual_wind_direction'])
        df['actual_wind_u'] = -df['actual_wind_speed'] * np.sin(direction_rad)
        df['actual_wind_v'] = -df['actual_wind_speed'] * np.cos(direction_rad)
        df['rain_event'] = (df['actual_precipitation'] > 0.1).astype(int)
        df['rain_amount'] = df['actual_precipitation']

        # Filter future times
        current_hour = now_cph().replace(minute=0, second=0, microsecond=0)
        not_in_future = df['target_timestamp'] <= current_hour
        df = df[not_in_future]
        log_event("fetch_observations_for_period done", rows=len(df))
        return df
    except Exception as e:
        log_exception("fetch_observations_for_period", e)
        return None
| # ============================================================================= | |
| # FEATURE ENGINEERING | |
| # ============================================================================= | |
def add_cyclical_features(df, col, period):
    """Add ``<col>_sin`` and ``<col>_cos`` columns to ``df`` (in place).

    The angle is ``2 * pi * df[col] / period``. Returns the same ``df``.
    """
    angle = 2 * np.pi * df[col] / period
    df[f'{col}_sin'] = np.sin(angle)
    df[f'{col}_cos'] = np.cos(angle)
    return df
def add_temporal_features(df):
    """Add hour, month and day-of-year columns derived from ``reference_time``.

    Hour (period 24) and month (period 12) also get sin/cos encodings.
    Columns are written onto ``df``.
    """
    reference = df['reference_time']
    df['hour'] = reference.dt.hour
    df['month'] = reference.dt.month
    df['day_of_year'] = reference.dt.dayofyear
    for column_name, period in (('hour', 24), ('month', 12)):
        df = add_cyclical_features(df, column_name, period)
    return df
def add_run_delta_features(df):
    """Add ``<col>_run_delta`` columns: change since the previous run.

    Rows are sorted by reference time, then each tracked forecast column is
    differenced within its target timestamp. The first run for a target (and
    any other NaN difference) gets 0. Returns the sorted frame.
    """
    df = df.sort_values(['reference_time', 'target_timestamp'])
    tracked_features = (
        'temperature_2m',
        'windspeed_10m',
        'windgusts_10m',
        'precipitation',
        'pressure_msl',
        'relative_humidity_2m',
    )
    for feat in tracked_features:
        source_column = f'dmi_{feat}_pred'
        if source_column not in df.columns:
            continue
        # For each target timestamp, compute delta from previous run
        per_target = df.groupby('target_timestamp')[source_column]
        df[f'{source_column}_run_delta'] = per_target.diff().fillna(0)
    return df
def build_observation_context_frame(observations_df):
    """Build lagged/rolling observation features, one row per observation time.

    Each source series is shifted by one row before any rolling statistic is
    taken, so the features at a timestamp use only earlier rows.
    NOTE(review): lags are row-based, so "1h"/"3h" only hold if the
    observations are hourly without gaps; confirm against the data source.

    Returns an empty frame with the context columns when there is no input.
    """
    columns = [OBSERVATION_CONTEXT_TIMESTAMP_COL, *OBSERVATION_CONTEXT_COLUMNS]
    if observations_df is None or len(observations_df) == 0:
        return pd.DataFrame(columns=columns)

    observations = ensure_copenhagen_time(observations_df.copy(), "target_timestamp")
    observations = dedupe_rows(observations, ["target_timestamp"], sort_keys=["target_timestamp"], keep="last")
    observations = observations.sort_values("target_timestamp").reset_index(drop=True)

    # Derive columns that older inputs may lack.
    if {"actual_wind_speed", "actual_wind_direction"}.issubset(observations.columns):
        if "actual_wind_u" not in observations.columns:
            observations["actual_wind_u"] = -observations["actual_wind_speed"] * np.sin(np.radians(observations["actual_wind_direction"]))
        if "actual_wind_v" not in observations.columns:
            observations["actual_wind_v"] = -observations["actual_wind_speed"] * np.cos(np.radians(observations["actual_wind_direction"]))
    if "actual_precipitation" in observations.columns:
        if "rain_event" not in observations.columns:
            observations["rain_event"] = (observations["actual_precipitation"].fillna(0.0) > 0.1).astype(int)
        if "rain_amount" not in observations.columns:
            observations["rain_amount"] = observations["actual_precipitation"]

    # Previous-row value of every source series.
    source_columns = {
        "temp": "actual_temp",
        "wind": "actual_wind_speed",
        "wind_u": "actual_wind_u",
        "wind_v": "actual_wind_v",
        "pressure": "actual_pressure",
        "humidity": "actual_humidity",
        "precip": "actual_precipitation",
    }
    lagged = {
        short_name: get_optional_series(observations, source).shift(1)
        for short_name, source in source_columns.items()
    }

    # (output column, source key, rolling window or None, aggregation)
    feature_specs = (
        ("obs_temp_lag_1h", "temp", None, None),
        ("obs_temp_mean_3h", "temp", 3, "mean"),
        ("obs_temp_mean_6h", "temp", 6, "mean"),
        ("obs_wind_lag_1h", "wind", None, None),
        ("obs_wind_mean_3h", "wind", 3, "mean"),
        ("obs_wind_mean_6h", "wind", 6, "mean"),
        ("obs_wind_u_lag_1h", "wind_u", None, None),
        ("obs_wind_u_mean_3h", "wind_u", 3, "mean"),
        ("obs_wind_v_lag_1h", "wind_v", None, None),
        ("obs_wind_v_mean_3h", "wind_v", 3, "mean"),
        ("obs_pressure_lag_1h", "pressure", None, None),
        ("obs_pressure_mean_3h", "pressure", 3, "mean"),
        ("obs_humidity_lag_1h", "humidity", None, None),
        ("obs_humidity_mean_3h", "humidity", 3, "mean"),
        ("obs_precip_lag_1h", "precip", None, None),
        ("obs_precip_sum_3h", "precip", 3, "sum"),
        ("obs_precip_sum_6h", "precip", 6, "sum"),
        ("obs_precip_sum_12h", "precip", 12, "sum"),
    )

    context = pd.DataFrame({OBSERVATION_CONTEXT_TIMESTAMP_COL: observations["target_timestamp"]})
    for output_column, source_key, window, aggregation in feature_specs:
        series = lagged[source_key]
        if window is None:
            context[output_column] = series
        elif aggregation == "mean":
            context[output_column] = series.rolling(window, min_periods=1).mean()
        else:
            context[output_column] = series.rolling(window, min_periods=1).sum()

    context = ensure_observation_context_columns(context)
    return context.sort_values(OBSERVATION_CONTEXT_TIMESTAMP_COL).reset_index(drop=True)
def attach_observation_context(df, observation_context_df):
    """Attach only observations that existed at the forecast reference time.

    Each row of ``df`` receives the observation-context row with the latest
    context timestamp that is not after the row's ``reference_time``
    (backward as-of join on epoch nanoseconds). Row order of ``df`` is kept.

    Args:
        df: Forecast rows. ``None``/empty input, or input without a
            ``reference_time`` column, only gets the context columns added
            as empty values.
        observation_context_df: Either a prepared context frame or raw
            observations (recognised by the missing context-timestamp
            column), in which case the context is built first.

    Returns:
        A new frame with the context timestamp and context feature columns.
        Internal helper columns are never part of the result.
    """
    if df is None or len(df) == 0:
        return ensure_observation_context_columns(df)

    enriched = df.copy()
    # Stale context columns would collide with the ones merged in below.
    drop_cols = [OBSERVATION_CONTEXT_TIMESTAMP_COL, *OBSERVATION_CONTEXT_COLUMNS]
    existing_context_cols = [column_name for column_name in drop_cols if column_name in enriched.columns]
    if existing_context_cols:
        enriched = enriched.drop(columns=existing_context_cols)
    if "reference_time" not in enriched.columns:
        return ensure_observation_context_columns(enriched)

    enriched = ensure_copenhagen_time(enriched, "reference_time")
    enriched["_row_order"] = np.arange(len(enriched))
    left = enriched.copy()
    left["_reference_time_key"] = datetime_series_to_epoch_ns(left["reference_time"])
    # merge_asof requires both sides sorted by their join key.
    left = left.sort_values("_reference_time_key").reset_index(drop=True)

    helper_columns = ["_row_order", "_reference_time_key", "_observation_context_key"]

    if observation_context_df is None or len(observation_context_df) == 0:
        left = ensure_observation_context_columns(left)
        left = left.sort_values("_row_order")
        # Drop every helper column here too; previously "_reference_time_key"
        # leaked into the result on this path.
        left = left.drop(columns=helper_columns, errors="ignore").reset_index(drop=True)
        return left

    if OBSERVATION_CONTEXT_TIMESTAMP_COL not in observation_context_df.columns:
        observation_context_df = build_observation_context_frame(observation_context_df)
    else:
        observation_context_df = ensure_observation_context_columns(observation_context_df.copy())
    observation_context_df = ensure_copenhagen_time(observation_context_df, OBSERVATION_CONTEXT_TIMESTAMP_COL)
    observation_context_df = observation_context_df.dropna(subset=[OBSERVATION_CONTEXT_TIMESTAMP_COL]).copy()
    observation_context_df["_observation_context_key"] = datetime_series_to_epoch_ns(
        observation_context_df[OBSERVATION_CONTEXT_TIMESTAMP_COL]
    )
    observation_context_df = observation_context_df.sort_values("_observation_context_key").reset_index(drop=True)

    merged = pd.merge_asof(
        left,
        observation_context_df[
            [OBSERVATION_CONTEXT_TIMESTAMP_COL, "_observation_context_key", *OBSERVATION_CONTEXT_COLUMNS]
        ],
        left_on="_reference_time_key",
        right_on="_observation_context_key",
        direction="backward",
    )
    merged = ensure_observation_context_columns(merged)
    merged = merged.sort_values("_row_order")
    merged = merged.drop(columns=helper_columns, errors="ignore").reset_index(drop=True)
    return merged
def build_live_feature_frame(forecasts_df):
    """Build inference features for live forecasts, with observation context.

    Observations are fetched from two days before the earliest reference
    time up to the latest one. When features, reference times or
    observations are unavailable, the context columns are still added (empty)
    so the schema stays the same.
    """
    feature_frame = build_model_features(forecasts_df)
    nothing_to_enrich = (
        feature_frame is None
        or len(feature_frame) == 0
        or "reference_time" not in feature_frame.columns
    )
    if nothing_to_enrich:
        return ensure_observation_context_columns(feature_frame)

    reference_times = pd.to_datetime(feature_frame["reference_time"], errors="coerce")
    earliest, latest = reference_times.min(), reference_times.max()
    if pd.isna(earliest) or pd.isna(latest):
        return ensure_observation_context_columns(feature_frame)

    observation_start = (earliest - timedelta(days=2)).date()
    observation_end = latest.date()
    observations = fetch_observations_for_period(observation_start, observation_end)
    if observations is None or len(observations) == 0:
        log_event("build_live_feature_frame missing_observations", start_date=str(observation_start), end_date=str(observation_end))
        return ensure_observation_context_columns(feature_frame)

    return attach_observation_context(
        feature_frame,
        build_observation_context_frame(observations),
    )
def extract_observations_from_training_matrix(df):
    """Return one observation row per target timestamp from a training matrix.

    Only ``target_timestamp`` and the observation source columns present in
    ``df`` are kept. Returns None for ``None``/empty input or when
    ``target_timestamp`` is missing.
    """
    if df is None or len(df) == 0:
        return None
    if "target_timestamp" not in df.columns:
        return None

    available_sources = [name for name in OBSERVATION_SOURCE_COLUMNS if name in df.columns]
    observations = df[["target_timestamp", *available_sources]].copy()
    observations = ensure_copenhagen_time(observations, "target_timestamp")
    observations = dedupe_rows(observations, ["target_timestamp"], sort_keys=["target_timestamp"], keep="last")
    observations = observations.sort_values("target_timestamp")
    return observations.reset_index(drop=True)
def upgrade_training_matrix_with_observation_context(existing_df):
    """Recompute the observation-context columns of a stored training matrix.

    Observations are recovered from the matrix itself, turned into context
    features and re-attached; the result is de-duplicated on the training
    keys. ``None``/empty input only gets the empty context columns.
    """
    if existing_df is None or len(existing_df) == 0:
        return ensure_observation_context_columns(existing_df)

    upgraded = existing_df.copy()
    for time_column in ("target_timestamp", "reference_time"):
        upgraded = ensure_copenhagen_time(upgraded, time_column)

    recovered_observations = extract_observations_from_training_matrix(upgraded)
    context = build_observation_context_frame(recovered_observations)
    upgraded = attach_observation_context(upgraded, context)
    return dedupe_rows(
        upgraded,
        TRAINING_DEDUP_KEYS,
        sort_keys=["target_timestamp", "reference_time"],
        keep="last",
    )
def build_training_matrix(forecasts_df, observations_df):
    """
    Join forecasts with observations and add features and training targets.

    Rows are matched on ``target_timestamp`` (inner join). The result gets
    model features, observation context, and correction targets
    (actual minus forecast) for temperature, wind speed and wind gust.
    Rows later than the current hour are dropped. Returns None when an input
    is missing or nothing matches.
    """
    if forecasts_df is None or observations_df is None:
        log_event("build_training_matrix missing_inputs")
        return None

    # Merge on target_timestamp
    merged = pd.merge(forecasts_df, observations_df, on='target_timestamp', how='inner')
    if len(merged) == 0:
        log_event("build_training_matrix no_matches")
        return None

    merged = build_model_features(merged)
    merged = attach_observation_context(
        merged,
        build_observation_context_frame(observations_df),
    )

    # Add correction targets for temperature and wind
    correction_targets = (
        ('temp_correction_target', 'actual_temp', 'dmi_temperature_2m_pred'),
        ('wind_speed_correction_target', 'actual_wind_speed', 'dmi_windspeed_10m_pred'),
        ('wind_gust_correction_target', 'actual_wind_gust', 'dmi_windgusts_10m_pred'),
    )
    for target_column, actual_column, forecast_column in correction_targets:
        merged[target_column] = merged[actual_column] - merged[forecast_column]

    # Filter out future times
    current_hour = now_cph().replace(minute=0, second=0, microsecond=0)
    not_in_future = merged['target_timestamp'] <= current_hour
    merged = merged[not_in_future]
    merged = dedupe_rows(merged, TRAINING_DEDUP_KEYS, sort_keys=["target_timestamp", "reference_time"], keep="last")
    log_event("build_training_matrix done", rows=len(merged), columns=len(merged.columns))
    return merged
| # ============================================================================= | |
| # DATASET OPERATIONS | |
| # ============================================================================= | |
def upload_to_dataset(local_path, filename, dataset_name, repo_type="dataset"):
    """Push ``local_path`` to a Hugging Face repo as ``filename``.

    Returns ``True`` on success. Any exception is logged through
    ``log_exception`` and turned into a ``False`` return value.
    """
    log_event("upload_to_dataset", local_path=local_path, filename=filename, dataset_name=dataset_name)
    upload_kwargs = {
        "path_or_fileobj": local_path,
        "path_in_repo": filename,
        "repo_id": dataset_name,
        "repo_type": repo_type,
        "token": HF_TOKEN,
    }
    try:
        HfApi().upload_file(**upload_kwargs)
        log_event("upload_to_dataset done", filename=filename, dataset_name=dataset_name)
    except Exception as e:
        log_exception("upload_to_dataset", e)
        return False
    return True
def load_from_dataset(filename, dataset_name, repo_type="dataset"):
    """Download ``filename`` from a Hugging Face repo and return its local path.

    Any exception is logged through ``log_exception`` and reported as
    ``None``, so callers cannot tell a missing file from another failure.
    """
    log_event("load_from_dataset", filename=filename, dataset_name=dataset_name)
    download_kwargs = {
        "repo_id": dataset_name,
        "filename": filename,
        "repo_type": repo_type,
        "token": HF_TOKEN,
    }
    try:
        local_path = hf_hub_download(**download_kwargs)
        log_event("load_from_dataset done", filename=filename, dataset_name=dataset_name, path=local_path)
    except Exception as e:
        log_exception("load_from_dataset", e)
        return None
    return local_path
def load_first_available_dataset_file(filenames, dataset_name, repo_type="dataset"):
    """Try each candidate in order and return ``(local_path, filename)`` for the first hit.

    Returns ``(None, None)`` when no candidate could be loaded.
    """
    for candidate in filenames:
        local_path = load_from_dataset(candidate, dataset_name, repo_type=repo_type)
        if not local_path:
            continue
        return local_path, candidate
    return None, None
def init_dataset_if_needed():
    """Make sure a training matrix file exists in the dataset repo.

    If ``training_matrix.parquet`` (or the legacy ``data.parquet``) can be
    loaded, its ``(path, name)`` is returned. Otherwise an empty frame with
    the ``TRAINING_DEDUP_KEYS`` columns is written locally and uploaded.
    Returns ``(None, None)`` when that upload fails.

    NOTE(review): ``load_from_dataset`` returns ``None`` for any download
    error, so a transient failure here leads to uploading an empty matrix.
    Confirm that this is acceptable.
    """
    matrix_file = "training_matrix.parquet"
    existing_path, existing_name = load_first_available_dataset_file(
        [matrix_file, "data.parquet"],
        DATASET_NAME,
    )
    if existing_path:
        print(f"โ Training dataset already available as {existing_name}")
        return existing_path, existing_name

    pd.DataFrame(columns=TRAINING_DEDUP_KEYS).to_parquet(matrix_file)
    uploaded = upload_to_dataset(matrix_file, matrix_file, DATASET_NAME)
    if not uploaded:
        print("โ ๏ธ Could not initialize training_matrix.parquet")
        return None, None

    print("โ Initialized empty training_matrix.parquet")
    return matrix_file, matrix_file
def load_existing_training_matrix():
    """Load the stored training matrix, preferring the new filename over the legacy one.

    Returns ``(frame, filename)``, or ``(None, None)`` when neither file can
    be loaded. A legacy ``data.parquet`` with a ``timestamp`` column has it
    renamed to ``target_timestamp``. Frames that are empty or still lack
    ``target_timestamp`` are returned without further processing. All other
    frames get their timestamp columns normalised and are de-duplicated on
    ``TRAINING_DEDUP_KEYS``.
    """
    found_path, found_name = load_first_available_dataset_file(
        ["training_matrix.parquet", "data.parquet"],
        DATASET_NAME,
    )
    if not found_path:
        return None, None

    matrix = pd.read_parquet(found_path)

    is_legacy = found_name == "data.parquet"
    if is_legacy and "timestamp" in matrix.columns and "target_timestamp" not in matrix.columns:
        matrix = matrix.rename(columns={"timestamp": "target_timestamp"})

    if matrix.empty or "target_timestamp" not in matrix.columns:
        return matrix, found_name

    for time_column in ("target_timestamp", "reference_time", OBSERVATION_CONTEXT_TIMESTAMP_COL):
        matrix = ensure_copenhagen_time(matrix, time_column)

    matrix = dedupe_rows(
        matrix,
        TRAINING_DEDUP_KEYS,
        sort_keys=["target_timestamp", "reference_time"],
        keep="last",
    )
    return matrix, found_name
def safe_number(value, digits=3, default=None):
    """Convert pandas/numpy scalars to JSON-safe floats.

    Args:
        value: Anything ``float()`` can convert; ``None`` and missing values
            (per ``pd.isna``) are treated as absent.
        digits: Number of decimals passed to ``round``.
        default: Value returned when ``value`` is absent, not convertible,
            or not finite.

    Returns:
        ``round(float(value), digits)``, or ``default``.

    Infinite values are mapped to ``default`` as well: ``json.dump`` would
    otherwise serialise them as ``Infinity``, which is not valid JSON.
    """
    import math  # local import so the block does not rely on a module-level one

    if value is None:
        return default
    try:
        if pd.isna(value):
            return default
    except Exception:
        # pd.isna() may not yield a single truth value (e.g. array-like
        # input); let the float() conversion below decide instead.
        pass
    try:
        number = float(value)
        if not math.isfinite(number):
            return default
        return round(number, digits)
    except Exception:
        return default
def to_iso(value):
    """Turn a timestamp-like value into a string for the frontend snapshot.

    ``None`` and values that ``pd.isna`` reports as missing become ``None``.
    Objects with an ``isoformat`` method are serialised through it, and
    anything else through ``str``.
    """
    if value is None:
        return None

    try:
        missing = bool(pd.isna(value))
    except Exception:
        # Values pd.isna cannot reduce to one boolean are treated as present.
        missing = False
    if missing:
        return None

    return value.isoformat() if hasattr(value, "isoformat") else str(value)
def load_json_from_dataset(filename, dataset_name=DATASET_NAME):
    """Download ``filename`` from the dataset repo and parse it as JSON.

    Returns the parsed object, or ``None`` when the download yields no path
    or reading/parsing fails (the failure is logged via ``log_exception``).
    """
    local_path = load_from_dataset(filename, dataset_name)
    if local_path:
        try:
            with open(local_path, "r", encoding="utf-8") as stream:
                return json.load(stream)
        except Exception as exc:
            log_exception(f"load_json_from_dataset[{filename}]", exc)
    return None
def extract_feature_importance_from_bundle(bundle, target_name, top_n=8):
    """Average per-feature importances over the models stored in ``bundle``.

    Each entry of ``bundle["models"]`` contributes its model's
    ``feature_importances_`` paired with the entry's own ``feature_columns``
    (falling back to the bundle-level list). Entries without importances or
    without columns are skipped.

    Returns up to ``top_n`` dicts with ``target``, ``feature`` and
    ``importance`` (rounded to 6 decimals), highest importance first.
    Returns an empty list when ``bundle`` is ``None`` or has no ``"models"`` key.
    """
    if bundle is None or "models" not in bundle:
        return []

    # feature name -> [summed importance, number of contributing models]
    running = {}
    for info in bundle.get("models", {}).values():
        columns = info.get("feature_columns") or bundle.get("feature_columns", [])
        weights = getattr(info.get("model"), "feature_importances_", None)
        if weights is None or not columns:
            continue
        for name, weight in zip(columns, weights):
            slot = running.setdefault(name, [0.0, 0])
            slot[0] += float(weight)
            slot[1] += 1

    ranked = [
        {
            "target": target_name,
            "feature": name,
            "importance": round(total / max(seen, 1), 6),
        }
        for name, (total, seen) in running.items()
    ]
    ranked = sorted(ranked, key=lambda entry: entry["importance"], reverse=True)
    return ranked[:top_n]
def build_recent_backtest(training_df):
    """Re-run the ML-vs-DMI comparison over the recent slice of the training matrix.

    The slice covers the last ``TRAINING_HOLDOUT_DAYS`` days up to now and,
    when ``lead_time_hours`` exists, only lead times in
    ``[0.0001, FUTURE_FORECAST_HOURS]``. The ``ml_*`` columns start out as
    the DMI baseline and are overwritten wherever a loaded model bundle
    yields a non-null prediction. One row per ``target_timestamp`` is kept.

    Returns ``None`` when the input is missing/empty, lacks
    ``target_timestamp``, or no rows survive the filters.
    """
    if training_df is None or len(training_df) == 0 or "target_timestamp" not in training_df.columns:
        return None

    window_end = now_cph()
    window_start = window_end - timedelta(days=TRAINING_HOLDOUT_DAYS)
    stamps = training_df["target_timestamp"]
    history = training_df[(stamps >= window_start) & (stamps <= window_end)].copy()
    if len(history) == 0:
        return None

    has_lead_time = "lead_time_hours" in history.columns
    if has_lead_time:
        lead_ok = history["lead_time_hours"].fillna(0).between(
            0.0001, FUTURE_FORECAST_HOURS, inclusive="both"
        )
        history = history[lead_ok].copy()
        if len(history) == 0:
            return None

    # Seed every ML column with the DMI baseline (NaN when the baseline is absent).
    for ml_column, dmi_column in (
        ("ml_temp", "dmi_temperature_2m_pred"),
        ("ml_wind_speed", "dmi_windspeed_10m_pred"),
        ("ml_wind_gust", "dmi_windgusts_10m_pred"),
    ):
        history[ml_column] = history.get(dmi_column, pd.Series(np.nan, index=history.index))

    # DMI rain probability is a percentage; the ML column holds a 0-1 fraction.
    rain_prob_pct = history.get(
        "dmi_precipitation_probability_pred", pd.Series(0.0, index=history.index)
    )
    history["ml_rain_prob"] = rain_prob_pct.fillna(0.0).clip(0.0, 100.0) / 100.0

    rain_amount = history.get("dmi_precipitation_pred", pd.Series(0.0, index=history.index))
    history["ml_rain_amount"] = rain_amount.fillna(0.0).clip(0.0, None)

    # (target, output column, DMI baseline column, model predicts a correction?)
    target_specs = [
        ("temperature", "ml_temp", "dmi_temperature_2m_pred", True),
        ("wind_speed", "ml_wind_speed", "dmi_windspeed_10m_pred", True),
        ("wind_gust", "ml_wind_gust", "dmi_windgusts_10m_pred", True),
        ("rain_event", "ml_rain_prob", None, False),
        ("rain_amount", "ml_rain_amount", None, False),
    ]
    for target_name, output_col, baseline_col, is_correction in target_specs:
        bundle = load_model_bundle(target_name)
        if not bundle:
            continue
        raw_prediction = predict_with_bundle(bundle, history)
        if raw_prediction is None:
            continue

        predicted = pd.Series(raw_prediction, index=history.index, dtype="float64")
        usable = predicted.notna()
        if not usable.any():
            continue

        if is_correction:
            # Correction models output a delta that is added to the baseline.
            updated = history.loc[usable, baseline_col] + predicted[usable]
        elif target_name == "rain_event":
            updated = predicted[usable].clip(0.0, 1.0)
        else:
            updated = predicted[usable].clip(0.0, None)
        history.loc[usable, output_col] = updated

    sort_plan = [("target_timestamp", True)]
    if has_lead_time:
        sort_plan.append(("lead_time_hours", False))
    if "reference_time" in history.columns:
        sort_plan.append(("reference_time", False))

    history = history.sort_values(
        [column for column, _ in sort_plan],
        ascending=[direction for _, direction in sort_plan],
    )
    history = history.drop_duplicates(subset=["target_timestamp"], keep="first")
    return history.reset_index(drop=True)
def rebuild_future_ml_columns(predictions_df, registry_revision=None):
    """Recompute live ML columns from stored forecast features before publishing the frontend snapshot.

    Only rows whose ``target_timestamp`` lies after the current time are
    touched. Existing ``ml_*`` values are kept as the starting point (with
    the DMI baseline as fallback when a column is missing) and overwritten
    wherever a model bundle yields a non-null prediction.

    Args:
        predictions_df: Stored predictions frame; returned unchanged when it
            is ``None``, empty or lacks ``target_timestamp``.
        registry_revision: Passed to ``load_model_bundle`` as ``cache_revision``.

    Returns:
        A copy of ``predictions_df`` with the future rows' ML columns rebuilt.
    """
    if predictions_df is None or len(predictions_df) == 0 or "target_timestamp" not in predictions_df.columns:
        return predictions_df

    current_time = now_cph()
    repaired = predictions_df.copy()
    future_mask = repaired["target_timestamp"] > current_time
    if not future_mask.any():
        return repaired

    future_df = repaired.loc[future_mask].copy()
    future_df["ml_temp"] = future_df.get("ml_temp", future_df.get("dmi_temperature_2m_pred"))
    future_df["ml_wind_speed"] = future_df.get("ml_wind_speed", future_df.get("dmi_windspeed_10m_pred"))
    future_df["ml_wind_gust"] = future_df.get("ml_wind_gust", future_df.get("dmi_windgusts_10m_pred"))
    # DMI rain probability is a percentage; the ML column holds a 0-1 fraction.
    future_df["ml_rain_prob"] = future_df.get(
        "ml_rain_prob",
        future_df.get("dmi_precipitation_probability_pred", pd.Series(0.0, index=future_df.index))
        .fillna(0.0)
        .clip(0.0, 100.0)
        / 100.0,
    )
    future_df["ml_rain_amount"] = future_df.get(
        "ml_rain_amount",
        future_df.get("dmi_precipitation_pred", pd.Series(0.0, index=future_df.index)).fillna(0.0).clip(0.0, None),
    )

    # (target, output column, DMI baseline column, model predicts a correction?)
    target_specs = [
        ("temperature", "ml_temp", "dmi_temperature_2m_pred", True),
        ("wind_speed", "ml_wind_speed", "dmi_windspeed_10m_pred", True),
        ("wind_gust", "ml_wind_gust", "dmi_windgusts_10m_pred", True),
        ("rain_event", "ml_rain_prob", None, False),
        ("rain_amount", "ml_rain_amount", None, False),
    ]
    for target_name, output_col, baseline_col, is_correction in target_specs:
        bundle = load_model_bundle(target_name, cache_revision=registry_revision)
        if not bundle:
            # Same guard as build_recent_backtest: without a bundle the
            # starting values above stay in place for this target.
            continue
        target_pred = predict_with_bundle(bundle, future_df)
        if target_pred is None:
            continue
        target_series = pd.Series(target_pred, index=future_df.index, dtype="float64")
        target_mask = target_series.notna()
        if not target_mask.any():
            continue
        if is_correction:
            # Correction models output a delta that is added to the baseline.
            future_df.loc[target_mask, output_col] = (
                future_df.loc[target_mask, baseline_col] + target_series[target_mask]
            )
        elif target_name == "rain_event":
            future_df.loc[target_mask, output_col] = target_series[target_mask].clip(0.0, 1.0)
        else:
            future_df.loc[target_mask, output_col] = target_series[target_mask].clip(0.0, None)

    future_df["ml_rain_prob"] = future_df["ml_rain_prob"].fillna(0.0).clip(0.0, 1.0)
    future_df["ml_rain_amount"] = future_df["ml_rain_amount"].fillna(0.0).clip(0.0, None)

    # Columns created above must exist in the full frame before the write-back.
    for column_name in future_df.columns:
        if column_name not in repaired.columns:
            repaired[column_name] = np.nan
    repaired.loc[future_mask, future_df.columns] = future_df
    return repaired
def build_recent_verified_history(predictions_df):
    """Return verified prediction rows from the last ``TRAINING_HOLDOUT_DAYS`` days.

    A row counts as verified when its ``verified`` value is truthy (missing
    values count as not verified). When ``lead_time_hours`` exists, only
    lead times in ``[0.0001, FUTURE_FORECAST_HOURS]`` are kept.

    Returns:
        The matching rows sorted by ``target_timestamp`` with a fresh index,
        or ``None`` when the input is missing/empty, lacks the
        ``target_timestamp`` or ``verified`` column, or nothing matches.
    """
    if predictions_df is None or len(predictions_df) == 0:
        return None
    # A frame without a ``verified`` column has no verified rows; treat it
    # like the other "nothing to report" cases instead of raising KeyError.
    if not {"target_timestamp", "verified"}.issubset(predictions_df.columns):
        return None

    verified = predictions_df[predictions_df["verified"].fillna(False).astype(bool)].copy()
    if len(verified) == 0:
        return None

    current_time = now_cph()
    window_start = current_time - timedelta(days=TRAINING_HOLDOUT_DAYS)
    verified = verified[
        (verified["target_timestamp"] >= window_start)
        & (verified["target_timestamp"] <= current_time)
    ].copy()
    if len(verified) == 0:
        return None

    if "lead_time_hours" in verified.columns:
        verified = verified[
            verified["lead_time_hours"].fillna(0).between(0.0001, FUTURE_FORECAST_HOURS, inclusive="both")
        ].copy()
        if len(verified) == 0:
            return None

    return verified.sort_values("target_timestamp").reset_index(drop=True)
def merge_recent_history_sources(backtest_df=None, predictions_df=None):
    """Merge the backtest rows with recent verified predictions.

    When both sources have a row for the same ``target_timestamp`` the
    verified prediction wins. With a single non-empty source that source is
    returned. With none the result is ``None``.
    """
    has_backtest = backtest_df is not None and len(backtest_df) > 0
    backtest = backtest_df.copy() if has_backtest else None
    verified = build_recent_verified_history(predictions_df)

    if backtest is None:
        # Covers "verified only" as well as "nothing at all" (verified is None).
        return verified
    if verified is None:
        return backtest.sort_values("target_timestamp").reset_index(drop=True)

    # Tag each source so that, after sorting, verified rows come last per timestamp.
    combined = pd.concat(
        [
            backtest.assign(_history_priority=0),
            verified.assign(_history_priority=1),
        ],
        ignore_index=True,
        sort=False,
    )

    sort_plan = [("target_timestamp", True), ("_history_priority", True)]
    for optional_column in ("lead_time_hours", "reference_time"):
        if optional_column in combined.columns:
            sort_plan.append((optional_column, False))

    combined = combined.sort_values(
        [column for column, _ in sort_plan],
        ascending=[direction for _, direction in sort_plan],
    )
    combined = combined.drop_duplicates(subset=["target_timestamp"], keep="last")
    combined = combined.reset_index(drop=True)
    return combined.drop(columns=["_history_priority"])
def calculate_verification_metrics(predictions_df=None, backtest_df=None):
    """Summarise temperature accuracy of DMI vs. ML for the frontend.

    The rows come from ``merge_recent_history_sources``. RMSE, MAE and the
    ML win rate (share of rows where the ML absolute error is smaller, in
    percent) are filled in only when ``actual_temp``,
    ``dmi_temperature_2m_pred`` and ``ml_temp`` are all present and at least
    one row has all three values. Otherwise they stay ``None``.
    """
    source_df = merge_recent_history_sources(backtest_df=backtest_df, predictions_df=predictions_df)
    has_rows = source_df is not None and len(source_df) > 0

    if not has_rows:
        period_label = "Ingen verificeret historik endnu"
    else:
        verified = build_recent_verified_history(predictions_df)
        if verified is not None and len(verified) > 0:
            period_label = "Seneste 7 dages backtest og verificerede predictioner"
        else:
            period_label = "Seneste 7 dages backtest"

    result = {
        "target": "temperature",
        "periodLabel": period_label,
        "rmseDmi": None,
        "rmseMl": None,
        "maeDmi": None,
        "maeMl": None,
        "winRate": None,
        "totalPredictions": 0 if source_df is None else int(len(source_df)),
    }
    if not has_rows:
        return result

    needed = ["actual_temp", "dmi_temperature_2m_pred", "ml_temp"]
    if not set(needed).issubset(source_df.columns):
        return result

    valid = source_df.dropna(subset=needed).copy()
    if len(valid) == 0:
        return result

    dmi_error = valid["actual_temp"] - valid["dmi_temperature_2m_pred"]
    ml_error = valid["actual_temp"] - valid["ml_temp"]
    abs_dmi = np.abs(dmi_error)
    abs_ml = np.abs(ml_error)

    result["rmseDmi"] = safe_number(np.sqrt(np.mean(dmi_error**2)), digits=3)
    result["rmseMl"] = safe_number(np.sqrt(np.mean(ml_error**2)), digits=3)
    result["maeDmi"] = safe_number(np.mean(abs_dmi), digits=3)
    result["maeMl"] = safe_number(np.mean(abs_ml), digits=3)
    result["winRate"] = round(float((abs_ml < abs_dmi).mean() * 100), 1)
    result["totalPredictions"] = int(len(valid))
    return result
def build_lead_bucket_rows(registry):
    """Convert the registry's ``summary_rows`` into frontend lead-bucket rows.

    Each output row carries the bucket id, a display label, both metrics
    (rounded to 6 decimals) and the relative improvement of ML over the
    baseline in percent. The improvement is ``None`` when the baseline is
    missing or zero or the ML metric is missing.
    """
    if not registry:
        return []

    bucket_labels = {bucket: f"{bucket} timer" for bucket in ("1-6", "7-12", "13-24", "25-48")}

    def to_frontend_row(summary):
        baseline = safe_number(summary.get("baseline_metric"), digits=6)
        ml_metric = safe_number(summary.get("ml_metric"), digits=6)
        if baseline is None or baseline == 0 or ml_metric is None:
            improvement = None
        else:
            improvement = round(((baseline - ml_metric) / baseline) * 100, 2)
        bucket = summary.get("lead_bucket")
        return {
            "bucket": bucket,
            "label": bucket_labels.get(bucket, bucket),
            "baselineMetric": baseline,
            "mlMetric": ml_metric,
            "improvementPct": improvement,
            "target": summary.get("target"),
        }

    return [to_frontend_row(summary) for summary in registry.get("summary_rows", [])]
# Danish display names for each prediction target, keyed by target id.
# The non-ASCII letters are written as real UTF-8 characters; the previous
# text was mojibake of "ø" and "æ".
TARGET_LABELS = {
    "temperature": "Temperatur",
    "wind_speed": "Vindhastighed",
    "wind_gust": "Vindstød",
    "rain_event": "Regnrisiko",
    "rain_amount": "Regnmængde",
}

# Lead-time buckets in display order, shortest lead first.
LEAD_BUCKET_ORDER = ["1-6", "7-12", "13-24", "25-48"]
def build_target_labels():
    """Return a fresh shallow copy of ``TARGET_LABELS`` for the snapshot."""
    return {**TARGET_LABELS}
def build_explanations():
    """Return the static Danish help texts shown by the frontend.

    Keys are ``forecast``, ``performance`` and ``sources``. The text uses
    the real UTF-8 character in "når"; the previous literal was mojibake.
    """
    return {
        "forecast": "Du ser DMI's prognose side om side med vores ML-justering, når der er en aktiv model.",
        "performance": "Her kan du sammenligne, hvad DMI sagde, hvad ML sagde, og hvad vejret faktisk endte med at blive.",
        "sources": "DMI er grundprognosen. ML er vores lokale justering. Hvis en ML-model ikke er aktiv, viser vi DMI direkte.",
    }
def build_target_status(registry):
    """Describe, per target in ``MODEL_FILES``, whether an ML model is active.

    A target is active when the registry lists at least one of the known
    ``LEAD_BUCKET_ORDER`` buckets under ``targets[<name>]["active_buckets"]``.

    Args:
        registry: Parsed model registry; may be ``None`` or empty.

    Returns:
        Dict keyed by target name with ``hasActiveModel``, ``activeBuckets``
        (in ``LEAD_BUCKET_ORDER`` order), ``statusLabel`` and
        ``statusDescription``.
    """
    # Tolerate a missing registry as well as null "targets"/per-target entries.
    registry_targets = (registry or {}).get("targets") or {}
    target_status = {}
    for target_name in MODEL_FILES:
        target_entry = registry_targets.get(target_name) or {}
        active_buckets = target_entry.get("active_buckets") or []
        ordered_buckets = [bucket for bucket in LEAD_BUCKET_ORDER if bucket in active_buckets]
        has_active_model = len(ordered_buckets) > 0
        target_label = TARGET_LABELS.get(target_name, target_name)
        if has_active_model:
            status_label = "ML aktiv"
            # "når" is written as a real UTF-8 character (was mojibake).
            status_description = (
                f"Vi viser baade DMI og ML for {target_label.lower()}, og ML bruges som den aktive prognose, når den findes."
            )
        else:
            status_label = "Vises som DMI-prognose"
            status_description = (
                f"Vi viser DMI direkte for {target_label.lower()}, fordi der ikke er en aktiv ML-model for dette signal endnu."
            )
        target_status[target_name] = {
            "hasActiveModel": has_active_model,
            "activeBuckets": ordered_buckets,
            "statusLabel": status_label,
            "statusDescription": status_description,
        }
    return target_status
def choose_effective_value(ml_value, dmi_value, has_active_model):
    """Pick the value to display and name its source.

    Preference order: the ML value when a model is active, then the DMI
    value, then the ML value even without an active model. When both values
    are ``None`` the result is ``(None, "dmi")``.
    """
    ml_available = ml_value is not None
    if ml_available and has_active_model:
        return ml_value, "ml"
    if dmi_value is not None:
        return dmi_value, "dmi"
    return (ml_value, "ml") if ml_available else (None, "dmi")
def build_history_payload(predictions_df=None, backtest_df=None):
    """Build the ``history`` section of the frontend snapshot.

    Every row of the merged recent history yields one entry in each of the
    ``temperature``, ``wind`` and ``rain`` lists. ML rain probability is
    converted from a fraction to a percentage. When ``actual_rain_event`` is
    missing it is derived from the observed amount (more than 0.1 counts as
    rain). All entries are emitted with ``verified`` set to ``True``.
    """
    history = {"temperature": [], "wind": [], "rain": []}
    source_df = merge_recent_history_sources(backtest_df=backtest_df, predictions_df=predictions_df)
    if source_df is None or len(source_df) == 0:
        return history

    for _, row in source_df.iterrows():
        stamp = to_iso(row.get("target_timestamp"))
        actual_temp = safe_number(row.get("actual_temp"))

        temperature_entry = {
            "timestamp": stamp,
            "dmiTemp": safe_number(row.get("dmi_temperature_2m_pred")),
            "mlTemp": safe_number(row.get("ml_temp")),
            "actual": actual_temp,
            "actualTemp": actual_temp,
            "verified": True,
        }
        wind_entry = {
            "timestamp": stamp,
            "dmiWindSpeed": safe_number(row.get("dmi_windspeed_10m_pred")),
            "mlWindSpeed": safe_number(row.get("ml_wind_speed")),
            "actualWindSpeed": safe_number(row.get("actual_wind_speed")),
            "dmiWindGust": safe_number(row.get("dmi_windgusts_10m_pred")),
            "mlWindGust": safe_number(row.get("ml_wind_gust")),
            "actualWindGust": safe_number(row.get("actual_wind_gust")),
            "verified": True,
        }

        dmi_prob = safe_number(row.get("dmi_precipitation_probability_pred"), digits=2, default=0.0) or 0.0
        ml_prob_fraction = safe_number(row.get("ml_rain_prob"), digits=4, default=0.0) or 0.0
        ml_prob = round(ml_prob_fraction * 100, 2)

        # Prefer the dedicated rain column; fall back to total precipitation.
        observed_amount = safe_number(
            row.get("actual_rain_amount", row.get("actual_precipitation")),
            digits=3,
            default=None,
        )
        if observed_amount is None:
            observed_amount = safe_number(row.get("actual_precipitation"), digits=3, default=None)

        observed_event = row.get("actual_rain_event")
        if observed_event is not None and not pd.isna(observed_event):
            observed_event = int(observed_event)
        elif observed_amount is None:
            observed_event = None
        else:
            observed_event = int(observed_amount > 0.1)

        rain_entry = {
            "timestamp": stamp,
            "dmiRainProb": dmi_prob,
            "mlRainProb": ml_prob,
            "actualRainEvent": observed_event,
            "dmiRainAmount": safe_number(row.get("dmi_precipitation_pred"), digits=3, default=0.0) or 0.0,
            "mlRainAmount": safe_number(row.get("ml_rain_amount"), digits=3, default=0.0) or 0.0,
            "actualRainAmount": observed_amount,
            "verified": True,
        }

        history["temperature"].append(temperature_entry)
        history["wind"].append(wind_entry)
        history["rain"].append(rain_entry)

    return history
def build_forecast_row(row, target_status):
    """Convert one prediction row into the frontend's forecast-row dict.

    Args:
        row: Mapping-like row (e.g. a pandas Series) supporting ``.get``.
        target_status: Output of ``build_target_status``; its
            ``hasActiveModel`` flags decide whether the ML or the DMI value
            becomes the "effective" one.

    Returns:
        Dict with DMI, ML and effective values per signal plus the
        pass-through DMI fields. Rain probabilities are percentages;
        missing rain values are reported as ``0.0``.
    """
    dmi_temp = safe_number(row.get("dmi_temperature_2m_pred"))
    ml_temp = safe_number(row.get("ml_temp"))
    dmi_wind_speed = safe_number(row.get("dmi_windspeed_10m_pred"))
    ml_wind_speed = safe_number(row.get("ml_wind_speed"))
    dmi_wind_gust = safe_number(row.get("dmi_windgusts_10m_pred"))
    ml_wind_gust = safe_number(row.get("ml_wind_gust"))
    dmi_rain_prob = safe_number(row.get("dmi_precipitation_probability_pred"), digits=2, default=0.0) or 0.0
    # ML probability is stored as a 0-1 fraction; the frontend wants percent.
    ml_rain_prob = round((safe_number(row.get("ml_rain_prob"), digits=4, default=0.0) or 0.0) * 100, 2)
    dmi_rain_amount = safe_number(row.get("dmi_precipitation_pred"), digits=3, default=0.0) or 0.0
    ml_rain_amount = safe_number(row.get("ml_rain_amount"), digits=3, default=0.0) or 0.0

    def effective(ml_value, dmi_value, target_name):
        return choose_effective_value(
            ml_value,
            dmi_value,
            target_status[target_name]["hasActiveModel"],
        )

    effective_temp, effective_temp_source = effective(ml_temp, dmi_temp, "temperature")
    effective_wind_speed, effective_wind_speed_source = effective(ml_wind_speed, dmi_wind_speed, "wind_speed")
    effective_wind_gust, effective_wind_gust_source = effective(ml_wind_gust, dmi_wind_gust, "wind_gust")
    effective_rain_prob, effective_rain_prob_source = effective(ml_rain_prob, dmi_rain_prob, "rain_event")
    effective_rain_amount, effective_rain_amount_source = effective(ml_rain_amount, dmi_rain_amount, "rain_amount")

    # NaN is truthy, so ``value or 0`` would hand NaN to int() and raise.
    # Go through safe_number so missing lead times become 0 instead.
    lead_raw = row.get("lead_time_hours")
    lead_time_hours = int(lead_raw) if safe_number(lead_raw) is not None else 0

    weather_raw = row.get("dmi_weather_code_pred")
    weather_code = int(weather_raw) if safe_number(weather_raw, digits=0) is not None else None

    timestamp = to_iso(row.get("target_timestamp"))
    return {
        "timestamp": timestamp,
        "hour": timestamp,
        "leadTimeHours": lead_time_hours,
        "dmiTemp": dmi_temp,
        "mlTemp": ml_temp,
        "effectiveTemp": effective_temp,
        "effectiveTempSource": effective_temp_source,
        "apparentTemp": safe_number(row.get("dmi_apparent_temperature_pred")),
        "dmiWindSpeed": dmi_wind_speed,
        "mlWindSpeed": ml_wind_speed,
        "effectiveWindSpeed": effective_wind_speed,
        "effectiveWindSpeedSource": effective_wind_speed_source,
        "dmiWindGust": dmi_wind_gust,
        "mlWindGust": ml_wind_gust,
        "effectiveWindGust": effective_wind_gust,
        "effectiveWindGustSource": effective_wind_gust_source,
        "windDirection": safe_number(row.get("dmi_winddirection_10m_pred")),
        "dmiRainProb": dmi_rain_prob,
        "mlRainProb": ml_rain_prob,
        "effectiveRainProb": effective_rain_prob or 0.0,
        "effectiveRainProbSource": effective_rain_prob_source,
        "dmiRainAmount": dmi_rain_amount,
        "mlRainAmount": ml_rain_amount,
        "effectiveRainAmount": effective_rain_amount or 0.0,
        "effectiveRainAmountSource": effective_rain_amount_source,
        "weatherCode": weather_code,
        "cloudCover": safe_number(row.get("dmi_cloud_cover_pred")),
        "humidity": safe_number(row.get("dmi_relative_humidity_2m_pred")),
        "pressure": safe_number(row.get("dmi_pressure_msl_pred")),
    }
def build_current_payload(current_row):
    """Build the snapshot's ``current`` section from the first forecast row.

    With a forecast row, each output key is copied from the corresponding
    forecast-row key ("effective*" values become the plain ones). Without
    one, a placeholder is returned: the current time as timestamp, ``"dmi"``
    for every source, ``0.0`` for the rain figures and ``None`` elsewhere.
    """
    # (output key, forecast-row key), in output order.
    field_map = (
        ("timestamp", "timestamp"),
        ("temp", "effectiveTemp"),
        ("dmiTemp", "dmiTemp"),
        ("mlTemp", "mlTemp"),
        ("tempSource", "effectiveTempSource"),
        ("apparentTemp", "apparentTemp"),
        ("windSpeed", "effectiveWindSpeed"),
        ("dmiWindSpeed", "dmiWindSpeed"),
        ("mlWindSpeed", "mlWindSpeed"),
        ("windSpeedSource", "effectiveWindSpeedSource"),
        ("windGust", "effectiveWindGust"),
        ("dmiWindGust", "dmiWindGust"),
        ("mlWindGust", "mlWindGust"),
        ("windGustSource", "effectiveWindGustSource"),
        ("windDirection", "windDirection"),
        ("rainProb", "effectiveRainProb"),
        ("dmiRainProb", "dmiRainProb"),
        ("mlRainProb", "mlRainProb"),
        ("rainProbSource", "effectiveRainProbSource"),
        ("rainAmount", "effectiveRainAmount"),
        ("dmiRainAmount", "dmiRainAmount"),
        ("mlRainAmount", "mlRainAmount"),
        ("rainAmountSource", "effectiveRainAmountSource"),
        ("humidity", "humidity"),
        ("pressure", "pressure"),
        ("cloudCover", "cloudCover"),
        ("weatherCode", "weatherCode"),
    )

    if current_row:
        return {out_key: current_row[row_key] for out_key, row_key in field_map}

    source_keys = {"tempSource", "windSpeedSource", "windGustSource", "rainProbSource", "rainAmountSource"}
    zero_keys = {"rainProb", "dmiRainProb", "mlRainProb", "rainAmount", "dmiRainAmount", "mlRainAmount"}

    placeholder = {}
    for out_key, _ in field_map:
        if out_key == "timestamp":
            placeholder[out_key] = to_iso(now_cph())
        elif out_key in source_keys:
            placeholder[out_key] = "dmi"
        elif out_key in zero_keys:
            placeholder[out_key] = 0.0
        else:
            placeholder[out_key] = None
    return placeholder
def _peak_forecast_value(forecast_rows, value_key, source_key):
    """Return ``(peak value, source of that row)`` for ``value_key``; missing values count as 0."""
    peak_row = max(forecast_rows, key=lambda row: row.get(value_key) or 0.0)
    return (peak_row.get(value_key) or 0.0), peak_row.get(source_key)


def build_alert_rows(forecast_rows):
    """Derive lightweight alert messages from the live forecast.

    Each signal's peak is taken over the whole forecast window on its own.
    Previously wind speed and gust (and rain probability and amount) were
    read from a single row chosen by a mixed-unit maximum, so a threshold
    exceeded in any other row was missed.

    Args:
        forecast_rows: List of forecast-row dicts (``effective*`` keys).

    Returns:
        A non-empty list of alert dicts (``type``, ``severity``, ``title``,
        ``message``): a data warning when there are no rows, wind/rain
        warnings when thresholds are reached, otherwise one "calm" info row.
    """
    if not forecast_rows:
        return [
            {
                "type": "data",
                "severity": "warning",
                "title": "Ingen forecast-data",
                "message": "Collector kunne ikke bygge et forecast-snapshot.",
            }
        ]

    max_wind_speed, speed_source = _peak_forecast_value(
        forecast_rows, "effectiveWindSpeed", "effectiveWindSpeedSource"
    )
    max_wind_gust, gust_source = _peak_forecast_value(
        forecast_rows, "effectiveWindGust", "effectiveWindGustSource"
    )
    max_rain_prob, prob_source = _peak_forecast_value(
        forecast_rows, "effectiveRainProb", "effectiveRainProbSource"
    )
    max_rain_amount, amount_source = _peak_forecast_value(
        forecast_rows, "effectiveRainAmount", "effectiveRainAmountSource"
    )

    # The message names ML as soon as one of the two quoted peaks comes from ML.
    wind_source = "ML" if "ml" in (speed_source, gust_source) else "DMI"
    rain_source = "ML" if "ml" in (prob_source, amount_source) else "DMI"

    alerts = []
    if max_wind_speed >= 15 or max_wind_gust >= 20:
        alerts.append(
            {
                "type": "wind",
                "severity": "warning",
                "title": "Kraftig vind",
                "message": f"{wind_source} forventer op til {max_wind_speed:.1f} m/s og vindstød op til {max_wind_gust:.1f} m/s i forecast-vinduet.",
            }
        )
    if max_rain_prob >= 70 or max_rain_amount >= 5:
        alerts.append(
            {
                "type": "rain",
                "severity": "warning",
                "title": "Våd periode",
                "message": f"{rain_source} forventer op til {max_rain_prob:.0f}% regnrisiko og {max_rain_amount:.1f} mm/time i forecast-vinduet.",
            }
        )
    if not alerts:
        alerts.append(
            {
                "type": "data",
                "severity": "info",
                "title": "Roligt forecast",
                "message": "Snapshot viser ingen kraftige regn- eller vindsignaler lige nu.",
            }
        )
    return alerts
def build_frontend_snapshot():
    """Assemble the JSON document consumed by the Vercel frontend.

    Loads the predictions snapshot, the training matrix, the model registry
    and the model metadata. It refreshes the future ML columns, then builds
    the status, history, verification, forecast (first 48 future rows),
    current, feature-importance (top 24 overall) and alert sections.
    """
    predictions_df = load_predictions_snapshot()
    training_df, _ = load_existing_training_matrix()
    registry = load_json_from_dataset("model_registry.json", DATASET_NAME) or {}
    model_meta = load_json_from_dataset("model_meta.json", DATASET_NAME) or {}

    # The registry timestamp doubles as the cache revision for model bundles.
    registry_revision = registry.get("generated_at")
    if registry_revision:
        clear_model_bundle_cache(registry_revision)

    predictions_df = rebuild_future_ml_columns(predictions_df, registry_revision=registry_revision)
    target_status = build_target_status(registry)
    backtest_df = build_recent_backtest(training_df)
    verification = calculate_verification_metrics(predictions_df=predictions_df, backtest_df=backtest_df)
    history = build_history_payload(predictions_df=predictions_df, backtest_df=backtest_df)

    future_df = None
    if predictions_df is not None and len(predictions_df) > 0:
        upcoming = predictions_df[predictions_df["target_timestamp"] > now_cph()].copy()
        future_df = upcoming.sort_values("target_timestamp").head(48).reset_index(drop=True)

    forecast_rows = []
    if future_df is not None and len(future_df) > 0:
        forecast_rows = [
            build_forecast_row(forecast_row, target_status)
            for _, forecast_row in future_df.iterrows()
        ]

    current = build_current_payload(forecast_rows[0] if forecast_rows else None)

    feature_importance = []
    for target_name in MODEL_FILES:
        bundle = load_model_bundle(target_name, cache_revision=registry_revision)
        feature_importance.extend(extract_feature_importance_from_bundle(bundle, target_name))
    feature_importance.sort(key=lambda entry: entry["importance"], reverse=True)

    return {
        "location": {
            "name": "Aarhus",
            "timezone": "Europe/Copenhagen",
        },
        "generatedAt": to_iso(now_cph()),
        "targetLabels": build_target_labels(),
        "explanations": build_explanations(),
        "targetStatus": target_status,
        "current": current,
        "forecast": forecast_rows,
        "history": history,
        "verification": verification,
        "leadBuckets": build_lead_bucket_rows(registry),
        "featureImportance": feature_importance[:24],
        "modelInfo": {
            "trainedAt": model_meta.get("trained_at"),
            "trainingSamples": model_meta.get("n_samples"),
            "targets": model_meta.get("targets", list(MODEL_FILES.keys())),
            "registryGeneratedAt": registry.get("generated_at"),
        },
        "alerts": build_alert_rows(forecast_rows),
    }
def publish_frontend_snapshot():
    """Build the frontend snapshot, write it to disk and upload it.

    The snapshot is dumped as UTF-8 JSON to ``FRONTEND_SNAPSHOT_FILE`` and then
    uploaded under the same name to ``PREDICTIONS_DATASET``.

    Returns:
        tuple[bool, str]: ``(ok, message)``. Any exception is logged and
        reported as ``(False, str(exc))`` instead of being raised.
    """
    try:
        payload = build_frontend_snapshot()
        # The file is closed before the upload so the full content is on disk.
        with open(FRONTEND_SNAPSHOT_FILE, "w", encoding="utf-8") as out_file:
            json.dump(payload, out_file, indent=2, ensure_ascii=False)
        uploaded = upload_to_dataset(FRONTEND_SNAPSHOT_FILE, FRONTEND_SNAPSHOT_FILE, PREDICTIONS_DATASET)
    except Exception as exc:
        log_exception("publish_frontend_snapshot", exc)
        return False, str(exc)
    else:
        if not uploaded:
            return False, f"Failed to upload {FRONTEND_SNAPSHOT_FILE}"
        return True, f"Uploaded {FRONTEND_SNAPSHOT_FILE}"
| # ============================================================================= | |
| # BACKFILL OPERATIONS | |
| # ============================================================================= | |
def backfill_historical_data():
    """Rebuild the training matrix from ``HISTORICAL_BACKFILL_START`` until today.

    The period is processed one calendar month at a time: forecasts are
    fetched for the month, observations for the forecast targets (padded by two
    days on each side), and both are merged with ``build_training_matrix``.
    All monthly results are concatenated, de-duplicated, written to
    ``training_matrix.parquet`` and uploaded to ``DATASET_NAME``.

    Returns:
        str: A human-readable status message.
    """
    log_event("backfill_historical_data entered")
    init_dataset_if_needed()

    start_date = HISTORICAL_BACKFILL_START
    end_date = now_cph().date()
    print(f"๐ Fetching from {start_date} to {end_date}")

    monthly_frames = []
    window_start = start_date
    while window_start <= end_date:
        # Jumping 32 days from the 1st always lands in the following month.
        following_month = (window_start.replace(day=1) + timedelta(days=32)).replace(day=1)
        window_end = min(following_month - timedelta(days=1), end_date)
        print(f"๐ Fetching {window_start.strftime('%Y-%m')}...")

        forecasts = fetch_forecasts_for_period(window_start, window_end)
        # Advance now so the guard clauses below can simply `continue`.
        window_start = following_month

        if forecasts is None or len(forecasts) == 0:
            continue

        first_target = forecasts['target_timestamp'].min().date()
        last_target = forecasts['target_timestamp'].max().date()
        observations = fetch_observations_for_period(
            first_target - timedelta(days=2),
            last_target + timedelta(days=2)
        )
        if observations is None:
            continue

        merged = build_training_matrix(forecasts, observations)
        if merged is None or len(merged) == 0:
            continue

        monthly_frames.append(merged)
        print(f"โ {len(merged)} rows")

    if not monthly_frames:
        return "โ No data collected"

    final_df = pd.concat(monthly_frames, ignore_index=True)
    final_df = dedupe_rows(
        final_df,
        TRAINING_DEDUP_KEYS,
        sort_keys=["target_timestamp", "reference_time"],
        keep="last",
    )

    # Persist locally, then push the file to the dataset repository.
    final_df.to_parquet("training_matrix.parquet")
    uploaded = upload_to_dataset("training_matrix.parquet", "training_matrix.parquet", DATASET_NAME)
    if not uploaded:
        return "โ Failed to upload"
    return f"โ Backfilled {len(final_df)} rows to training_matrix.parquet"
| # ============================================================================= | |
| # DAILY UPDATE | |
| # ============================================================================= | |
def update_daily():
    """Refresh the training matrix with the most recent seven days of data.

    Fetches forecasts for the seven days ending today (Copenhagen date),
    fetches observations from two days before the earliest forecast target up
    to the latest target, builds the training matrix and appends the rows that
    are not already stored. The result is written to
    ``training_matrix.parquet``, uploaded to ``DATASET_NAME`` and the frontend
    snapshot is republished.

    Returns:
        str: A human-readable status message.
    """
    log_event("update_daily entered")
    init_dataset_if_needed()

    end_date = now_cph().date()
    start_date = end_date - timedelta(days=7)
    print(f"โฐ Copenhagen time: {now_cph()}")

    forecasts = fetch_forecasts_for_period(start_date, end_date)
    # An empty frame would make min()/max() return NaT below, so treat it the
    # same as a failed fetch (the backfill path applies the same length check).
    if forecasts is None or len(forecasts) == 0:
        return "โ No forecasts fetched"

    min_target = forecasts['target_timestamp'].min().date()
    max_target = forecasts['target_timestamp'].max().date()
    observations = fetch_observations_for_period(min_target - timedelta(days=2), max_target)
    if observations is None:
        return "โ No observations fetched"

    merged = build_training_matrix(forecasts, observations)
    if merged is None or len(merged) == 0:
        return "โ No matching data"

    # Merge with whatever is already stored; fall back to the fresh rows only.
    try:
        existing, existing_name = load_existing_training_matrix()
        if existing is not None and not existing.empty and "target_timestamp" in existing.columns:
            new_data = find_new_rows(merged, existing, TRAINING_DEDUP_KEYS)
            if len(new_data) == 0:
                return f"โน๏ธ No new data ({existing_name} already up to date)"
            combined = pd.concat([existing, new_data], ignore_index=True)
            combined = dedupe_rows(
                combined,
                TRAINING_DEDUP_KEYS,
                sort_keys=["target_timestamp", "reference_time"],
                keep="last",
            )
            status_msg = f"โ Added {len(new_data)} new rows"
        else:
            combined = merged
            status_msg = f"โ Created new dataset with {len(merged)} rows"
    except Exception as e:
        # NOTE(review): this fallback uploads only the last week of rows, which
        # replaces the stored matrix if loading it failed for a transient
        # reason - confirm that this is the intended best-effort behavior.
        log_exception("update_daily merge_existing", e)
        combined = merged
        status_msg = f"โ Created dataset with {len(merged)} rows"

    combined.to_parquet("training_matrix.parquet")
    if not upload_to_dataset("training_matrix.parquet", "training_matrix.parquet", DATASET_NAME):
        return "โ Failed to upload"

    snapshot_ok, snapshot_msg = publish_frontend_snapshot()
    if snapshot_ok:
        return f"{status_msg}\nSnapshot: {snapshot_msg}"
    return f"{status_msg}\nSnapshot warning: {snapshot_msg}"
| # ============================================================================= | |
| # PREDICTIONS | |
| # ============================================================================= | |
def load_model_bundle(target_name, cache_revision=None):
    """Return the model bundle for ``target_name``, using the in-memory cache.

    When a truthy ``cache_revision`` differs from the revision recorded in
    ``APP_STATE``, the cache is emptied and the new revision is stored before
    the lookup. On a cache miss the file ``<target_name>_models.pkl`` is
    downloaded from ``DATASET_NAME`` and loaded with ``joblib``.

    Args:
        target_name: Target key used to build the bundle file name.
        cache_revision: Optional registry revision that keys the cache.

    Returns:
        The loaded bundle, or ``None`` if downloading or loading failed (the
        exception is logged, not raised).
    """
    log_event("load_model_bundle", target_name=target_name, cache_revision=cache_revision)

    with APP_STATE.lock:
        revision_changed = bool(cache_revision) and APP_STATE.cache_revision != cache_revision
        if revision_changed:
            APP_STATE.model_bundle_cache = {}
            APP_STATE.cache_revision = cache_revision
        hit = APP_STATE.model_bundle_cache.get(target_name)
    if hit is not None:
        return hit

    try:
        local_path = hf_hub_download(
            repo_id=DATASET_NAME,
            filename=f"{target_name}_models.pkl",
            repo_type="dataset",
            token=HF_TOKEN
        )
        # NOTE(review): joblib.load unpickles the file, so the dataset
        # repository must be trusted.
        loaded = joblib.load(local_path)
        with APP_STATE.lock:
            APP_STATE.model_bundle_cache[target_name] = loaded
        return loaded
    except Exception as e:
        log_exception(f"load_model_bundle[{target_name}]", e)
        return None
def predict_with_bundle(bundle, df):
    """Predict one value per row of ``df`` using the per-bucket models.

    Rows are grouped by their ``lead_bucket`` value. For each bucket that has
    an entry in ``bundle['models']`` with a non-empty ``feature_columns`` list,
    the listed columns are selected (missing values filled with ``0.0``) and
    passed to the model. Models exposing ``predict_proba`` contribute the
    second probability column; all others contribute ``predict`` output.

    Args:
        bundle: Mapping with a ``'models'`` entry, or ``None``.
        df: Feature frame containing a ``lead_bucket`` column.

    Returns:
        numpy.ndarray | None: Array of length ``len(df)`` with ``NaN`` where no
        model applied, or ``None`` when the bundle is unusable.
    """
    if bundle is None or 'models' not in bundle:
        return None

    models = bundle['models']
    predictions = np.full(len(df), np.nan)

    for bucket in df['lead_bucket'].unique():
        if bucket not in models:
            continue

        row_mask = df['lead_bucket'] == bucket
        rows = df[row_mask]
        entry = models[bucket]
        model = entry['model']
        feature_cols = entry.get('feature_columns', [])
        if not feature_cols:
            # Without a feature list there is nothing to feed the model.
            continue

        missing_cols = [col for col in feature_cols if col not in rows.columns]
        if missing_cols:
            log_event("predict_with_bundle missing_features", bucket=bucket, missing_columns=missing_cols)
            continue

        X = rows[feature_cols].fillna(0.0)
        use_probability = hasattr(model, "predict_proba")
        predictions[row_mask] = model.predict_proba(X)[:, 1] if use_probability else model.predict(X)

    return predictions
| # ============================================================================= | |
| # PREDICTION GENERATION, VERIFICATION & STARTUP CATCH-UP | |
| # ============================================================================= | |
def generate_predictions():
    """Produce predictions for every target and merge them into stored history.

    Steps: fetch future forecasts, read ``model_registry.json`` to obtain the
    cache revision (failures are logged and ignored), build the live feature
    frame, seed the ``ml_*`` columns from the forecast baseline, overwrite them
    with model output where a model produced a value, merge with the stored
    prediction history, upload ``predictions_latest.parquet`` and republish
    the frontend snapshot.

    Returns:
        str: A human-readable status message.
    """
    log_event("generate_predictions entered")
    current_time = now_cph()
    log_event("generate_predictions clock", current_time=str(current_time))

    future_forecasts = fetch_future_forecasts()
    if future_forecasts is None or len(future_forecasts) == 0:
        return "Could not fetch future forecasts"

    registry_revision = None
    try:
        registry_path = hf_hub_download(
            repo_id=DATASET_NAME,
            filename="model_registry.json",
            repo_type="dataset",
            token=HF_TOKEN,
        )
        with open(registry_path, "r") as handle:
            registry = json.load(handle)
        registry_revision = registry.get("generated_at")
        if registry_revision:
            clear_model_bundle_cache(registry_revision)
    except Exception as exc:
        log_exception("generate_predictions model_registry", exc)

    feature_frame = build_live_feature_frame(future_forecasts)
    results = feature_frame.copy()
    results["prediction_made_at"] = current_time
    results["city"] = "aarhus"
    results["verified"] = False

    # Observation columns start empty; verify_predictions fills them later.
    for actual_col in (
        "actual_temp",
        "actual_wind_speed",
        "actual_wind_gust",
        "actual_precipitation",
        "actual_rain",
        "actual_rain_event",
        "actual_rain_amount",
    ):
        results[actual_col] = None

    # Baseline: the raw forecast is used until a model overrides it.
    for ml_col, forecast_col in (
        ("ml_temp", "dmi_temperature_2m_pred"),
        ("ml_wind_speed", "dmi_windspeed_10m_pred"),
        ("ml_wind_gust", "dmi_windgusts_10m_pred"),
    ):
        results[ml_col] = results[forecast_col]
    forecast_rain_pct = results["dmi_precipitation_probability_pred"].fillna(0.0).clip(0.0, 100.0)
    results["ml_rain_prob"] = forecast_rain_pct / 100.0
    results["ml_rain_amount"] = results["dmi_precipitation_pred"].fillna(0.0).clip(0.0, None)

    # (target, output column, baseline column, model output is a correction?)
    target_specs = [
        ("temperature", "ml_temp", "dmi_temperature_2m_pred", True),
        ("wind_speed", "ml_wind_speed", "dmi_windspeed_10m_pred", True),
        ("wind_gust", "ml_wind_gust", "dmi_windgusts_10m_pred", True),
        ("rain_event", "ml_rain_prob", None, False),
        ("rain_amount", "ml_rain_amount", None, False),
    ]
    for target_name, output_col, baseline_col, is_correction in target_specs:
        bundle = load_model_bundle(target_name, cache_revision=registry_revision)
        raw_pred = predict_with_bundle(bundle, feature_frame)
        if raw_pred is None:
            continue

        model_output = pd.Series(raw_pred, index=results.index, dtype="float64")
        has_output = model_output.notna()
        if not has_output.any():
            continue

        if is_correction:
            # Correction models predict an offset that is added to the forecast.
            updated = results.loc[has_output, baseline_col] + model_output[has_output]
        else:
            updated = model_output[has_output]
        results.loc[has_output, output_col] = updated

    results["ml_rain_prob"] = results["ml_rain_prob"].clip(0.0, 1.0)
    results["ml_rain_amount"] = results["ml_rain_amount"].clip(0.0, None)

    results = merge_prediction_history(load_predictions_snapshot(), results)
    results.to_parquet("predictions_latest.parquet")
    uploaded = upload_to_dataset("predictions_latest.parquet", "predictions_latest.parquet", PREDICTIONS_DATASET)
    if not uploaded:
        return "Failed to upload predictions"

    future_count = int((results["target_timestamp"] > current_time).sum())
    verified_count = int(results["verified"].fillna(False).astype(bool).sum())
    status_msg = (
        f"Generated/upserted {len(feature_frame)} future predictions. "
        f"Dataset now holds {len(results)} rows, including {future_count} future rows "
        f"and {verified_count} verified rows."
    )
    snapshot_ok, snapshot_msg = publish_frontend_snapshot()
    snapshot_label = "Snapshot" if snapshot_ok else "Snapshot warning"
    return f"{status_msg}\n{snapshot_label}: {snapshot_msg}"
def verify_predictions():
    """Attach observed values to stored predictions whose target time has passed.

    Predictions that are not yet verified and whose ``target_timestamp`` is
    more than one hour in the past are matched against observations by exact
    timestamp. Matched rows receive the observed values and are flagged as
    verified; the updated frame is written to ``predictions_latest.parquet``,
    uploaded, and the frontend snapshot is republished.

    Returns:
        str: A human-readable status message. Exceptions are logged and
        reported as ``"Verification error: ..."`` instead of being raised.
    """
    log_event("verify_predictions entered")
    try:
        pred_df = load_predictions_snapshot()
        if pred_df is None or len(pred_df) == 0:
            return "No predictions file found"

        # Same tolerance as should_verify_predictions: a missing flag column
        # means nothing has been verified yet.
        if "verified" not in pred_df.columns:
            pred_df["verified"] = False

        now = now_cph()
        # Coerce to a real boolean mask: `~` on an object/NaN column either
        # raises or performs integer inversion instead of logical negation.
        already_verified = pred_df["verified"].fillna(False).astype(bool)
        overdue = pred_df["target_timestamp"] < now - timedelta(hours=1)
        to_verify = pred_df[~already_verified & overdue]
        if len(to_verify) == 0:
            return "No predictions to verify"

        start_date = to_verify["target_timestamp"].min().date()
        end_date = to_verify["target_timestamp"].max().date()
        observations = fetch_observations_for_period(start_date, end_date)
        if observations is None or len(observations) == 0:
            return "Could not fetch observations"

        # prediction column -> observation column
        column_map = {
            "actual_temp": "actual_temp",
            "actual_wind_speed": "actual_wind_speed",
            "actual_wind_gust": "actual_wind_gust",
            "actual_precipitation": "actual_precipitation",
            "actual_rain": "actual_rain",
            "actual_rain_event": "rain_event",
            "actual_rain_amount": "rain_amount",
        }
        lookup = observations.set_index("target_timestamp")[list(column_map.values())]
        # A repeated timestamp would make `.loc` return a DataFrame and break
        # the scalar assignments below; keep the last observation per time.
        lookup = lookup[~lookup.index.duplicated(keep="last")]

        verified_count = 0
        for idx, target_timestamp in to_verify["target_timestamp"].items():
            if target_timestamp not in lookup.index:
                continue
            match = lookup.loc[target_timestamp]
            for pred_col, obs_col in column_map.items():
                pred_df.loc[idx, pred_col] = match[obs_col]
            pred_df.loc[idx, "verified"] = True
            verified_count += 1

        pred_df = normalize_prediction_df(pred_df)
        pred_df.to_parquet("predictions_latest.parquet")
        if not upload_to_dataset("predictions_latest.parquet", "predictions_latest.parquet", PREDICTIONS_DATASET):
            return "Failed to upload verified predictions"

        status_msg = f"Verified {verified_count} predictions"
        snapshot_ok, snapshot_msg = publish_frontend_snapshot()
        if snapshot_ok:
            return f"{status_msg}\nSnapshot: {snapshot_msg}"
        return f"{status_msg}\nSnapshot warning: {snapshot_msg}"
    except Exception as exc:
        log_exception("verify_predictions", exc)
        return f"Verification error: {exc}"
def load_predictions_snapshot():
    """Load the stored predictions frame from ``PREDICTIONS_DATASET``.

    Tries ``predictions_latest.parquet`` first and ``predictions.parquet``
    second, then passes the frame through ``normalize_prediction_df``.

    Returns:
        The normalized frame, or ``None`` when no file is available or the
        frame lacks a ``target_timestamp`` column.
    """
    candidate_files = ["predictions_latest.parquet", "predictions.parquet"]
    pred_path, _ = load_first_available_dataset_file(candidate_files, PREDICTIONS_DATASET)
    if not pred_path:
        return None

    raw_frame = pd.read_parquet(pred_path)
    pred_df = normalize_prediction_df(raw_frame)
    usable = pred_df is not None and 'target_timestamp' in pred_df.columns
    return pred_df if usable else None
def build_collector_snapshot_text():
    """Return a multi-line text summary of the stored training and prediction data.

    The training part and the prediction part are computed independently; if
    either raises, its error is reported as a line of text and the other part
    is still included.

    Returns:
        str: Summary lines joined with newlines.
    """
    lines = []

    # --- training matrix -------------------------------------------------
    try:
        training_df, training_name = load_existing_training_matrix()
        if training_df is not None and len(training_df) != 0:
            newest_target = training_df["target_timestamp"].max()
            lines.append(f"Training data ({training_name}): {len(training_df)} rows through {newest_target}.")
            if "lead_bucket" in training_df.columns:
                per_bucket = training_df["lead_bucket"].value_counts().sort_index().to_dict()
                bucket_text = ", ".join(f"{bucket}={count}" for bucket, count in per_bucket.items())
                lines.append(f"Lead buckets: {bucket_text}")
        else:
            lines.append("Training data: no rows available.")
    except Exception as exc:
        lines.append(f"Training data summary unavailable: {exc}")

    # --- predictions -----------------------------------------------------
    try:
        pred_df = load_predictions_snapshot()
        if pred_df is not None and len(pred_df) != 0:
            now = now_cph()
            future_count = int((pred_df["target_timestamp"] > now).sum())
            verified_count = int(pred_df["verified"].fillna(False).astype(bool).sum())
            if "prediction_made_at" in pred_df.columns:
                latest_prediction = pred_df["prediction_made_at"].max()
            else:
                latest_prediction = "unknown"
            lines.append(f"Predictions: {len(pred_df)} rows, {future_count} future, {verified_count} verified.")
            lines.append(f"Latest prediction made at: {latest_prediction}")
        else:
            lines.append("Predictions: no rows available.")
    except Exception as exc:
        lines.append(f"Prediction summary unavailable: {exc}")

    return "\n".join(lines)
def build_collector_load_outputs():
    """Return the ``(app status text, collector snapshot text)`` pair for the UI."""
    app_status_text = build_app_status_text()
    snapshot_text = build_collector_snapshot_text()
    return app_status_text, snapshot_text
def build_action_status(result):
    """Return ``result`` followed by a blank line and the collector snapshot text."""
    snapshot_text = build_collector_snapshot_text()
    return f"{result}\n\n{snapshot_text}"
def should_run_daily_catch_up():
    """Tell whether the stored training data is older than today (Copenhagen date).

    Looks for ``training_matrix.parquet`` and falls back to ``data.parquet``.

    Returns:
        bool: ``True`` when no file, no timestamp column or no valid timestamp
        is found, or when the newest timestamp's date is before today.
    """
    matrix_path = (
        load_from_dataset("training_matrix.parquet", DATASET_NAME)
        or load_from_dataset("data.parquet", DATASET_NAME)
    )
    if not matrix_path:
        return True

    frame = pd.read_parquet(matrix_path)
    # Prefer the current column name, accept the older one as a fallback.
    timestamp_col = next(
        (name for name in ("target_timestamp", "timestamp") if name in frame.columns),
        None,
    )
    if timestamp_col is None:
        return True

    newest = pd.to_datetime(frame[timestamp_col], errors="coerce").max()
    if pd.isna(newest):
        return True
    return newest.date() < now_cph().date()
def should_generate_predictions():
    """Tell whether new predictions are needed.

    Returns:
        bool: ``True`` when there are no stored predictions, none with a
        future target time, or the furthest future target is less than six
        hours ahead of now.
    """
    pred_df = load_predictions_snapshot()
    if pred_df is None or pred_df.empty:
        return True

    is_future = pred_df["target_timestamp"] > now_cph()
    upcoming = pred_df[is_future]
    if upcoming.empty:
        return True

    furthest_target = upcoming["target_timestamp"].max()
    return furthest_target < now_cph() + timedelta(hours=6)
def should_verify_predictions():
    """Tell whether any stored prediction is overdue for verification.

    A prediction is overdue when it is not flagged as verified and its
    ``target_timestamp`` lies more than one hour in the past.

    Returns:
        bool: ``True`` if at least one overdue prediction exists; ``False``
        when there are no stored predictions.
    """
    pred_df = load_predictions_snapshot()
    if pred_df is None or pred_df.empty:
        return False

    if 'verified' not in pred_df.columns:
        pred_df['verified'] = False

    # Coerce to a real boolean mask first: `~` on an object/NaN column either
    # raises or performs integer inversion instead of logical negation.
    already_verified = pred_df['verified'].fillna(False).astype(bool)
    past_due = pred_df['target_timestamp'] < now_cph() - timedelta(hours=1)
    return bool((~already_verified & past_due).any())
def run_post_start_catch_up():
    """Run overdue jobs once, 20 seconds after being started.

    Each job (daily update, prediction generation, verification) runs only if
    its ``should_*`` check says so. A second invocation returns immediately.
    Whatever happens, the app state is marked ready and the catch-up is marked
    completed at the end.
    """
    time.sleep(20)

    with APP_STATE.lock:
        if APP_STATE.catch_up_started:
            return
        APP_STATE.catch_up_started = True
        APP_STATE.active_job = True

    # (is the job due?, log name, job) - evaluated strictly in this order.
    catch_up_steps = (
        (should_run_daily_catch_up, "startup_catch_up_daily", update_daily),
        (should_generate_predictions, "startup_catch_up_generate_predictions", generate_predictions),
        (should_verify_predictions, "startup_catch_up_verify_predictions", verify_predictions),
    )
    actions = []
    try:
        for is_due, log_name, job in catch_up_steps:
            if is_due():
                actions.append(run_logged(log_name, job))
        log_event("startup_catch_up_completed", action_count=len(actions))
    except Exception as exc:
        note_app_error(exc)
        log_exception("run_post_start_catch_up", exc)
    finally:
        with APP_STATE.lock:
            APP_STATE.active_job = False
            APP_STATE.warming = False
            APP_STATE.ready = True
            APP_STATE.catch_up_completed = True
| # ============================================================================= | |
| # SCHEDULER | |
| # ============================================================================= | |
def run_scheduler():
    """Register the recurring jobs and run the scheduler loop forever.

    Prediction generation runs eight times a day, verification at minute 12 of
    every hour and the daily update at 05:45. Pending jobs are polled every 60
    seconds; an exception raised while running them is logged and the loop
    continues.
    """
    log_event("scheduler starting")

    prediction_times = ("00:35", "03:35", "06:35", "09:35", "12:35", "15:35", "18:35", "21:35")
    for run_time in prediction_times:
        # Job.do forwards the extra positional arguments to run_logged.
        schedule.every().day.at(run_time).do(run_logged, "scheduled_generate_predictions", generate_predictions)
    schedule.every().hour.at(":12").do(run_logged, "scheduled_verify_predictions", verify_predictions)
    schedule.every().day.at("05:45").do(run_logged, "scheduled_update_daily", update_daily)
    log_event("scheduler_registered")

    while True:
        try:
            schedule.run_pending()
        except Exception as exc:
            log_exception("scheduler.run_pending", exc)
        time.sleep(60)
| # ============================================================================= | |
| # GRADIO UI | |
| # ============================================================================= | |
log_event("building_gradio_ui")
with gr.Blocks(title="DMI Aarhus Collector") as demo:
    gr.Markdown("""
    # ๐ค๏ธ DMI Aarhus Weather Collector
    Collects forecast and observation data for Aarhus, generates predictions, and verifies accuracy.
    """)
    app_status = gr.Markdown(build_app_status_text())
    status = gr.Textbox(label="Status", lines=10, value="Loading collector snapshot...")

    with gr.Row():
        btn_backfill = gr.Button("๐ Backfill Historical Data", variant="primary")
        btn_daily = gr.Button("๐ Daily Update", variant="secondary")
        btn_predict = gr.Button("๐ฎ Generate Predictions", variant="primary")
        btn_verify = gr.Button("โ Verify Predictions", variant="secondary")

    def _run_ui_action(log_name, job, label):
        """Run ``job`` through ``run_logged`` and return the two UI outputs.

        On success the app is marked ready and the job result is shown with
        the collector snapshot; on failure the error is recorded and shown.
        """
        try:
            outcome = run_logged(log_name, job)
            set_app_ready()
            return build_app_status_text(), build_action_status(outcome)
        except Exception as exc:
            note_app_error(exc)
            return build_app_status_text(), f"โ {label} failed: {exc}"

    # One named handler per button; each only selects the job to run.
    def backfill_handler():
        return _run_ui_action(
            "gradio_backfill_historical_data", backfill_historical_data, "backfill_historical_data"
        )

    def daily_handler():
        return _run_ui_action("gradio_update_daily", update_daily, "update_daily")

    def predict_handler():
        return _run_ui_action("gradio_generate_predictions", generate_predictions, "generate_predictions")

    def verify_handler():
        return _run_ui_action("gradio_verify_predictions", verify_predictions, "verify_predictions")

    for _button, _handler in (
        (btn_backfill, backfill_handler),
        (btn_daily, daily_handler),
        (btn_predict, predict_handler),
        (btn_verify, verify_handler),
    ):
        _button.click(_handler, outputs=[app_status, status])

    # Fill both outputs when the page is first opened.
    demo.load(build_collector_load_outputs, outputs=[app_status, status])
    log_event("gradio_ui_ready")
log_event("ui_constructed")
if __name__ == "__main__":
    # Entry point: start the background threads, then serve the Gradio app.
    log_startup()
    try:
        set_app_ready()

        scheduler_thread = threading.Thread(
            name="collector-scheduler",
            target=run_scheduler,
            daemon=True,
        )
        scheduler_thread.start()
        catch_up_thread = threading.Thread(
            name="collector-startup-catch-up",
            target=run_post_start_catch_up,
            daemon=True,
        )
        catch_up_thread.start()

        for event_name, worker in (
            ("scheduler_thread_started", scheduler_thread),
            ("catch_up_thread_started", catch_up_thread),
        ):
            log_event(event_name, thread_name=worker.name, is_alive=worker.is_alive())

        # The same options are logged and then passed to launch().
        launch_options = {"server_name": "0.0.0.0", "server_port": 7860, "ssr_mode": False}
        log_event("gradio_launch_called", **launch_options)
        demo.launch(**launch_options)
    except Exception as exc:
        log_exception("__main__", exc)
        raise