import faulthandler
import importlib
import json
import os
import platform
import sys
import threading
import time
import traceback
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

print("[dmi-collector][bootstrap] importing third_party_modules", flush=True)

import gradio as gr
import requests
from huggingface_hub import HfApi, hf_hub_download
import schedule

print("[dmi-collector][bootstrap] third_party_modules_imported", flush=True)

# =============================================================================
# CONFIGURATION
# =============================================================================

# Hugging Face repositories and artifacts.
DATASET_NAME = "Ciroc0/dmi-aarhus-weather-data"
PREDICTIONS_DATASET = "Ciroc0/dmi-aarhus-predictions"
HF_TOKEN = os.environ.get("HF_TOKEN")
FRONTEND_SNAPSHOT_FILE = "frontend_snapshot.json"

# Location and time handling.
AARHUS_LAT = 56.1567
AARHUS_LON = 10.2108
COPENHAGEN_TZ = ZoneInfo("Europe/Copenhagen")

APP_NAME = "dmi-collector"

# Data windows.
HISTORICAL_BACKFILL_START = datetime(2025, 11, 1).date()
TRAINING_HOLDOUT_DAYS = 7
FUTURE_FORECAST_HOURS = 48

# Dump Python tracebacks on hard crashes (segfaults etc.).
faulthandler.enable()

# One pickled model bundle per prediction target: "<target>_models.pkl".
MODEL_FILES = {
    target: f"{target}_models.pkl"
    for target in ("temperature", "wind_speed", "wind_gust", "rain_event", "rain_amount")
}

# Extended feature set from PLAN.md
FORECAST_FEATURES = [
    # thermodynamics
    "temperature_2m",
    "apparent_temperature",
    "relative_humidity_2m",
    "dew_point_2m",
    "pressure_msl",
    # clouds
    "cloud_cover",
    "cloud_cover_low",
    "cloud_cover_mid",
    "cloud_cover_high",
    # precipitation
    "precipitation",
    "rain",
    "snowfall",
    "precipitation_probability",
    # wind
    "windspeed_10m",
    "winddirection_10m",
    "windgusts_10m",
    # radiation / misc
    "visibility",
    "shortwave_radiation",
    "direct_radiation",
    "weather_code",
    "cape",
]

# Hourly variables requested from the observation archive.
OBSERVATION_FEATURES = [
    "temperature_2m",
    "relative_humidity_2m",
    "pressure_msl",
    "precipitation",
    "rain",
    "windspeed_10m",
    "winddirection_10m",
    "windgusts_10m",
]

# A training row is unique per (forecast run, target hour).
TRAINING_DEDUP_KEYS = ["reference_time", "target_timestamp"]
PREDICTION_DEDUP_KEYS = ["target_timestamp"]
OBSERVATION_CONTEXT_TIMESTAMP_COL = "observation_context_timestamp"
OBSERVATION_SOURCE_COLUMNS = [
    "actual_temp",
    "actual_humidity",
    "actual_pressure",
    "actual_precipitation",
    "actual_rain",
    "actual_wind_speed",
    "actual_wind_direction",
    "actual_wind_gust",
    "actual_wind_u",
    "actual_wind_v",
    "rain_event",
    "rain_amount",
]
OBSERVATION_CONTEXT_COLUMNS = [
    "obs_temp_lag_1h",
    "obs_temp_mean_3h",
    "obs_temp_mean_6h",
    "obs_wind_lag_1h",
    "obs_wind_mean_3h",
    "obs_wind_mean_6h",
    "obs_wind_u_lag_1h",
    "obs_wind_u_mean_3h",
    "obs_wind_v_lag_1h",
    "obs_wind_v_mean_3h",
    "obs_pressure_lag_1h",
    "obs_pressure_mean_3h",
    "obs_humidity_lag_1h",
    "obs_humidity_mean_3h",
    "obs_precip_lag_1h",
    "obs_precip_sum_3h",
    "obs_precip_sum_6h",
    "obs_precip_sum_12h",
]


class LazyModule:
    """Proxy that imports a module on first attribute access.

    Used so heavy libraries (pandas, numpy, joblib) are only imported when
    something actually touches them.
    """

    def __init__(self, module_name):
        self.module_name = module_name
        self._module = None

    def _load(self):
        """Import the wrapped module once and cache it on the instance."""
        if self._module is None:
            self._module = importlib.import_module(self.module_name)
        return self._module

    def __getattr__(self, item):
        # __getattr__ only runs when normal lookup fails. On an instance created
        # without __init__ (copy/unpickle) the internal attributes are missing,
        # and forwarding them to _load() would recurse forever.
        if item in ("module_name", "_module"):
            raise AttributeError(item)
        return getattr(self._load(), item)


pd = LazyModule("pandas")
np = LazyModule("numpy")
joblib = LazyModule("joblib")


@dataclass
class AppState:
    """Mutable process-wide status shared between the UI and background work."""

    # Guards every field below; readers/writers use `with APP_STATE.lock`.
    lock: threading.Lock = field(default_factory=threading.Lock)
    # True until set_app_ready() runs.
    warming: bool = True
    ready: bool = False
    # Message of the last recorded failure, shown in the status text.
    last_error: str | None = None
    # Revision the cached model bundles belong to.
    cache_revision: str | None = None
    # True while a background job is running.
    active_job: bool = False
    model_bundle_cache: dict = field(default_factory=dict)
    # Startup catch-up progress flags (set elsewhere in the file — presumably by the catch-up job).
    catch_up_started: bool = False
    catch_up_completed: bool = False


APP_STATE = AppState()


def log_event(message, **fields):
    """Emit a structured log line that shows up clearly in HF runtime logs.

    Format: ``[APP_NAME] <UTC ISO timestamp>Z <message> key=repr(value) ...``
    with fields sorted by key.
    """
    from datetime import timezone  # file-level import only provides datetime/timedelta

    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC clock
    # but keep the exact "YYYY-MM-DDTHH:MM:SSZ" rendering.
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    details = " ".join(f"{key}={fields[key]!r}" for key in sorted(fields))
    if details:
        print(f"[{APP_NAME}] {timestamp} {message} {details}", flush=True)
    else:
        print(f"[{APP_NAME}] {timestamp} {message}", flush=True)


def log_exception(context, exc):
    """Log *exc* with the traceback attached to it.

    Uses ``exc.__traceback__`` rather than ``traceback.format_exc()`` so the
    output is correct even when called outside the ``except`` block that
    caught *exc*.
    """
    log_event(f"{context} failed", error=str(exc), error_type=type(exc).__name__)
    print("".join(traceback.format_exception(type(exc), exc, exc.__traceback__)), flush=True)


def install_global_logging():
    """Install process-level exception logging for the main thread and worker threads."""

    def handle_exception(exc_type, exc_value, exc_traceback):
        # Let Ctrl+C behave normally.
        if issubclass(exc_type, KeyboardInterrupt):
            sys.__excepthook__(exc_type, exc_value, exc_traceback)
            return
        log_event("uncaught_exception", error=str(exc_value), error_type=exc_type.__name__)
        print("".join(traceback.format_exception(exc_type, exc_value, exc_traceback)), flush=True)

    def handle_thread_exception(args):
        # sys.excepthook is not consulted for threading.Thread targets; mirror the
        # default threading hook, which silently ignores SystemExit.
        if issubclass(args.exc_type, SystemExit):
            return
        thread_name = args.thread.name if args.thread is not None else None
        log_event(
            "uncaught_thread_exception",
            error=str(args.exc_value),
            error_type=args.exc_type.__name__,
            thread=thread_name,
        )
        print(
            "".join(traceback.format_exception(args.exc_type, args.exc_value, args.exc_traceback)),
            flush=True,
        )

    sys.excepthook = handle_exception
    threading.excepthook = handle_thread_exception


def log_startup():
    """Log runtime context during process startup."""
    log_event(
        "startup",
        python=sys.version.split()[0],
        platform=platform.platform(),
        cwd=os.getcwd(),
        hf_token_present=bool(HF_TOKEN),
        dataset=DATASET_NAME,
        predictions_dataset=PREDICTIONS_DATASET,
    )


def run_logged(name, fn, *args, **kwargs):
    """Run ``fn(*args, **kwargs)`` with start/end/error logging; re-raises errors."""
    log_event(f"{name} started")
    try:
        result = fn(*args, **kwargs)
        if isinstance(result, pd.DataFrame):
            log_event(f"{name} completed", rows=len(result), columns=list(result.columns))
        else:
            log_event(f"{name} completed", result_type=type(result).__name__)
        return result
    except Exception as exc:
        log_exception(name, exc)
        raise


install_global_logging()
log_event("bootstrap_begin")


def build_app_status_text():
    """Return the one-line status message shown in the UI, derived from APP_STATE."""
    with APP_STATE.lock:
        if APP_STATE.last_error:
            return f"Status: failed. {APP_STATE.last_error}"
        if APP_STATE.warming and APP_STATE.active_job:
            return "Status: warming up. Post-start catch-up is running in the background."
        if APP_STATE.warming:
            return "Status: warming up. The UI is live; startup catch-up will run in the background."
        if APP_STATE.active_job:
            return "Status: ready. Background maintenance is currently running."
        return "Status: ready. Forecast, daily update, prediction, and verification actions run on demand or by schedule."
def note_app_error(exc):
    """Record *exc* as the app-level error shown in the status text."""
    with APP_STATE.lock:
        APP_STATE.last_error = str(exc)


def clear_model_bundle_cache(cache_revision=None):
    """Drop cached model bundles unless they already belong to *cache_revision*.

    Passing None always clears the cache.
    """
    with APP_STATE.lock:
        if cache_revision is None or APP_STATE.cache_revision != cache_revision:
            APP_STATE.model_bundle_cache = {}
            APP_STATE.cache_revision = cache_revision


def set_app_ready():
    """Mark startup as finished and clear any previously recorded error."""
    with APP_STATE.lock:
        APP_STATE.warming = False
        APP_STATE.ready = True
        APP_STATE.last_error = None


def now_cph():
    """Current time in Copenhagen timezone."""
    return datetime.now(COPENHAGEN_TZ)


def get_lead_bucket(lead_hours):
    """Map a lead time in hours to its bucket label.

    Anything <= 6 (including 0) maps to "1-6"; anything > 24 maps to "25-48".
    """
    if lead_hours <= 6:
        return "1-6"
    if lead_hours <= 12:
        return "7-12"
    if lead_hours <= 24:
        return "13-24"
    return "25-48"


def ensure_copenhagen_time(df, column_name):
    """Ensure a datetime column is timezone-aware in Europe/Copenhagen.

    Modifies *df* in place and returns it; a missing column is a no-op.
    Naive values are localized with ``ambiguous="infer"``.
    NOTE(review): pandas can only infer the repeated DST hour from ordered
    values, so unsorted/duplicated naive timestamps may raise here.
    """
    if column_name not in df.columns:
        return df
    series = pd.to_datetime(df[column_name], errors="coerce")
    if getattr(series.dt, "tz", None) is None:
        df[column_name] = series.dt.tz_localize(COPENHAGEN_TZ, ambiguous="infer", nonexistent="shift_forward")
    else:
        df[column_name] = series.dt.tz_convert(COPENHAGEN_TZ)
    return df


def dedupe_rows(df, dedup_keys, sort_keys=None, keep="last"):
    """Sort and drop duplicate rows using the keys available on the dataframe.

    Keys not present as columns are ignored; ``sort_keys`` defaults to
    ``dedup_keys``.
    """
    if df is None or len(df) == 0:
        return df
    available_sort_keys = [key for key in (sort_keys or dedup_keys) if key in df.columns]
    if available_sort_keys:
        df = df.sort_values(available_sort_keys).reset_index(drop=True)
    available_dedup_keys = [key for key in dedup_keys if key in df.columns]
    if available_dedup_keys:
        df = df.drop_duplicates(subset=available_dedup_keys, keep=keep).reset_index(drop=True)
    return df


def find_new_rows(candidate_df, existing_df, dedup_keys):
    """Return rows of *candidate_df* whose key combination is absent from *existing_df*."""
    if existing_df is None or len(existing_df) == 0:
        return candidate_df
    keys = [key for key in dedup_keys if key in candidate_df.columns and key in existing_df.columns]
    if not keys:
        return candidate_df
    existing_keys = existing_df[keys].drop_duplicates().copy()
    existing_keys["_existing"] = True
    merged = candidate_df.merge(existing_keys, on=keys, how="left")
    # Unmatched rows get NaN in the marker column.
    merged = merged[merged["_existing"] != True].drop(columns=["_existing"])
    return merged.reset_index(drop=True)


def build_model_features(df):
    """Build the causal feature set used by both training and live inference."""
    if df is None or len(df) == 0:
        return df
    features_df = df.copy()
    features_df = add_temporal_features(features_df)
    features_df = add_run_delta_features(features_df)
    return features_df


def ensure_observation_context_columns(df):
    """Keep the observation-context schema stable even when context is unavailable.

    Adds any missing context columns (NaT/NaN) to *df* in place and returns it.
    """
    if df is None:
        return df
    if OBSERVATION_CONTEXT_TIMESTAMP_COL not in df.columns:
        df[OBSERVATION_CONTEXT_TIMESTAMP_COL] = pd.NaT
    for column_name in OBSERVATION_CONTEXT_COLUMNS:
        if column_name not in df.columns:
            df[column_name] = np.nan
    return df


def get_optional_series(df, column_name):
    """Return ``df[column_name]`` or an all-NaN float series aligned to *df*."""
    if column_name in df.columns:
        return df[column_name]
    return pd.Series(np.nan, index=df.index, dtype="float64")


def datetime_series_to_epoch_ns(series):
    """Normalize mixed timestamp precisions to comparable epoch nanoseconds."""
    normalized = pd.to_datetime(series, errors="coerce", utc=True)
    naive = normalized.dt.tz_localize(None)
    return naive.astype("datetime64[ns]").astype("int64", copy=False)


def normalize_prediction_df(pred_df):
    """Normalize prediction history to the current schema.

    Returns a new dataframe (the input is not modified) with legacy
    ``timestamp`` renamed, Copenhagen-aware time columns, a boolean
    ``verified`` column and one row per target_timestamp.
    """
    if pred_df is None or len(pred_df) == 0:
        return pred_df
    # The helpers below assign columns in place; copy so callers' frames
    # (e.g. the existing history in merge_prediction_history) stay untouched.
    pred_df = pred_df.copy()
    if "timestamp" in pred_df.columns and "target_timestamp" not in pred_df.columns:
        pred_df = pred_df.rename(columns={"timestamp": "target_timestamp"})
    for column_name in ["target_timestamp", "reference_time", "prediction_made_at"]:
        pred_df = ensure_copenhagen_time(pred_df, column_name)
    if "target_timestamp" in pred_df.columns:
        pred_df = pred_df[pred_df["target_timestamp"].notna()].copy()
    if "verified" not in pred_df.columns:
        pred_df["verified"] = False
    pred_df["verified"] = pred_df["verified"].fillna(False).astype(bool)
    pred_df = dedupe_rows(
        pred_df,
        PREDICTION_DEDUP_KEYS,
        sort_keys=["target_timestamp", "_merge_priority", "prediction_made_at", "reference_time"],
        keep="last",
    )
    return pred_df


def merge_prediction_history(existing_df, new_df):
    """Upsert future rows while preserving historical prediction rows.

    For a target_timestamp present in both inputs, the *new_df* row wins.
    """
    if existing_df is None or len(existing_df) == 0:
        return normalize_prediction_df(new_df)
    if new_df is None or len(new_df) == 0:
        return normalize_prediction_df(existing_df)
    existing = normalize_prediction_df(existing_df).copy()
    incoming = normalize_prediction_df(new_df).copy()
    # Higher priority sorts later and therefore survives keep="last".
    existing["_merge_priority"] = 0
    incoming["_merge_priority"] = 1
    combined = pd.concat([existing, incoming], ignore_index=True, sort=False)
    combined = normalize_prediction_df(combined)
    sort_keys = [
        key
        for key in ["target_timestamp", "_merge_priority", "prediction_made_at", "reference_time"]
        if key in combined.columns
    ]
    if sort_keys:
        combined = combined.sort_values(sort_keys).reset_index(drop=True)
    if "target_timestamp" in combined.columns:
        combined = combined.drop_duplicates(subset=["target_timestamp"], keep="last").reset_index(drop=True)
    if "_merge_priority" in combined.columns:
        combined = combined.drop(columns=["_merge_priority"])
    return combined


# =============================================================================
# FORECAST FETCHING
# =============================================================================


def _request_open_meteo_forecast(start_date, end_date):
    """Request hourly FORECAST_FEATURES for Aarhus from the Open-Meteo forecast API.

    Tries ``models=dmi_harmonie`` first and retries once without the
    ``models`` filter if that request is rejected.

    Returns:
        The decoded JSON payload, or None if the final response is not 200.
    Raises:
        Whatever ``requests`` raises on network errors (callers catch it).
    """
    url = "https://api.open-meteo.com/v1/forecast"
    params = {
        "latitude": AARHUS_LAT,
        "longitude": AARHUS_LON,
        "start_date": start_date.strftime("%Y-%m-%d"),
        "end_date": end_date.strftime("%Y-%m-%d"),
        "models": "dmi_harmonie",
        "hourly": FORECAST_FEATURES,
        "timezone": "Europe/Copenhagen",
    }
    resp = requests.get(url, params=params, timeout=30)
    if resp.status_code != 200:
        del params["models"]
        resp = requests.get(url, params=params, timeout=30)
    if resp.status_code != 200:
        return None
    return resp.json()


def _forecast_rows_for_run(hourly, times, reference_time, extra_fields=None, after=None):
    """Turn an Open-Meteo ``hourly`` payload into row dicts for one forecast run.

    Args:
        hourly: the ``hourly`` mapping of the API response (variable -> list).
        times: localized target timestamps aligned with the hourly lists.
        reference_time: run time the lead times are measured from.
        extra_fields: optional constant columns placed right after ``lead_bucket``.
        after: if given, only target times strictly later than this are kept.

    Returns:
        List of rows for targets with 0 < lead <= FUTURE_FORECAST_HOURS.
    """
    n_times = len(times)
    feature_values = {feat: hourly.get(feat, [None] * n_times) for feat in FORECAST_FEATURES}
    rows = []
    for i, target_time in enumerate(times):
        if after is not None and not target_time > after:
            continue
        lead_hours = (target_time - reference_time).total_seconds() / 3600
        if not 0 < lead_hours <= FUTURE_FORECAST_HOURS:
            continue
        row = {
            "target_timestamp": target_time,
            "reference_time": reference_time,
            "lead_time_hours": int(lead_hours),
            "lead_bucket": get_lead_bucket(int(lead_hours)),
        }
        if extra_fields:
            row.update(extra_fields)
        for feat in FORECAST_FEATURES:
            row[f"dmi_{feat}_pred"] = feature_values[feat][i]
        # Missing speed/direction are treated as 0, so the components become 0.
        wind_speed = row.get("dmi_windspeed_10m_pred", 0) or 0
        wind_dir = row.get("dmi_winddirection_10m_pred", 0) or 0
        row["forecast_wind_u"] = -wind_speed * np.sin(np.radians(wind_dir))
        row["forecast_wind_v"] = -wind_speed * np.cos(np.radians(wind_dir))
        rows.append(row)
    return rows


def fetch_forecasts_for_period(start_date, end_date):
    """
    Fetch historical forecast runs for Aarhus from Open-Meteo.
    Returns DataFrame with all features.

    One row per (reference_time, target_timestamp) for the nominal run hours
    0, 3, ..., 21 of each day in [start_date, end_date], skipping runs later
    than now. Returns None if nothing could be fetched.
    """
    log_event("fetch_forecasts_for_period", start_date=str(start_date), end_date=str(end_date))
    all_forecasts = []
    run_hours = [0, 3, 6, 9, 12, 15, 18, 21]
    location = {"latitude": AARHUS_LAT, "longitude": AARHUS_LON}
    cph_now = now_cph()
    current_date = start_date
    while current_date <= end_date:
        reference_times = []
        for hour in run_hours:
            reference_time = datetime.combine(current_date, datetime.min.time()) + timedelta(hours=hour)
            reference_time = reference_time.replace(tzinfo=COPENHAGEN_TZ)
            if reference_time <= cph_now:
                reference_times.append(reference_time)
        if reference_times:
            # The request window depends only on the date, not on the run hour,
            # so fetch it once per day and derive every run's rows from the same
            # payload (this used to send the identical request once per run hour).
            try:
                data = _request_open_meteo_forecast(current_date, current_date + timedelta(days=2))
                if data and "hourly" in data:
                    hourly = data["hourly"]
                    times = pd.to_datetime(hourly["time"])
                    times = times.tz_localize("Europe/Copenhagen", ambiguous="infer")
                    for reference_time in reference_times:
                        all_forecasts.extend(
                            _forecast_rows_for_run(hourly, times, reference_time, extra_fields=location)
                        )
            except Exception as e:
                log_event(
                    "fetch_forecasts_for_period request_failed",
                    date=str(current_date),
                    error=str(e),
                    error_type=type(e).__name__,
                )
        current_date += timedelta(days=1)
        time.sleep(0.1)
    if not all_forecasts:
        log_event("fetch_forecasts_for_period no_data", start_date=str(start_date), end_date=str(end_date))
        return None
    df = pd.DataFrame(all_forecasts)
    df = dedupe_rows(df, TRAINING_DEDUP_KEYS, sort_keys=["target_timestamp", "reference_time"], keep="last")
    log_event("fetch_forecasts_for_period done", rows=len(df))
    return df


def fetch_future_forecasts():
    """Fetch future forecasts - 48 hours ahead.

    Uses the latest nominal run hour (0, 3, ..., 21) at or before now as
    reference_time and keeps only targets after now. Returns one row per
    target_timestamp, or None on any failure (failures are logged).
    """
    log_event("fetch_future_forecasts started")
    now = now_cph()
    today = now.date()
    run_hours = [0, 3, 6, 9, 12, 15, 18, 21]
    latest_run = max((h for h in run_hours if h <= now.hour), default=0)
    reference_time = datetime.combine(today, datetime.min.time()) + timedelta(hours=latest_run)
    reference_time = reference_time.replace(tzinfo=COPENHAGEN_TZ)
    try:
        data = _request_open_meteo_forecast(today, today + timedelta(days=3))
        if data is None or "hourly" not in data:
            return None
        hourly = data["hourly"]
        times = pd.to_datetime(hourly["time"])
        times = times.tz_localize("Europe/Copenhagen", ambiguous="infer")
        forecasts = _forecast_rows_for_run(hourly, times, reference_time, after=now)
        if not forecasts:
            log_event("fetch_future_forecasts no_data")
            return None
        df = pd.DataFrame(forecasts)
        df = df.drop_duplicates(subset=["target_timestamp"], keep="first")
        df = df.sort_values("target_timestamp").reset_index(drop=True)
        log_event("fetch_future_forecasts done", rows=len(df))
        return df
    except Exception as e:
        log_exception("fetch_future_forecasts", e)
        return None


# =============================================================================
# OBSERVATION FETCHING
# =============================================================================


def fetch_observations_for_period(start_date, end_date):
    """
    Fetch actual weather observations for Aarhus from Open-Meteo archive.

    end_date is clamped to today (Copenhagen) and rows after the current hour
    are dropped. Returns None on a non-200 response or any error.
    """
    log_event("fetch_observations_for_period", start_date=str(start_date), end_date=str(end_date))
    url = "https://archive-api.open-meteo.com/v1/archive"
    cph_today = now_cph().date()
    if end_date > cph_today:
        end_date = cph_today
    params = {
        "latitude": AARHUS_LAT,
        "longitude": AARHUS_LON,
        "start_date": start_date.strftime("%Y-%m-%d"),
        "end_date": end_date.strftime("%Y-%m-%d"),
        "hourly": OBSERVATION_FEATURES,
        "timezone": "Europe/Copenhagen",
    }
    try:
        resp = requests.get(url, params=params, timeout=60)
        if resp.status_code != 200:
            return None
        data = resp.json()
        if "hourly" not in data:
            return None
        times = pd.to_datetime(data["hourly"]["time"])
        times = times.tz_localize("Europe/Copenhagen", ambiguous="infer")
        df = pd.DataFrame({
            "target_timestamp": times,
            "actual_temp": data["hourly"].get("temperature_2m"),
            "actual_humidity": data["hourly"].get("relative_humidity_2m"),
            "actual_pressure": data["hourly"].get("pressure_msl"),
            "actual_precipitation": data["hourly"].get("precipitation"),
            "actual_rain": data["hourly"].get("rain"),
            "actual_wind_speed": data["hourly"].get("windspeed_10m"),
            "actual_wind_direction": data["hourly"].get("winddirection_10m"),
            "actual_wind_gust": data["hourly"].get("windgusts_10m"),
        })
        # Derived targets
        df["actual_wind_u"] = -df["actual_wind_speed"] * np.sin(np.radians(df["actual_wind_direction"]))
        df["actual_wind_v"] = -df["actual_wind_speed"] * np.cos(np.radians(df["actual_wind_direction"]))
        df["rain_event"] = (df["actual_precipitation"] > 0.1).astype(int)
        df["rain_amount"] = df["actual_precipitation"]
        # Filter future times
        current_hour = now_cph().replace(minute=0, second=0, microsecond=0)
        df = df[df["target_timestamp"] <= current_hour]
        log_event("fetch_observations_for_period done", rows=len(df))
        return df
    except Exception as e:
        log_exception("fetch_observations_for_period", e)
        return None


# =============================================================================
# FEATURE ENGINEERING
# =============================================================================


def add_cyclical_features(df, col, period):
    """Add ``<col>_sin`` / ``<col>_cos`` encodings of *col* with the given period."""
    df[f"{col}_sin"] = np.sin(2 * np.pi * df[col] / period)
    df[f"{col}_cos"] = np.cos(2 * np.pi * df[col] / period)
    return df


def add_temporal_features(df):
    """Add hour/month/day-of-year features derived from ``reference_time``."""
    df["hour"] = df["reference_time"].dt.hour
    df["month"] = df["reference_time"].dt.month
    df["day_of_year"] = df["reference_time"].dt.dayofyear
    df = add_cyclical_features(df, "hour", 24)
    df = add_cyclical_features(df, "month", 12)
    return df


def add_run_delta_features(df):
    """Add features showing change between consecutive forecast runs.

    For each target_timestamp, ``*_run_delta`` is the difference to the
    previous reference_time's value (0 for the first run). The returned frame
    is sorted by (reference_time, target_timestamp).
    """
    df = df.sort_values(["reference_time", "target_timestamp"])
    for feat in ["temperature_2m", "windspeed_10m", "windgusts_10m", "precipitation", "pressure_msl", "relative_humidity_2m"]:
        col = f"dmi_{feat}_pred"
        delta_col = f"dmi_{feat}_pred_run_delta"
        if col in df.columns:
            df[delta_col] = df.groupby("target_timestamp")[col].diff().fillna(0)
    return df


def build_observation_context_frame(observations_df):
    """Build causal lag features from unique observation timestamps.

    Every feature at timestamp T is computed from rows strictly before T
    (``shift(1)``). Rolling windows count rows, so they assume hourly,
    gap-free observations.
    """
    columns = [OBSERVATION_CONTEXT_TIMESTAMP_COL, *OBSERVATION_CONTEXT_COLUMNS]
    if observations_df is None or len(observations_df) == 0:
        return pd.DataFrame(columns=columns)
    observations = observations_df.copy()
    observations = ensure_copenhagen_time(observations, "target_timestamp")
    observations = dedupe_rows(observations, ["target_timestamp"], sort_keys=["target_timestamp"], keep="last")
    observations = observations.sort_values("target_timestamp").reset_index(drop=True)
    has_wind = {"actual_wind_speed", "actual_wind_direction"}.issubset(observations.columns)
    if "actual_wind_u" not in observations.columns and has_wind:
        observations["actual_wind_u"] = -observations["actual_wind_speed"] * np.sin(np.radians(observations["actual_wind_direction"]))
    if "actual_wind_v" not in observations.columns and has_wind:
        observations["actual_wind_v"] = -observations["actual_wind_speed"] * np.cos(np.radians(observations["actual_wind_direction"]))
    if "rain_event" not in observations.columns and "actual_precipitation" in observations.columns:
        observations["rain_event"] = (observations["actual_precipitation"].fillna(0.0) > 0.1).astype(int)
    if "rain_amount" not in observations.columns and "actual_precipitation" in observations.columns:
        observations["rain_amount"] = observations["actual_precipitation"]

    context = pd.DataFrame({OBSERVATION_CONTEXT_TIMESTAMP_COL: observations["target_timestamp"]})
    temp_series = get_optional_series(observations, "actual_temp").shift(1)
    wind_speed_series = get_optional_series(observations, "actual_wind_speed").shift(1)
    wind_u_series = get_optional_series(observations, "actual_wind_u").shift(1)
    wind_v_series = get_optional_series(observations, "actual_wind_v").shift(1)
    pressure_series = get_optional_series(observations, "actual_pressure").shift(1)
    humidity_series = get_optional_series(observations, "actual_humidity").shift(1)
    precipitation_series = get_optional_series(observations, "actual_precipitation").shift(1)

    context["obs_temp_lag_1h"] = temp_series
    context["obs_temp_mean_3h"] = temp_series.rolling(3, min_periods=1).mean()
    context["obs_temp_mean_6h"] = temp_series.rolling(6, min_periods=1).mean()
    context["obs_wind_lag_1h"] = wind_speed_series
    context["obs_wind_mean_3h"] = wind_speed_series.rolling(3, min_periods=1).mean()
    context["obs_wind_mean_6h"] = wind_speed_series.rolling(6, min_periods=1).mean()
    context["obs_wind_u_lag_1h"] = wind_u_series
    context["obs_wind_u_mean_3h"] = wind_u_series.rolling(3, min_periods=1).mean()
    context["obs_wind_v_lag_1h"] = wind_v_series
    context["obs_wind_v_mean_3h"] = wind_v_series.rolling(3, min_periods=1).mean()
    context["obs_pressure_lag_1h"] = pressure_series
    context["obs_pressure_mean_3h"] = pressure_series.rolling(3, min_periods=1).mean()
    context["obs_humidity_lag_1h"] = humidity_series
    context["obs_humidity_mean_3h"] = humidity_series.rolling(3, min_periods=1).mean()
    context["obs_precip_lag_1h"] = precipitation_series
    context["obs_precip_sum_3h"] = precipitation_series.rolling(3, min_periods=1).sum()
    context["obs_precip_sum_6h"] = precipitation_series.rolling(6, min_periods=1).sum()
    context["obs_precip_sum_12h"] = precipitation_series.rolling(12, min_periods=1).sum()
    context = ensure_observation_context_columns(context)
    return context.sort_values(OBSERVATION_CONTEXT_TIMESTAMP_COL).reset_index(drop=True)


def attach_observation_context(df, observation_context_df):
    """Attach only observations that existed at the forecast reference time.

    Each row gets the latest context row whose timestamp is <= its
    reference_time (backward ``merge_asof``). *observation_context_df* may be
    either a context frame or raw observations (then it is built here).
    Row order of *df* is preserved; temporary key columns are removed.
    """
    if df is None or len(df) == 0:
        return ensure_observation_context_columns(df)
    enriched = df.copy()
    drop_cols = [OBSERVATION_CONTEXT_TIMESTAMP_COL, *OBSERVATION_CONTEXT_COLUMNS]
    existing_context_cols = [column_name for column_name in drop_cols if column_name in enriched.columns]
    if existing_context_cols:
        enriched = enriched.drop(columns=existing_context_cols)
    if "reference_time" not in enriched.columns:
        return ensure_observation_context_columns(enriched)
    enriched = ensure_copenhagen_time(enriched, "reference_time")
    enriched["_row_order"] = np.arange(len(enriched))
    left = enriched.copy()
    # merge_asof needs identically-typed sorted keys; compare as epoch ns.
    left["_reference_time_key"] = datetime_series_to_epoch_ns(left["reference_time"])
    left = left.sort_values("_reference_time_key").reset_index(drop=True)
    if observation_context_df is None or len(observation_context_df) == 0:
        left = ensure_observation_context_columns(left)
        # Drop the temporary merge key too, matching the normal path below
        # (it previously leaked into the returned frame here).
        left = (
            left.sort_values("_row_order")
            .drop(columns=["_row_order", "_reference_time_key"])
            .reset_index(drop=True)
        )
        return left
    if OBSERVATION_CONTEXT_TIMESTAMP_COL not in observation_context_df.columns:
        observation_context_df = build_observation_context_frame(observation_context_df)
    else:
        observation_context_df = ensure_observation_context_columns(observation_context_df.copy())
    observation_context_df = ensure_copenhagen_time(observation_context_df, OBSERVATION_CONTEXT_TIMESTAMP_COL)
    observation_context_df = observation_context_df.dropna(subset=[OBSERVATION_CONTEXT_TIMESTAMP_COL]).copy()
    observation_context_df["_observation_context_key"] = datetime_series_to_epoch_ns(
        observation_context_df[OBSERVATION_CONTEXT_TIMESTAMP_COL]
    )
    observation_context_df = observation_context_df.sort_values("_observation_context_key").reset_index(drop=True)
    merged = pd.merge_asof(
        left,
        observation_context_df[
            [OBSERVATION_CONTEXT_TIMESTAMP_COL, "_observation_context_key", *OBSERVATION_CONTEXT_COLUMNS]
        ],
        left_on="_reference_time_key",
        right_on="_observation_context_key",
        direction="backward",
    )
    merged = ensure_observation_context_columns(merged)
    merged = (
        merged.sort_values("_row_order")
        .drop(columns=["_row_order", "_reference_time_key", "_observation_context_key"])
        .reset_index(drop=True)
    )
    return merged


def build_live_feature_frame(forecasts_df):
    """Build live inference features including causal observation context when available.

    Observations are fetched from two days before the earliest reference_time
    up to the latest one; without them the context columns stay NaN.
    """
    feature_frame = build_model_features(forecasts_df)
    if feature_frame is None or len(feature_frame) == 0:
        return ensure_observation_context_columns(feature_frame)
    if "reference_time" not in feature_frame.columns:
        return ensure_observation_context_columns(feature_frame)
    reference_series = pd.to_datetime(feature_frame["reference_time"], errors="coerce")
    min_reference_time = reference_series.min()
    max_reference_time = reference_series.max()
    if pd.isna(min_reference_time) or pd.isna(max_reference_time):
        return ensure_observation_context_columns(feature_frame)
    observation_start = (min_reference_time - timedelta(days=2)).date()
    observation_end = max_reference_time.date()
    observations = fetch_observations_for_period(observation_start, observation_end)
    if observations is None or len(observations) == 0:
        log_event("build_live_feature_frame missing_observations", start_date=str(observation_start), end_date=str(observation_end))
        return ensure_observation_context_columns(feature_frame)
    observation_context = build_observation_context_frame(observations)
    return attach_observation_context(feature_frame, observation_context)


def extract_observations_from_training_matrix(df):
    """Recover a unique observation frame from the stored training matrix."""
    if df is None or len(df) == 0 or "target_timestamp" not in df.columns:
        return None
    source_columns = ["target_timestamp", *[column_name for column_name in OBSERVATION_SOURCE_COLUMNS if column_name in df.columns]]
    observations = df[source_columns].copy()
    observations = ensure_copenhagen_time(observations, "target_timestamp")
    observations = dedupe_rows(observations, ["target_timestamp"], sort_keys=["target_timestamp"], keep="last")
    return observations.sort_values("target_timestamp").reset_index(drop=True)


def upgrade_training_matrix_with_observation_context(existing_df):
    """Rebuild observation-context features for the full stored training matrix."""
    if existing_df is None or len(existing_df) == 0:
        return ensure_observation_context_columns(existing_df)
    upgraded = existing_df.copy()
    upgraded = ensure_copenhagen_time(upgraded, "target_timestamp")
    upgraded = ensure_copenhagen_time(upgraded, "reference_time")
    observations = extract_observations_from_training_matrix(upgraded)
    observation_context = build_observation_context_frame(observations)
    upgraded = attach_observation_context(upgraded, observation_context)
    return dedupe_rows(upgraded, TRAINING_DEDUP_KEYS, sort_keys=["target_timestamp", "reference_time"], keep="last")


def build_training_matrix(forecasts_df, observations_df):
    """
    Build training matrix by merging forecasts with observations.
    Adds all derived features.

    Inner-joins on target_timestamp, adds model and observation-context
    features plus ``*_correction_target`` columns (actual - DMI forecast),
    and drops targets after the current hour. Returns None if either input is
    None or nothing matches.
    """
    if forecasts_df is None or observations_df is None:
        log_event("build_training_matrix missing_inputs")
        return None
    merged = pd.merge(forecasts_df, observations_df, on="target_timestamp", how="inner")
    if len(merged) == 0:
        log_event("build_training_matrix no_matches")
        return None
    merged = build_model_features(merged)
    observation_context = build_observation_context_frame(observations_df)
    merged = attach_observation_context(merged, observation_context)
    # Correction targets: what the model must add to the DMI forecast.
    merged["temp_correction_target"] = merged["actual_temp"] - merged["dmi_temperature_2m_pred"]
    merged["wind_speed_correction_target"] = merged["actual_wind_speed"] - merged["dmi_windspeed_10m_pred"]
    merged["wind_gust_correction_target"] = merged["actual_wind_gust"] - merged["dmi_windgusts_10m_pred"]
    # Filter out future times
    current_hour = now_cph().replace(minute=0, second=0, microsecond=0)
    merged = merged[merged["target_timestamp"] <= current_hour]
    merged = dedupe_rows(merged, TRAINING_DEDUP_KEYS, sort_keys=["target_timestamp", "reference_time"], keep="last")
    log_event("build_training_matrix done", rows=len(merged), columns=len(merged.columns))
    return merged


# =============================================================================
# DATASET OPERATIONS
# =============================================================================


def upload_to_dataset(local_path, filename, dataset_name, repo_type="dataset"):
    """Upload a local file to a Hugging Face repository.

    Returns True on success, False on any error (the error is logged).
    """
    log_event("upload_to_dataset", local_path=local_path, filename=filename, dataset_name=dataset_name)
    try:
        api = HfApi()
        api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=filename,
            repo_id=dataset_name,
            repo_type=repo_type,
            token=HF_TOKEN,
        )
        log_event("upload_to_dataset done", filename=filename, dataset_name=dataset_name)
        return True
    except Exception as e:
        log_exception("upload_to_dataset", e)
        return False


def load_from_dataset(filename, dataset_name, repo_type="dataset"):
    """Download a file from a Hugging Face repository.

    Returns the local path, or None on any error (the error is logged).
    """
    log_event("load_from_dataset", filename=filename, dataset_name=dataset_name)
    try:
        path = hf_hub_download(
            repo_id=dataset_name,
            filename=filename,
            repo_type=repo_type,
            token=HF_TOKEN,
        )
        log_event("load_from_dataset done", filename=filename, dataset_name=dataset_name, path=path)
        return path
    except Exception as e:
        log_exception("load_from_dataset", e)
        return None


def load_first_available_dataset_file(filenames, dataset_name, repo_type="dataset"):
    """Return ``(local_path, filename)`` for the first file that downloads, else ``(None, None)``."""
    for filename in filenames:
        path = load_from_dataset(filename, dataset_name, repo_type=repo_type)
        if path:
            return path, filename
    return None, None


def init_dataset_if_needed():
    """Ensure training_matrix.parquet exists so first runs can start cleanly.

    Returns ``(path, filename)`` of the existing or newly uploaded file, or
    ``(None, None)`` if the empty matrix could not be uploaded.
    """
    existing_path, existing_name = load_first_available_dataset_file(
        ["training_matrix.parquet", "data.parquet"],
        DATASET_NAME,
    )
    if existing_path:
        print(f"✅ Training dataset already available as {existing_name}")
        return existing_path, existing_name
    empty_df = pd.DataFrame(columns=TRAINING_DEDUP_KEYS)
    empty_df.to_parquet("training_matrix.parquet")
    if upload_to_dataset("training_matrix.parquet", "training_matrix.parquet", DATASET_NAME):
        print("✅ Initialized empty training_matrix.parquet")
        return "training_matrix.parquet", "training_matrix.parquet"
    print("⚠️ Could not initialize training_matrix.parquet")
    return None, None


def load_existing_training_matrix():
    """Load training matrix using the new filename first and legacy fallback second.

    Returns ``(dataframe, filename)`` or ``(None, None)``. Legacy
    ``timestamp`` is renamed to ``target_timestamp``; time columns are made
    Copenhagen-aware and rows are deduplicated on TRAINING_DEDUP_KEYS.
    """
    existing_path, existing_name = load_first_available_dataset_file(
        ["training_matrix.parquet", "data.parquet"],
        DATASET_NAME,
    )
    if not existing_path:
        return None, None
    existing = pd.read_parquet(existing_path)
    if existing_name == "data.parquet" and "timestamp" in existing.columns and "target_timestamp" not in existing.columns:
        existing = existing.rename(columns={"timestamp": "target_timestamp"})
    if existing.empty or "target_timestamp" not in existing.columns:
        return existing, existing_name
    existing = ensure_copenhagen_time(existing, "target_timestamp")
    existing = ensure_copenhagen_time(existing, "reference_time")
    existing = ensure_copenhagen_time(existing, OBSERVATION_CONTEXT_TIMESTAMP_COL)
    existing = dedupe_rows(existing, TRAINING_DEDUP_KEYS, sort_keys=["target_timestamp", "reference_time"], keep="last")
    return existing, existing_name


def safe_number(value, digits=3, default=None):
    """Convert pandas/numpy scalars to JSON-safe floats.

    Returns ``round(float(value), digits)``, or *default* for None, NA/NaN,
    values that cannot be converted, and +/-inf (``json.dumps`` would emit
    the non-standard token ``Infinity`` for those).
    """
    import math

    if value is None:
        return default
    try:
        if pd.isna(value):
            return default
    except Exception:
        # pd.isna on array-likes returns an array whose truth value is ambiguous.
        pass
    try:
        number = float(value)
        if not math.isfinite(number):
            return default
        return round(number, digits)
    except Exception:
        return default


def to_iso(value):
    """Serialize timestamps for the frontend snapshot.

    None/NA -> None; objects with ``isoformat`` -> ISO string; else ``str(value)``.
    """
    if value is None:
        return None
    try:
        if pd.isna(value):
            return None
    except Exception:
        pass
    if hasattr(value, "isoformat"):
        return value.isoformat()
    return str(value)


def load_json_from_dataset(filename, dataset_name=DATASET_NAME):
    """Download and parse a JSON file from the dataset; None if missing or invalid."""
    path = load_from_dataset(filename, dataset_name)
    if not path:
        return None
    try:
        with open(path, "r", encoding="utf-8") as handle:
            return json.load(handle)
    except Exception as exc:
        log_exception(f"load_json_from_dataset[{filename}]", exc)
        return None


def extract_feature_importance_from_bundle(bundle, target_name, top_n=8):
    """Aggregate feature importance across active bucket models.

    Averages ``feature_importances_`` per feature over the bucket models that
    expose it and returns the *top_n* entries, highest first.
    """
    if bundle is None or "models" not in bundle:
        return []
    totals = {}
    counts = {}
    for bucket, model_info in bundle.get("models", {}).items():
        model = model_info.get("model")
        feature_cols = model_info.get("feature_columns") or bundle.get("feature_columns", [])
        importances = getattr(model, "feature_importances_", None)
        if importances is None or not feature_cols:
            continue
        for feature_name, importance in zip(feature_cols, importances):
            totals[feature_name] = totals.get(feature_name, 0.0) + float(importance)
            counts[feature_name] = counts.get(feature_name, 0) + 1
    rows = []
    for feature_name, total in totals.items():
        avg_importance = total / max(counts.get(feature_name, 1), 1)
        rows.append(
            {
                "target": target_name,
                "feature": feature_name,
                "importance": round(avg_importance, 6),
            }
        )
    rows.sort(key=lambda item: item["importance"], reverse=True)
    return rows[:top_n]


def build_recent_backtest(training_df):
    """Recreate recent ML-vs-DMI backtest from the stored training matrix.

    Covers the last TRAINING_HOLDOUT_DAYS. ML columns start as the DMI values
    and are replaced wherever a model bundle produces predictions. Returns one
    row per target_timestamp (longest lead time kept), or None if no data.
    """
    if training_df is None or len(training_df) == 0 or "target_timestamp" not in training_df.columns:
        return None
    current_time = now_cph()
    history = training_df[
        (training_df["target_timestamp"] >= current_time - timedelta(days=TRAINING_HOLDOUT_DAYS))
        & (training_df["target_timestamp"] <= current_time)
    ].copy()
    if len(history) == 0:
        return None
    if "lead_time_hours" in history.columns:
        history = history[
            history["lead_time_hours"].fillna(0).between(0.0001, FUTURE_FORECAST_HOURS, inclusive="both")
        ].copy()
        if len(history) == 0:
            return None
    # DMI baselines; overwritten below where a model prediction is available.
    history["ml_temp"] = history.get("dmi_temperature_2m_pred", pd.Series(np.nan, index=history.index))
    history["ml_wind_speed"] = history.get("dmi_windspeed_10m_pred", pd.Series(np.nan, index=history.index))
    history["ml_wind_gust"] = history.get("dmi_windgusts_10m_pred", pd.Series(np.nan, index=history.index))
    history["ml_rain_prob"] = (
        history.get("dmi_precipitation_probability_pred", pd.Series(0.0, index=history.index))
        .fillna(0.0)
        .clip(0.0, 100.0)
        / 100.0
    )
    history["ml_rain_amount"] = (
        history.get("dmi_precipitation_pred", pd.Series(0.0, index=history.index))
        .fillna(0.0)
        .clip(0.0, None)
    )
    target_specs = [
        ("temperature", "ml_temp", "dmi_temperature_2m_pred", True),
        ("wind_speed", "ml_wind_speed", "dmi_windspeed_10m_pred", True),
        ("wind_gust", "ml_wind_gust", "dmi_windgusts_10m_pred", True),
        ("rain_event", "ml_rain_prob", None, False),
        ("rain_amount", "ml_rain_amount", None, False),
    ]
    for target_name, output_col, baseline_col, is_correction in target_specs:
        bundle = load_model_bundle(target_name)
        if not bundle:
            continue
        target_pred = predict_with_bundle(bundle, history)
        if target_pred is None:
            continue
        prediction_series = pd.Series(target_pred, index=history.index, dtype="float64")
        prediction_mask = prediction_series.notna()
        if not prediction_mask.any():
            continue
        if is_correction:
            # Correction models predict an offset from the DMI baseline. The
            # baseline may be absent (see the .get() fallbacks above); .loc
            # would raise KeyError, so leave the DMI fallback in place.
            if baseline_col not in history.columns:
                continue
            history.loc[prediction_mask, output_col] = (
                history.loc[prediction_mask, baseline_col] + prediction_series[prediction_mask]
            )
        elif target_name == "rain_event":
            history.loc[prediction_mask, output_col] = prediction_series[prediction_mask].clip(0.0, 1.0)
        else:
            history.loc[prediction_mask, output_col] = prediction_series[prediction_mask].clip(0.0, None)
    sort_columns = ["target_timestamp"]
    ascending = [True]
    if "lead_time_hours" in history.columns:
        sort_columns.append("lead_time_hours")
        ascending.append(False)
    if "reference_time" in history.columns:
        sort_columns.append("reference_time")
        ascending.append(False)
    history = history.sort_values(sort_columns, ascending=ascending)
    history = history.drop_duplicates(subset=["target_timestamp"], keep="first").reset_index(drop=True)
    return history


# NOTE(review): the body of this function continues beyond the reviewed chunk;
# only its visible opening is reproduced here, unchanged.
def rebuild_future_ml_columns(predictions_df, registry_revision=None):
    """Recompute live ML columns from stored forecast features before publishing the frontend snapshot."""
    if predictions_df is None or len(predictions_df) == 0 or "target_timestamp" not in predictions_df.columns:
        return
predictions_df current_time = now_cph() repaired = predictions_df.copy() future_mask = repaired["target_timestamp"] > current_time if not future_mask.any(): return repaired future_df = repaired.loc[future_mask].copy() future_df["ml_temp"] = future_df.get("ml_temp", future_df.get("dmi_temperature_2m_pred")) future_df["ml_wind_speed"] = future_df.get("ml_wind_speed", future_df.get("dmi_windspeed_10m_pred")) future_df["ml_wind_gust"] = future_df.get("ml_wind_gust", future_df.get("dmi_windgusts_10m_pred")) future_df["ml_rain_prob"] = future_df.get( "ml_rain_prob", future_df.get("dmi_precipitation_probability_pred", pd.Series(0.0, index=future_df.index)) .fillna(0.0) .clip(0.0, 100.0) / 100.0, ) future_df["ml_rain_amount"] = future_df.get( "ml_rain_amount", future_df.get("dmi_precipitation_pred", pd.Series(0.0, index=future_df.index)).fillna(0.0).clip(0.0, None), ) target_specs = [ ("temperature", "ml_temp", "dmi_temperature_2m_pred", True), ("wind_speed", "ml_wind_speed", "dmi_windspeed_10m_pred", True), ("wind_gust", "ml_wind_gust", "dmi_windgusts_10m_pred", True), ("rain_event", "ml_rain_prob", None, False), ("rain_amount", "ml_rain_amount", None, False), ] for target_name, output_col, baseline_col, is_correction in target_specs: bundle = load_model_bundle(target_name, cache_revision=registry_revision) target_pred = predict_with_bundle(bundle, future_df) if target_pred is None: continue target_series = pd.Series(target_pred, index=future_df.index, dtype="float64") target_mask = target_series.notna() if not target_mask.any(): continue if is_correction: future_df.loc[target_mask, output_col] = future_df.loc[target_mask, baseline_col] + target_series[target_mask] elif target_name == "rain_event": future_df.loc[target_mask, output_col] = target_series[target_mask].clip(0.0, 1.0) else: future_df.loc[target_mask, output_col] = target_series[target_mask].clip(0.0, None) future_df["ml_rain_prob"] = future_df["ml_rain_prob"].fillna(0.0).clip(0.0, 1.0) 
future_df["ml_rain_amount"] = future_df["ml_rain_amount"].fillna(0.0).clip(0.0, None) for column_name in future_df.columns: if column_name not in repaired.columns: repaired[column_name] = np.nan repaired.loc[future_mask, future_df.columns] = future_df return repaired def build_recent_verified_history(predictions_df): """Return verified prediction rows from the last 7 days.""" if predictions_df is None or len(predictions_df) == 0 or "target_timestamp" not in predictions_df.columns: return None current_time = now_cph() verified = predictions_df[predictions_df["verified"].fillna(False).astype(bool)].copy() if len(verified) == 0: return None verified = verified[ (verified["target_timestamp"] >= current_time - timedelta(days=TRAINING_HOLDOUT_DAYS)) & (verified["target_timestamp"] <= current_time) ].copy() if len(verified) == 0: return None if "lead_time_hours" in verified.columns: verified = verified[ verified["lead_time_hours"].fillna(0).between(0.0001, FUTURE_FORECAST_HOURS, inclusive="both") ].copy() if len(verified) == 0: return None return verified.sort_values("target_timestamp").reset_index(drop=True) def merge_recent_history_sources(backtest_df=None, predictions_df=None): """Combine 7-day backtest with newer verified predictions, preferring verified rows.""" backtest = backtest_df.copy() if backtest_df is not None and len(backtest_df) > 0 else None verified = build_recent_verified_history(predictions_df) if backtest is None and verified is None: return None if backtest is None: return verified if verified is None: return backtest.sort_values("target_timestamp").reset_index(drop=True) backtest = backtest.copy() verified = verified.copy() backtest["_history_priority"] = 0 verified["_history_priority"] = 1 merged = pd.concat([backtest, verified], ignore_index=True, sort=False) sort_columns = ["target_timestamp", "_history_priority"] ascending = [True, True] if "lead_time_hours" in merged.columns: sort_columns.append("lead_time_hours") ascending.append(False) if 
"reference_time" in merged.columns: sort_columns.append("reference_time") ascending.append(False) merged = merged.sort_values(sort_columns, ascending=ascending) merged = merged.drop_duplicates(subset=["target_timestamp"], keep="last").reset_index(drop=True) if "_history_priority" in merged.columns: merged = merged.drop(columns=["_history_priority"]) return merged def calculate_verification_metrics(predictions_df=None, backtest_df=None): """Compute frontend-facing verification summary.""" source_df = merge_recent_history_sources(backtest_df=backtest_df, predictions_df=predictions_df) period_label = "Ingen verificeret historik endnu" if source_df is not None and len(source_df) > 0: verified = build_recent_verified_history(predictions_df) if verified is not None and len(verified) > 0: period_label = "Seneste 7 dages backtest og verificerede predictioner" else: period_label = "Seneste 7 dages backtest" result = { "target": "temperature", "periodLabel": period_label, "rmseDmi": None, "rmseMl": None, "maeDmi": None, "maeMl": None, "winRate": None, "totalPredictions": 0 if source_df is None else int(len(source_df)), } if source_df is None or len(source_df) == 0: return result if {"actual_temp", "dmi_temperature_2m_pred", "ml_temp"}.issubset(source_df.columns): valid = source_df.dropna(subset=["actual_temp", "dmi_temperature_2m_pred", "ml_temp"]).copy() if len(valid) > 0: dmi_error = valid["actual_temp"] - valid["dmi_temperature_2m_pred"] ml_error = valid["actual_temp"] - valid["ml_temp"] result["rmseDmi"] = safe_number(np.sqrt(np.mean(dmi_error**2)), digits=3) result["rmseMl"] = safe_number(np.sqrt(np.mean(ml_error**2)), digits=3) result["maeDmi"] = safe_number(np.mean(np.abs(dmi_error)), digits=3) result["maeMl"] = safe_number(np.mean(np.abs(ml_error)), digits=3) win_rate = float((np.abs(ml_error) < np.abs(dmi_error)).mean() * 100) result["winRate"] = round(win_rate, 1) result["totalPredictions"] = int(len(valid)) return result def build_lead_bucket_rows(registry): 
"""Normalize registry summary rows for the frontend.""" if not registry: return [] label_map = { "1-6": "1-6 timer", "7-12": "7-12 timer", "13-24": "13-24 timer", "25-48": "25-48 timer", } rows = [] for row in registry.get("summary_rows", []): baseline = safe_number(row.get("baseline_metric"), digits=6) ml_metric = safe_number(row.get("ml_metric"), digits=6) improvement = None if baseline not in (None, 0) and ml_metric is not None: improvement = round(((baseline - ml_metric) / baseline) * 100, 2) rows.append( { "bucket": row.get("lead_bucket"), "label": label_map.get(row.get("lead_bucket"), row.get("lead_bucket")), "baselineMetric": baseline, "mlMetric": ml_metric, "improvementPct": improvement, "target": row.get("target"), } ) return rows TARGET_LABELS = { "temperature": "Temperatur", "wind_speed": "Vindhastighed", "wind_gust": "Vindstød", "rain_event": "Regnrisiko", "rain_amount": "Regnmængde", } LEAD_BUCKET_ORDER = ["1-6", "7-12", "13-24", "25-48"] def build_target_labels(): return dict(TARGET_LABELS) def build_explanations(): return { "forecast": "Du ser DMI's prognose side om side med vores ML-justering, når der er en aktiv model.", "performance": "Her kan du sammenligne, hvad DMI sagde, hvad ML sagde, og hvad vejret faktisk endte med at blive.", "sources": "DMI er grundprognosen. ML er vores lokale justering. 
Hvis en ML-model ikke er aktiv, viser vi DMI direkte.", } def build_target_status(registry): registry_targets = registry.get("targets", {}) if registry else {} target_status = {} for target_name in MODEL_FILES: active_buckets = registry_targets.get(target_name, {}).get("active_buckets") or [] ordered_buckets = [bucket for bucket in LEAD_BUCKET_ORDER if bucket in active_buckets] has_active_model = len(ordered_buckets) > 0 target_label = TARGET_LABELS.get(target_name, target_name) if has_active_model: status_label = "ML aktiv" status_description = ( f"Vi viser baade DMI og ML for {target_label.lower()}, og ML bruges som den aktive prognose, når den findes." ) else: status_label = "Vises som DMI-prognose" status_description = ( f"Vi viser DMI direkte for {target_label.lower()}, fordi der ikke er en aktiv ML-model for dette signal endnu." ) target_status[target_name] = { "hasActiveModel": has_active_model, "activeBuckets": ordered_buckets, "statusLabel": status_label, "statusDescription": status_description, } return target_status def choose_effective_value(ml_value, dmi_value, has_active_model): if has_active_model and ml_value is not None: return ml_value, "ml" if dmi_value is not None: return dmi_value, "dmi" if ml_value is not None: return ml_value, "ml" return None, "dmi" def build_history_payload(predictions_df=None, backtest_df=None): source_df = merge_recent_history_sources(backtest_df=backtest_df, predictions_df=predictions_df) history = { "temperature": [], "wind": [], "rain": [], } if source_df is None or len(source_df) == 0: return history for _, row in source_df.iterrows(): history["temperature"].append( { "timestamp": to_iso(row.get("target_timestamp")), "dmiTemp": safe_number(row.get("dmi_temperature_2m_pred")), "mlTemp": safe_number(row.get("ml_temp")), "actual": safe_number(row.get("actual_temp")), "actualTemp": safe_number(row.get("actual_temp")), "verified": True, } ) history["wind"].append( { "timestamp": to_iso(row.get("target_timestamp")), 
"dmiWindSpeed": safe_number(row.get("dmi_windspeed_10m_pred")), "mlWindSpeed": safe_number(row.get("ml_wind_speed")), "actualWindSpeed": safe_number(row.get("actual_wind_speed")), "dmiWindGust": safe_number(row.get("dmi_windgusts_10m_pred")), "mlWindGust": safe_number(row.get("ml_wind_gust")), "actualWindGust": safe_number(row.get("actual_wind_gust")), "verified": True, } ) dmi_rain_prob = safe_number(row.get("dmi_precipitation_probability_pred"), digits=2, default=0.0) or 0.0 ml_rain_prob = round((safe_number(row.get("ml_rain_prob"), digits=4, default=0.0) or 0.0) * 100, 2) actual_rain_amount = safe_number( row.get("actual_rain_amount", row.get("actual_precipitation")), digits=3, default=None, ) if actual_rain_amount is None: actual_rain_amount = safe_number(row.get("actual_precipitation"), digits=3, default=None) actual_rain_event = row.get("actual_rain_event") if actual_rain_event is None or pd.isna(actual_rain_event): if actual_rain_amount is None: actual_rain_event = None else: actual_rain_event = 1 if actual_rain_amount > 0.1 else 0 else: actual_rain_event = int(actual_rain_event) history["rain"].append( { "timestamp": to_iso(row.get("target_timestamp")), "dmiRainProb": dmi_rain_prob, "mlRainProb": ml_rain_prob, "actualRainEvent": actual_rain_event, "dmiRainAmount": safe_number(row.get("dmi_precipitation_pred"), digits=3, default=0.0) or 0.0, "mlRainAmount": safe_number(row.get("ml_rain_amount"), digits=3, default=0.0) or 0.0, "actualRainAmount": actual_rain_amount, "verified": True, } ) return history def build_forecast_row(row, target_status): dmi_temp = safe_number(row.get("dmi_temperature_2m_pred")) ml_temp = safe_number(row.get("ml_temp")) dmi_wind_speed = safe_number(row.get("dmi_windspeed_10m_pred")) ml_wind_speed = safe_number(row.get("ml_wind_speed")) dmi_wind_gust = safe_number(row.get("dmi_windgusts_10m_pred")) ml_wind_gust = safe_number(row.get("ml_wind_gust")) dmi_rain_prob = safe_number(row.get("dmi_precipitation_probability_pred"), digits=2, 
default=0.0) or 0.0 ml_rain_prob = round((safe_number(row.get("ml_rain_prob"), digits=4, default=0.0) or 0.0) * 100, 2) dmi_rain_amount = safe_number(row.get("dmi_precipitation_pred"), digits=3, default=0.0) or 0.0 ml_rain_amount = safe_number(row.get("ml_rain_amount"), digits=3, default=0.0) or 0.0 effective_temp, effective_temp_source = choose_effective_value( ml_temp, dmi_temp, target_status["temperature"]["hasActiveModel"], ) effective_wind_speed, effective_wind_speed_source = choose_effective_value( ml_wind_speed, dmi_wind_speed, target_status["wind_speed"]["hasActiveModel"], ) effective_wind_gust, effective_wind_gust_source = choose_effective_value( ml_wind_gust, dmi_wind_gust, target_status["wind_gust"]["hasActiveModel"], ) effective_rain_prob, effective_rain_prob_source = choose_effective_value( ml_rain_prob, dmi_rain_prob, target_status["rain_event"]["hasActiveModel"], ) effective_rain_amount, effective_rain_amount_source = choose_effective_value( ml_rain_amount, dmi_rain_amount, target_status["rain_amount"]["hasActiveModel"], ) timestamp = to_iso(row.get("target_timestamp")) return { "timestamp": timestamp, "hour": timestamp, "leadTimeHours": int(row.get("lead_time_hours") or 0), "dmiTemp": dmi_temp, "mlTemp": ml_temp, "effectiveTemp": effective_temp, "effectiveTempSource": effective_temp_source, "apparentTemp": safe_number(row.get("dmi_apparent_temperature_pred")), "dmiWindSpeed": dmi_wind_speed, "mlWindSpeed": ml_wind_speed, "effectiveWindSpeed": effective_wind_speed, "effectiveWindSpeedSource": effective_wind_speed_source, "dmiWindGust": dmi_wind_gust, "mlWindGust": ml_wind_gust, "effectiveWindGust": effective_wind_gust, "effectiveWindGustSource": effective_wind_gust_source, "windDirection": safe_number(row.get("dmi_winddirection_10m_pred")), "dmiRainProb": dmi_rain_prob, "mlRainProb": ml_rain_prob, "effectiveRainProb": effective_rain_prob or 0.0, "effectiveRainProbSource": effective_rain_prob_source, "dmiRainAmount": dmi_rain_amount, "mlRainAmount": 
ml_rain_amount, "effectiveRainAmount": effective_rain_amount or 0.0, "effectiveRainAmountSource": effective_rain_amount_source, "weatherCode": int(row.get("dmi_weather_code_pred")) if safe_number(row.get("dmi_weather_code_pred"), digits=0) is not None else None, "cloudCover": safe_number(row.get("dmi_cloud_cover_pred")), "humidity": safe_number(row.get("dmi_relative_humidity_2m_pred")), "pressure": safe_number(row.get("dmi_pressure_msl_pred")), } def build_current_payload(current_row): if not current_row: return { "timestamp": to_iso(now_cph()), "temp": None, "dmiTemp": None, "mlTemp": None, "tempSource": "dmi", "apparentTemp": None, "windSpeed": None, "dmiWindSpeed": None, "mlWindSpeed": None, "windSpeedSource": "dmi", "windGust": None, "dmiWindGust": None, "mlWindGust": None, "windGustSource": "dmi", "windDirection": None, "rainProb": 0.0, "dmiRainProb": 0.0, "mlRainProb": 0.0, "rainProbSource": "dmi", "rainAmount": 0.0, "dmiRainAmount": 0.0, "mlRainAmount": 0.0, "rainAmountSource": "dmi", "humidity": None, "pressure": None, "cloudCover": None, "weatherCode": None, } return { "timestamp": current_row["timestamp"], "temp": current_row["effectiveTemp"], "dmiTemp": current_row["dmiTemp"], "mlTemp": current_row["mlTemp"], "tempSource": current_row["effectiveTempSource"], "apparentTemp": current_row["apparentTemp"], "windSpeed": current_row["effectiveWindSpeed"], "dmiWindSpeed": current_row["dmiWindSpeed"], "mlWindSpeed": current_row["mlWindSpeed"], "windSpeedSource": current_row["effectiveWindSpeedSource"], "windGust": current_row["effectiveWindGust"], "dmiWindGust": current_row["dmiWindGust"], "mlWindGust": current_row["mlWindGust"], "windGustSource": current_row["effectiveWindGustSource"], "windDirection": current_row["windDirection"], "rainProb": current_row["effectiveRainProb"], "dmiRainProb": current_row["dmiRainProb"], "mlRainProb": current_row["mlRainProb"], "rainProbSource": current_row["effectiveRainProbSource"], "rainAmount": 
current_row["effectiveRainAmount"], "dmiRainAmount": current_row["dmiRainAmount"], "mlRainAmount": current_row["mlRainAmount"], "rainAmountSource": current_row["effectiveRainAmountSource"], "humidity": current_row["humidity"], "pressure": current_row["pressure"], "cloudCover": current_row["cloudCover"], "weatherCode": current_row["weatherCode"], } def build_alert_rows(forecast_rows): """Derive lightweight alert messages from the live forecast.""" alerts = [] if not forecast_rows: return [ { "type": "data", "severity": "warning", "title": "Ingen forecast-data", "message": "Collector kunne ikke bygge et forecast-snapshot.", } ] max_wind_row = max( forecast_rows, key=lambda row: max(row.get("effectiveWindSpeed") or 0.0, row.get("effectiveWindGust") or 0.0), ) max_rain_row = max( forecast_rows, key=lambda row: max(row.get("effectiveRainProb") or 0.0, row.get("effectiveRainAmount") or 0.0), ) max_wind_speed = max_wind_row.get("effectiveWindSpeed") or 0.0 max_wind_gust = max_wind_row.get("effectiveWindGust") or 0.0 max_rain_prob = max_rain_row.get("effectiveRainProb") or 0.0 max_rain_amount = max_rain_row.get("effectiveRainAmount") or 0.0 wind_source = ( "ML" if max_wind_row.get("effectiveWindSpeedSource") == "ml" or max_wind_row.get("effectiveWindGustSource") == "ml" else "DMI" ) rain_source = ( "ML" if max_rain_row.get("effectiveRainProbSource") == "ml" or max_rain_row.get("effectiveRainAmountSource") == "ml" else "DMI" ) if max_wind_speed >= 15 or max_wind_gust >= 20: alerts.append( { "type": "wind", "severity": "warning", "title": "Kraftig vind", "message": f"{wind_source} forventer op til {max_wind_speed:.1f} m/s og vindstød op til {max_wind_gust:.1f} m/s i forecast-vinduet.", } ) if max_rain_prob >= 70 or max_rain_amount >= 5: alerts.append( { "type": "rain", "severity": "warning", "title": "Våd periode", "message": f"{rain_source} forventer op til {max_rain_prob:.0f}% regnrisiko og {max_rain_amount:.1f} mm/time i forecast-vinduet.", } ) if not alerts: 
alerts.append( { "type": "data", "severity": "info", "title": "Roligt forecast", "message": "Snapshot viser ingen kraftige regn- eller vindsignaler lige nu.", } ) return alerts def build_frontend_snapshot(): """Build the JSON contract consumed by the Vercel frontend.""" predictions_df = load_predictions_snapshot() training_df, _ = load_existing_training_matrix() registry = load_json_from_dataset("model_registry.json", DATASET_NAME) or {} model_meta = load_json_from_dataset("model_meta.json", DATASET_NAME) or {} registry_revision = registry.get("generated_at") if registry_revision: clear_model_bundle_cache(registry_revision) predictions_df = rebuild_future_ml_columns(predictions_df, registry_revision=registry_revision) target_status = build_target_status(registry) backtest_df = build_recent_backtest(training_df) verification = calculate_verification_metrics(predictions_df=predictions_df, backtest_df=backtest_df) history = build_history_payload(predictions_df=predictions_df, backtest_df=backtest_df) future_df = None if predictions_df is not None and len(predictions_df) > 0: future_df = predictions_df[predictions_df["target_timestamp"] > now_cph()].copy() future_df = future_df.sort_values("target_timestamp").head(48).reset_index(drop=True) forecast_rows = [] if future_df is not None and len(future_df) > 0: for _, row in future_df.iterrows(): forecast_rows.append(build_forecast_row(row, target_status)) current_row = forecast_rows[0] if forecast_rows else None current = build_current_payload(current_row) feature_importance = [] for target_name in MODEL_FILES: feature_importance.extend( extract_feature_importance_from_bundle( load_model_bundle(target_name, cache_revision=registry_revision), target_name, ) ) feature_importance.sort(key=lambda item: item["importance"], reverse=True) snapshot = { "location": { "name": "Aarhus", "timezone": "Europe/Copenhagen", }, "generatedAt": to_iso(now_cph()), "targetLabels": build_target_labels(), "explanations": build_explanations(), 
"targetStatus": target_status, "current": current, "forecast": forecast_rows, "history": history, "verification": verification, "leadBuckets": build_lead_bucket_rows(registry), "featureImportance": feature_importance[:24], "modelInfo": { "trainedAt": model_meta.get("trained_at"), "trainingSamples": model_meta.get("n_samples"), "targets": model_meta.get("targets", list(MODEL_FILES.keys())), "registryGeneratedAt": registry.get("generated_at"), }, "alerts": build_alert_rows(forecast_rows), } return snapshot def publish_frontend_snapshot(): """Write and upload the frontend snapshot JSON to the predictions dataset.""" try: snapshot = build_frontend_snapshot() with open(FRONTEND_SNAPSHOT_FILE, "w", encoding="utf-8") as handle: json.dump(snapshot, handle, indent=2, ensure_ascii=False) success = upload_to_dataset(FRONTEND_SNAPSHOT_FILE, FRONTEND_SNAPSHOT_FILE, PREDICTIONS_DATASET) if success: return True, f"Uploaded {FRONTEND_SNAPSHOT_FILE}" return False, f"Failed to upload {FRONTEND_SNAPSHOT_FILE}" except Exception as exc: log_exception("publish_frontend_snapshot", exc) return False, str(exc) # ============================================================================= # BACKFILL OPERATIONS # ============================================================================= def backfill_historical_data(): """Backfill historical data from the agreed historical start date to now.""" log_event("backfill_historical_data entered") init_dataset_if_needed() start_date = HISTORICAL_BACKFILL_START end_date = now_cph().date() print(f"🔄 Fetching from {start_date} to {end_date}") all_data = [] current_month_start = start_date while current_month_start <= end_date: if current_month_start.month == 12: next_month = datetime(current_month_start.year + 1, 1, 1).date() else: next_month = datetime(current_month_start.year, current_month_start.month + 1, 1).date() month_end = min(next_month - timedelta(days=1), end_date) print(f"🔄 Fetching {current_month_start.strftime('%Y-%m')}...") forecasts 
= fetch_forecasts_for_period(current_month_start, month_end) if forecasts is not None and len(forecasts) > 0: min_target = forecasts['target_timestamp'].min().date() max_target = forecasts['target_timestamp'].max().date() observations = fetch_observations_for_period( min_target - timedelta(days=2), max_target + timedelta(days=2) ) if observations is not None: merged = build_training_matrix(forecasts, observations) if merged is not None and len(merged) > 0: all_data.append(merged) print(f"✅ {len(merged)} rows") current_month_start = next_month if not all_data: return "❌ No data collected" final_df = pd.concat(all_data, ignore_index=True) final_df = dedupe_rows(final_df, TRAINING_DEDUP_KEYS, sort_keys=["target_timestamp", "reference_time"], keep="last") # Save to parquet final_df.to_parquet("training_matrix.parquet") # Upload if upload_to_dataset("training_matrix.parquet", "training_matrix.parquet", DATASET_NAME): return f"✅ Backfilled {len(final_df)} rows to training_matrix.parquet" else: return "❌ Failed to upload" # ============================================================================= # DAILY UPDATE # ============================================================================= def update_daily(): """Daily update - fetch last 7 days and append to dataset.""" log_event("update_daily entered") init_dataset_if_needed() end_date = now_cph().date() start_date = end_date - timedelta(days=7) print(f"⏰ Copenhagen time: {now_cph()}") forecasts = fetch_forecasts_for_period(start_date, end_date) if forecasts is None: return "❌ No forecasts fetched" min_target = forecasts['target_timestamp'].min().date() max_target = forecasts['target_timestamp'].max().date() observations = fetch_observations_for_period(min_target - timedelta(days=2), max_target) if observations is None: return "❌ No observations fetched" merged = build_training_matrix(forecasts, observations) if merged is None or len(merged) == 0: return "❌ No matching data" # Try to load existing data try: existing, 
existing_name = load_existing_training_matrix() if existing is not None and not existing.empty and "target_timestamp" in existing.columns: new_data = find_new_rows(merged, existing, TRAINING_DEDUP_KEYS) if len(new_data) == 0: return f"ℹ️ No new data ({existing_name} already up to date)" combined = pd.concat([existing, new_data], ignore_index=True) combined = dedupe_rows(combined, TRAINING_DEDUP_KEYS, sort_keys=["target_timestamp", "reference_time"], keep="last") status_msg = f"✅ Added {len(new_data)} new rows" else: combined = merged status_msg = f"✅ Created new dataset with {len(merged)} rows" except Exception as e: print(f"Info: {e}") combined = merged status_msg = f"✅ Created dataset with {len(merged)} rows" combined.to_parquet("training_matrix.parquet") if upload_to_dataset("training_matrix.parquet", "training_matrix.parquet", DATASET_NAME): snapshot_ok, snapshot_msg = publish_frontend_snapshot() if snapshot_ok: return f"{status_msg}\nSnapshot: {snapshot_msg}" return f"{status_msg}\nSnapshot warning: {snapshot_msg}" else: return "❌ Failed to upload" # ============================================================================= # PREDICTIONS # ============================================================================= def load_model_bundle(target_name, cache_revision=None): """Load model bundle from dataset, caching by registry revision.""" log_event("load_model_bundle", target_name=target_name, cache_revision=cache_revision) with APP_STATE.lock: if cache_revision and APP_STATE.cache_revision != cache_revision: APP_STATE.model_bundle_cache = {} APP_STATE.cache_revision = cache_revision cached_bundle = APP_STATE.model_bundle_cache.get(target_name) if cached_bundle is not None: return cached_bundle try: path = hf_hub_download( repo_id=DATASET_NAME, filename=f"{target_name}_models.pkl", repo_type="dataset", token=HF_TOKEN ) bundle = joblib.load(path) with APP_STATE.lock: APP_STATE.model_bundle_cache[target_name] = bundle return bundle except Exception as e: 
log_exception(f"load_model_bundle[{target_name}]", e) return None def predict_with_bundle(bundle, df): """Make predictions using model bundle.""" if bundle is None or 'models' not in bundle: return None predictions = np.full(len(df), np.nan) for bucket in df['lead_bucket'].unique(): if bucket in bundle['models']: bucket_mask = df['lead_bucket'] == bucket bucket_df = df[bucket_mask] model_info = bundle['models'][bucket] model = model_info['model'] feature_cols = model_info.get('feature_columns', []) if feature_cols: missing_cols = [col for col in feature_cols if col not in bucket_df.columns] if missing_cols: log_event("predict_with_bundle missing_features", bucket=bucket, missing_columns=missing_cols) continue X = bucket_df[feature_cols].fillna(0.0) if hasattr(model, "predict_proba"): bucket_pred = model.predict_proba(X)[:, 1] else: bucket_pred = model.predict(X) predictions[bucket_mask] = bucket_pred return predictions # ============================================================================= # STARTUP CATCH-UP # ============================================================================= def generate_predictions(): """Generate predictions for all targets and preserve verified history.""" log_event("generate_predictions entered") current_time = now_cph() log_event("generate_predictions clock", current_time=str(current_time)) future_forecasts = fetch_future_forecasts() if future_forecasts is None or len(future_forecasts) == 0: return "Could not fetch future forecasts" registry_revision = None try: registry_path = hf_hub_download( repo_id=DATASET_NAME, filename="model_registry.json", repo_type="dataset", token=HF_TOKEN, ) with open(registry_path, "r") as handle: registry = json.load(handle) registry_revision = registry.get("generated_at") if registry_revision: clear_model_bundle_cache(registry_revision) except Exception as exc: log_exception("generate_predictions model_registry", exc) feature_frame = build_live_feature_frame(future_forecasts) results = 
feature_frame.copy() results["prediction_made_at"] = current_time results["city"] = "aarhus" results["verified"] = False results["actual_temp"] = None results["actual_wind_speed"] = None results["actual_wind_gust"] = None results["actual_precipitation"] = None results["actual_rain"] = None results["actual_rain_event"] = None results["actual_rain_amount"] = None results["ml_temp"] = results["dmi_temperature_2m_pred"] results["ml_wind_speed"] = results["dmi_windspeed_10m_pred"] results["ml_wind_gust"] = results["dmi_windgusts_10m_pred"] results["ml_rain_prob"] = results["dmi_precipitation_probability_pred"].fillna(0.0).clip(0.0, 100.0) / 100.0 results["ml_rain_amount"] = results["dmi_precipitation_pred"].fillna(0.0).clip(0.0, None) target_specs = [ ("temperature", "ml_temp", "dmi_temperature_2m_pred", True), ("wind_speed", "ml_wind_speed", "dmi_windspeed_10m_pred", True), ("wind_gust", "ml_wind_gust", "dmi_windgusts_10m_pred", True), ("rain_event", "ml_rain_prob", None, False), ("rain_amount", "ml_rain_amount", None, False), ] for target_name, output_col, baseline_col, is_correction in target_specs: bundle = load_model_bundle(target_name, cache_revision=registry_revision) target_pred = predict_with_bundle(bundle, feature_frame) if target_pred is None: continue target_series = pd.Series(target_pred, index=results.index, dtype="float64") target_mask = target_series.notna() if not target_mask.any(): continue if is_correction: results.loc[target_mask, output_col] = results.loc[target_mask, baseline_col] + target_series[target_mask] else: results.loc[target_mask, output_col] = target_series[target_mask] results["ml_rain_prob"] = results["ml_rain_prob"].clip(0.0, 1.0) results["ml_rain_amount"] = results["ml_rain_amount"].clip(0.0, None) results = merge_prediction_history(load_predictions_snapshot(), results) results.to_parquet("predictions_latest.parquet") if upload_to_dataset("predictions_latest.parquet", "predictions_latest.parquet", PREDICTIONS_DATASET): future_count = 
int((results["target_timestamp"] > current_time).sum()) verified_count = int(results["verified"].fillna(False).astype(bool).sum()) status_msg = ( f"Generated/upserted {len(feature_frame)} future predictions. " f"Dataset now holds {len(results)} rows, including {future_count} future rows " f"and {verified_count} verified rows." ) snapshot_ok, snapshot_msg = publish_frontend_snapshot() if snapshot_ok: return f"{status_msg}\nSnapshot: {snapshot_msg}" return f"{status_msg}\nSnapshot warning: {snapshot_msg}" return "Failed to upload predictions" def verify_predictions(): """Verify past predictions with actual observations.""" log_event("verify_predictions entered") try: pred_df = load_predictions_snapshot() if pred_df is None or len(pred_df) == 0: return "No predictions file found" now = now_cph() to_verify = pred_df[ (~pred_df["verified"]) & (pred_df["target_timestamp"] < now - timedelta(hours=1)) ] if len(to_verify) == 0: return "No predictions to verify" start_date = to_verify["target_timestamp"].min().date() end_date = to_verify["target_timestamp"].max().date() observations = fetch_observations_for_period(start_date, end_date) if observations is None or len(observations) == 0: return "Could not fetch observations" observation_cols = [ "actual_temp", "actual_wind_speed", "actual_wind_gust", "actual_precipitation", "actual_rain", "rain_event", "rain_amount", ] lookup = observations.set_index("target_timestamp")[observation_cols] verified_count = 0 for idx, row in to_verify.iterrows(): target_timestamp = row["target_timestamp"] if target_timestamp not in lookup.index: continue match = lookup.loc[target_timestamp] pred_df.loc[idx, "actual_temp"] = match["actual_temp"] pred_df.loc[idx, "actual_wind_speed"] = match["actual_wind_speed"] pred_df.loc[idx, "actual_wind_gust"] = match["actual_wind_gust"] pred_df.loc[idx, "actual_precipitation"] = match["actual_precipitation"] pred_df.loc[idx, "actual_rain"] = match["actual_rain"] pred_df.loc[idx, "actual_rain_event"] = 
match["rain_event"] pred_df.loc[idx, "actual_rain_amount"] = match["rain_amount"] pred_df.loc[idx, "verified"] = True verified_count += 1 pred_df = normalize_prediction_df(pred_df) pred_df.to_parquet("predictions_latest.parquet") if upload_to_dataset("predictions_latest.parquet", "predictions_latest.parquet", PREDICTIONS_DATASET): status_msg = f"Verified {verified_count} predictions" snapshot_ok, snapshot_msg = publish_frontend_snapshot() if snapshot_ok: return f"{status_msg}\nSnapshot: {snapshot_msg}" return f"{status_msg}\nSnapshot warning: {snapshot_msg}" return "Failed to upload verified predictions" except Exception as exc: log_exception("verify_predictions", exc) return f"Verification error: {exc}" def load_predictions_snapshot(): pred_path, _ = load_first_available_dataset_file( ["predictions_latest.parquet", "predictions.parquet"], PREDICTIONS_DATASET, ) if not pred_path: return None pred_df = normalize_prediction_df(pd.read_parquet(pred_path)) if pred_df is None or 'target_timestamp' not in pred_df.columns: return None return pred_df def build_collector_snapshot_text(): """Summarize stored training and prediction data for the collector landing view.""" lines = [] try: training_df, training_name = load_existing_training_matrix() if training_df is None or len(training_df) == 0: lines.append("Training data: no rows available.") else: latest_training = training_df["target_timestamp"].max() lines.append(f"Training data ({training_name}): {len(training_df)} rows through {latest_training}.") if "lead_bucket" in training_df.columns: bucket_counts = training_df["lead_bucket"].value_counts().sort_index().to_dict() bucket_text = ", ".join(f"{bucket}={count}" for bucket, count in bucket_counts.items()) lines.append(f"Lead buckets: {bucket_text}") except Exception as exc: lines.append(f"Training data summary unavailable: {exc}") try: pred_df = load_predictions_snapshot() if pred_df is None or len(pred_df) == 0: lines.append("Predictions: no rows available.") else: now 
= now_cph() future_count = int((pred_df["target_timestamp"] > now).sum()) verified_count = int(pred_df["verified"].fillna(False).astype(bool).sum()) latest_prediction = pred_df["prediction_made_at"].max() if "prediction_made_at" in pred_df.columns else "unknown" lines.append(f"Predictions: {len(pred_df)} rows, {future_count} future, {verified_count} verified.") lines.append(f"Latest prediction made at: {latest_prediction}") except Exception as exc: lines.append(f"Prediction summary unavailable: {exc}") return "\n".join(lines) def build_collector_load_outputs(): return build_app_status_text(), build_collector_snapshot_text() def build_action_status(result): return f"{result}\n\n{build_collector_snapshot_text()}" def should_run_daily_catch_up(): matrix_path = load_from_dataset("training_matrix.parquet", DATASET_NAME) if not matrix_path: matrix_path = load_from_dataset("data.parquet", DATASET_NAME) if not matrix_path: return True df = pd.read_parquet(matrix_path) timestamp_col = "target_timestamp" if "target_timestamp" in df.columns else "timestamp" if "timestamp" in df.columns else None if timestamp_col is None: return True latest_timestamp = pd.to_datetime(df[timestamp_col], errors="coerce").max() if pd.isna(latest_timestamp): return True return latest_timestamp.date() < now_cph().date() def should_generate_predictions(): pred_df = load_predictions_snapshot() if pred_df is None or pred_df.empty: return True future = pred_df[pred_df["target_timestamp"] > now_cph()] if future.empty: return True return future["target_timestamp"].max() < now_cph() + timedelta(hours=6) def should_verify_predictions(): pred_df = load_predictions_snapshot() if pred_df is None or pred_df.empty: return False if 'verified' not in pred_df.columns: pred_df['verified'] = False overdue = pred_df[ (~pred_df['verified']) & (pred_df['target_timestamp'] < now_cph() - timedelta(hours=1)) ] return len(overdue) > 0 def run_post_start_catch_up(): time.sleep(20) with APP_STATE.lock: if 
APP_STATE.catch_up_started: return APP_STATE.catch_up_started = True APP_STATE.active_job = True actions = [] try: if should_run_daily_catch_up(): actions.append(run_logged("startup_catch_up_daily", update_daily)) if should_generate_predictions(): actions.append(run_logged("startup_catch_up_generate_predictions", generate_predictions)) if should_verify_predictions(): actions.append(run_logged("startup_catch_up_verify_predictions", verify_predictions)) log_event("startup_catch_up_completed", action_count=len(actions)) except Exception as exc: note_app_error(exc) log_exception("run_post_start_catch_up", exc) finally: with APP_STATE.lock: APP_STATE.active_job = False APP_STATE.warming = False APP_STATE.ready = True APP_STATE.catch_up_completed = True # ============================================================================= # SCHEDULER # ============================================================================= def run_scheduler(): """Background scheduler for automated tasks.""" log_event("scheduler starting") for run_time in ["00:35", "03:35", "06:35", "09:35", "12:35", "15:35", "18:35", "21:35"]: schedule.every().day.at(run_time).do(lambda: run_logged("scheduled_generate_predictions", generate_predictions)) schedule.every().hour.at(":12").do(lambda: run_logged("scheduled_verify_predictions", verify_predictions)) schedule.every().day.at("05:45").do(lambda: run_logged("scheduled_update_daily", update_daily)) log_event("scheduler_registered") while True: try: schedule.run_pending() except Exception as exc: log_exception("scheduler.run_pending", exc) time.sleep(60) # ============================================================================= # GRADIO UI # ============================================================================= log_event("building_gradio_ui") with gr.Blocks(title="DMI Aarhus Collector") as demo: gr.Markdown(""" # 🌤️ DMI Aarhus Weather Collector Collects forecast and observation data for Aarhus, generates predictions, and verifies accuracy. 
""") app_status = gr.Markdown(build_app_status_text()) status = gr.Textbox(label="Status", lines=10, value="Loading collector snapshot...") with gr.Row(): btn_backfill = gr.Button("🚀 Backfill Historical Data", variant="primary") btn_daily = gr.Button("🔄 Daily Update", variant="secondary") btn_predict = gr.Button("🔮 Generate Predictions", variant="primary") btn_verify = gr.Button("✅ Verify Predictions", variant="secondary") def backfill_handler(): try: result = run_logged("gradio_backfill_historical_data", backfill_historical_data) set_app_ready() return build_app_status_text(), build_action_status(result) except Exception as exc: note_app_error(exc) return build_app_status_text(), f"❌ backfill_historical_data failed: {exc}" def daily_handler(): try: result = run_logged("gradio_update_daily", update_daily) set_app_ready() return build_app_status_text(), build_action_status(result) except Exception as exc: note_app_error(exc) return build_app_status_text(), f"❌ update_daily failed: {exc}" def predict_handler(): try: result = run_logged("gradio_generate_predictions", generate_predictions) set_app_ready() return build_app_status_text(), build_action_status(result) except Exception as exc: note_app_error(exc) return build_app_status_text(), f"❌ generate_predictions failed: {exc}" def verify_handler(): try: result = run_logged("gradio_verify_predictions", verify_predictions) set_app_ready() return build_app_status_text(), build_action_status(result) except Exception as exc: note_app_error(exc) return build_app_status_text(), f"❌ verify_predictions failed: {exc}" btn_backfill.click(backfill_handler, outputs=[app_status, status]) btn_daily.click(daily_handler, outputs=[app_status, status]) btn_predict.click(predict_handler, outputs=[app_status, status]) btn_verify.click(verify_handler, outputs=[app_status, status]) demo.load(build_collector_load_outputs, outputs=[app_status, status]) log_event("gradio_ui_ready") log_event("ui_constructed") if __name__ == "__main__": 
log_startup() try: set_app_ready() scheduler_thread = threading.Thread(target=run_scheduler, daemon=True, name="collector-scheduler") scheduler_thread.start() catch_up_thread = threading.Thread(target=run_post_start_catch_up, daemon=True, name="collector-startup-catch-up") catch_up_thread.start() log_event("scheduler_thread_started", thread_name=scheduler_thread.name, is_alive=scheduler_thread.is_alive()) log_event("catch_up_thread_started", thread_name=catch_up_thread.name, is_alive=catch_up_thread.is_alive()) log_event("gradio_launch_called", server_name="0.0.0.0", server_port=7860, ssr_mode=False) demo.launch( server_name="0.0.0.0", server_port=7860, ssr_mode=False ) except Exception as exc: log_exception("__main__", exc) raise