# dmi-collector / app.py
# Ciroc0's picture
# Update collector docs and runtime text
# ad80a9b
# Raw
# History Blame Contribute Delete
# 91.5 kB
import importlib
import json
import platform
import sys
import traceback
import faulthandler
from dataclasses import dataclass, field
from datetime import datetime, timedelta
print("[dmi-collector][bootstrap] importing third_party_modules", flush=True)
import gradio as gr
import requests
from huggingface_hub import HfApi, hf_hub_download
import schedule
import time
import threading
import os
from zoneinfo import ZoneInfo
print("[dmi-collector][bootstrap] third_party_modules_imported", flush=True)
# =============================================================================
# CONFIGURATION
# =============================================================================
# Hugging Face dataset repositories: DATASET_NAME is the default repo for the
# training matrix and JSON artifacts handled in this module.
# NOTE(review): PREDICTIONS_DATASET presumably stores prediction history; its
# usage is outside this section - confirm.
DATASET_NAME = "Ciroc0/dmi-aarhus-weather-data"
PREDICTIONS_DATASET = "Ciroc0/dmi-aarhus-predictions"
# Coordinates sent as latitude/longitude on every Open-Meteo request.
AARHUS_LAT = 56.1567
AARHUS_LON = 10.2108
# Access token passed to the Hugging Face Hub calls; None when the variable is unset.
HF_TOKEN = os.environ.get("HF_TOKEN")
FRONTEND_SNAPSHOT_FILE = "frontend_snapshot.json"
# All timestamps handled by this module are normalized to this timezone.
COPENHAGEN_TZ = ZoneInfo("Europe/Copenhagen")
# Prefix used in every structured log line.
APP_NAME = "dmi-collector"
# NOTE(review): the three settings below are not referenced in this section of
# the file; their meaning is inferred from the names only - confirm at call sites.
HISTORICAL_BACKFILL_START = datetime(2025, 11, 1).date()
TRAINING_HOLDOUT_DAYS = 7
FUTURE_FORECAST_HOURS = 48
# Dump Python tracebacks on fatal signals (e.g. segfaults) for easier debugging.
faulthandler.enable()
# Target name -> pickle filename.
# NOTE(review): presumably the model bundles loaded for inference - confirm.
MODEL_FILES = {
"temperature": "temperature_models.pkl",
"wind_speed": "wind_speed_models.pkl",
"wind_gust": "wind_gust_models.pkl",
"rain_event": "rain_event_models.pkl",
"rain_amount": "rain_amount_models.pkl",
}
# Extended feature set from PLAN.md
# Requested as the "hourly" variables of the forecast API; each one is stored
# in the forecast rows as the column dmi_<name>_pred.
FORECAST_FEATURES = [
"temperature_2m",
"apparent_temperature",
"relative_humidity_2m",
"dew_point_2m",
"pressure_msl",
"cloud_cover",
"cloud_cover_low",
"cloud_cover_mid",
"cloud_cover_high",
"precipitation",
"rain",
"snowfall",
"precipitation_probability",
"windspeed_10m",
"winddirection_10m",
"windgusts_10m",
"visibility",
"shortwave_radiation",
"direct_radiation",
"weather_code",
"cape",
]
# Requested as the "hourly" variables of the archive (observation) API.
OBSERVATION_FEATURES = [
"temperature_2m",
"relative_humidity_2m",
"pressure_msl",
"precipitation",
"rain",
"windspeed_10m",
"winddirection_10m",
"windgusts_10m",
]
# Key columns that identify a unique row when de-duplicating: training rows are
# unique per (forecast run, target hour), prediction rows per target hour.
TRAINING_DEDUP_KEYS = ["reference_time", "target_timestamp"]
PREDICTION_DEDUP_KEYS = ["target_timestamp"]
# Column holding the timestamp of the observation row matched to a forecast row.
OBSERVATION_CONTEXT_TIMESTAMP_COL = "observation_context_timestamp"
# Observation-derived columns recovered from the stored training matrix.
OBSERVATION_SOURCE_COLUMNS = [
"actual_temp",
"actual_humidity",
"actual_pressure",
"actual_precipitation",
"actual_rain",
"actual_wind_speed",
"actual_wind_direction",
"actual_wind_gust",
"actual_wind_u",
"actual_wind_v",
"rain_event",
"rain_amount",
]
# Lagged / rolling observation features; this list also defines the stable
# schema enforced by ensure_observation_context_columns.
OBSERVATION_CONTEXT_COLUMNS = [
"obs_temp_lag_1h",
"obs_temp_mean_3h",
"obs_temp_mean_6h",
"obs_wind_lag_1h",
"obs_wind_mean_3h",
"obs_wind_mean_6h",
"obs_wind_u_lag_1h",
"obs_wind_u_mean_3h",
"obs_wind_v_lag_1h",
"obs_wind_v_mean_3h",
"obs_pressure_lag_1h",
"obs_pressure_mean_3h",
"obs_humidity_lag_1h",
"obs_humidity_mean_3h",
"obs_precip_lag_1h",
"obs_precip_sum_3h",
"obs_precip_sum_6h",
"obs_precip_sum_12h",
]
class LazyModule:
    """Proxy object that imports the named module on first attribute access."""

    def __init__(self, module_name):
        self.module_name = module_name
        self._module = None

    def _load(self):
        """Return the wrapped module, importing it if it is not loaded yet."""
        loaded = self._module
        if loaded is not None:
            return loaded
        loaded = importlib.import_module(self.module_name)
        self._module = loaded
        return loaded

    def __getattr__(self, item):
        # Only reached for names that are not set on the proxy itself, so
        # every such lookup is forwarded to the real module.
        target = self._load()
        return getattr(target, item)
# The heavy scientific dependencies are wrapped in LazyModule so that they are
# imported on first attribute access instead of at module import time.
pd = LazyModule("pandas")
np = LazyModule("numpy")
joblib = LazyModule("joblib")
@dataclass
class AppState:
    """Mutable runtime state shared by the status and cache helpers of this module."""

    # Held by the helper functions while they read or write the fields below.
    lock: threading.Lock = field(default_factory=threading.Lock)
    # True until set_app_ready() flips it; drives the "warming up" status text.
    warming: bool = True
    # Set to True by set_app_ready().
    ready: bool = False
    # Last error message recorded by note_app_error(); cleared by set_app_ready().
    last_error: str | None = None
    # Revision tag the cached model bundles belong to (see clear_model_bundle_cache).
    cache_revision: str | None = None
    # Read by build_app_status_text() to report running background work.
    active_job: bool = False
    # Cache emptied by clear_model_bundle_cache() when the revision changes.
    model_bundle_cache: dict = field(default_factory=dict)
    # NOTE(review): presumably track the post-start catch-up job; they are not
    # referenced in this section of the file - confirm at call sites.
    catch_up_started: bool = False
    catch_up_completed: bool = False


# Single shared state instance used by the helpers below.
APP_STATE = AppState()
def log_event(message, **fields):
    """Emit a structured log line that shows up clearly in HF runtime logs.

    The line has the form ``[<APP_NAME>] <UTC timestamp>Z <message> k=v ...``
    where the keyword ``fields`` are rendered with ``repr`` in sorted key
    order. Output is flushed immediately.
    """
    # Local import: the module header only imports ``datetime`` and
    # ``timedelta`` from the datetime package.
    from datetime import timezone

    # ``datetime.utcnow()`` is deprecated (Python 3.12+); take an aware UTC
    # time instead and format it to the same second-resolution "Z" string.
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    details = " ".join(f"{key}={fields[key]!r}" for key in sorted(fields))
    line = f"[{APP_NAME}] {timestamp} {message}"
    if details:
        line = f"{line} {details}"
    print(line, flush=True)
def log_exception(context, exc):
    """Log an exception with a full traceback.

    Args:
        context: Short name of the operation that failed; logged as
            ``"<context> failed"``.
        exc: The exception instance to report.
    """
    log_event(f"{context} failed", error=str(exc), error_type=type(exc).__name__)
    # Format from the exception object itself rather than the ambient
    # ``sys.exc_info()``: inside an ``except`` block the output is the same,
    # and outside one it no longer degrades to "NoneType: None".
    rendered = traceback.format_exception(type(exc), exc, exc.__traceback__)
    print("".join(rendered), flush=True)
def install_global_logging():
    """Replace ``sys.excepthook`` so uncaught exceptions are logged with a traceback."""

    def handle_exception(exc_type, exc_value, exc_traceback):
        # Ctrl-C keeps the interpreter's default behaviour.
        if not issubclass(exc_type, KeyboardInterrupt):
            log_event("uncaught_exception", error=str(exc_value), error_type=exc_type.__name__)
            rendered = traceback.format_exception(exc_type, exc_value, exc_traceback)
            print("".join(rendered), flush=True)
            return
        sys.__excepthook__(exc_type, exc_value, exc_traceback)

    sys.excepthook = handle_exception
def log_startup():
    """Write one "startup" log line describing the runtime environment."""
    runtime_context = {
        "python": sys.version.split()[0],
        "platform": platform.platform(),
        "cwd": os.getcwd(),
        # Only the presence of the token is logged, never its value.
        "hf_token_present": bool(HF_TOKEN),
        "dataset": DATASET_NAME,
        "predictions_dataset": PREDICTIONS_DATASET,
    }
    log_event("startup", **runtime_context)
def run_logged(name, fn, *args, **kwargs):
    """Call ``fn(*args, **kwargs)`` with start, completion and failure logging.

    Returns whatever ``fn`` returns. Any exception is logged through
    ``log_exception`` and then re-raised unchanged.
    """
    log_event(f"{name} started")
    try:
        outcome = fn(*args, **kwargs)
        # DataFrames are summarised by shape; anything else by its type name.
        if isinstance(outcome, pd.DataFrame):
            summary = {"rows": len(outcome), "columns": list(outcome.columns)}
        else:
            summary = {"result_type": type(outcome).__name__}
        log_event(f"{name} completed", **summary)
    except Exception as exc:
        log_exception(name, exc)
        raise
    return outcome
# Install the process-level excepthook first so later uncaught failures are
# logged, then mark the beginning of bootstrap in the runtime logs.
install_global_logging()
log_event("bootstrap_begin")
def build_app_status_text():
    """Return the one-line status message derived from the shared app state."""
    # Take a consistent snapshot under the lock, then decide without holding it.
    with APP_STATE.lock:
        failure = APP_STATE.last_error
        is_warming = APP_STATE.warming
        job_running = APP_STATE.active_job

    if failure:
        return f"Status: failed. {failure}"
    if is_warming:
        if job_running:
            return "Status: warming up. Post-start catch-up is running in the background."
        return "Status: warming up. The UI is live; startup catch-up will run in the background."
    if job_running:
        return "Status: ready. Background maintenance is currently running."
    return "Status: ready. Forecast, daily update, prediction, and verification actions run on demand or by schedule."
def note_app_error(exc):
    """Record the text of ``exc`` as the application's last error."""
    message = str(exc)
    with APP_STATE.lock:
        APP_STATE.last_error = message
def clear_model_bundle_cache(cache_revision=None):
    """Drop cached model bundles unless they already belong to ``cache_revision``.

    Passing ``None`` always empties the cache. The stored revision is updated
    to ``cache_revision`` in every case.
    """
    with APP_STATE.lock:
        same_revision = (
            cache_revision is not None
            and APP_STATE.cache_revision == cache_revision
        )
        if not same_revision:
            APP_STATE.model_bundle_cache = {}
        APP_STATE.cache_revision = cache_revision
def set_app_ready():
    """Mark the app as ready: warm-up finished and any previous error cleared."""
    ready_values = (("warming", False), ("ready", True), ("last_error", None))
    with APP_STATE.lock:
        for attribute, value in ready_values:
            setattr(APP_STATE, attribute, value)
def now_cph():
    """Return the current timezone-aware time in Europe/Copenhagen."""
    return datetime.now(tz=COPENHAGEN_TZ)
def get_lead_bucket(lead_hours):
    """Return the lead-time bucket label for a lead time given in hours.

    Upper bounds are inclusive; anything above 24 hours falls in "25-48".
    """
    bucket_limits = ((6, "1-6"), (12, "7-12"), (24, "13-24"))
    for upper_bound, label in bucket_limits:
        if lead_hours <= upper_bound:
            return label
    return "25-48"
def ensure_copenhagen_time(df, column_name):
    """Make ``df[column_name]`` timezone-aware in Europe/Copenhagen.

    Naive values are localized, aware values are converted. The frame is
    modified in place and returned; a missing column leaves it untouched.
    """
    if column_name not in df.columns:
        return df

    parsed = pd.to_datetime(df[column_name], errors="coerce")
    is_naive = getattr(parsed.dt, "tz", None) is None
    if is_naive:
        converted = parsed.dt.tz_localize(COPENHAGEN_TZ, ambiguous="infer", nonexistent="shift_forward")
    else:
        converted = parsed.dt.tz_convert(COPENHAGEN_TZ)
    df[column_name] = converted
    return df
def dedupe_rows(df, dedup_keys, sort_keys=None, keep="last"):
    """Sort ``df`` and drop duplicate rows, ignoring keys the frame lacks.

    Args:
        df: Frame to clean; ``None`` or an empty frame is returned as-is.
        dedup_keys: Columns that identify a duplicate.
        sort_keys: Columns to sort by first; falls back to ``dedup_keys``.
        keep: Which duplicate survives, passed to ``drop_duplicates``.
    """
    if df is None or len(df) == 0:
        return df

    ordering = sort_keys or dedup_keys
    usable_sort = [name for name in ordering if name in df.columns]
    if usable_sort:
        df = df.sort_values(usable_sort).reset_index(drop=True)

    usable_dedup = [name for name in dedup_keys if name in df.columns]
    if usable_dedup:
        df = df.drop_duplicates(subset=usable_dedup, keep=keep).reset_index(drop=True)
    return df
def find_new_rows(candidate_df, existing_df, dedup_keys):
    """Return the rows of ``candidate_df`` whose key is absent from ``existing_df``.

    Only keys present in both frames are compared. When there is nothing to
    compare against, ``candidate_df`` itself is returned.
    """
    if existing_df is None or len(existing_df) == 0:
        return candidate_df

    shared_keys = [
        name
        for name in dedup_keys
        if name in candidate_df.columns and name in existing_df.columns
    ]
    if not shared_keys:
        return candidate_df

    # Left-join a marker column: rows without the marker are new.
    known = existing_df[shared_keys].drop_duplicates().copy()
    known["_existing"] = True
    joined = candidate_df.merge(known, on=shared_keys, how="left")
    is_new = joined["_existing"] != True
    fresh = joined[is_new].drop(columns=["_existing"])
    return fresh.reset_index(drop=True)
def build_model_features(df):
    """Derive the feature set shared by training and live inference.

    Works on a copy of ``df``; ``None`` or an empty frame is returned as-is.
    """
    if df is None or len(df) == 0:
        return df
    with_temporal = add_temporal_features(df.copy())
    return add_run_delta_features(with_temporal)
def ensure_observation_context_columns(df):
    """Add any missing observation-context columns so the schema stays stable.

    The timestamp column is filled with ``NaT`` and feature columns with
    ``NaN``. The frame is modified in place and returned; ``None`` passes
    through.
    """
    if df is None:
        return None

    if OBSERVATION_CONTEXT_TIMESTAMP_COL not in df.columns:
        df[OBSERVATION_CONTEXT_TIMESTAMP_COL] = pd.NaT

    absent = [name for name in OBSERVATION_CONTEXT_COLUMNS if name not in df.columns]
    for name in absent:
        df[name] = np.nan
    return df
def get_optional_series(df, column_name):
    """Return ``df[column_name]``, or an all-NaN float series on the same index if absent."""
    if column_name not in df.columns:
        return pd.Series(np.nan, index=df.index, dtype="float64")
    return df[column_name]
def datetime_series_to_epoch_ns(series):
    """Convert a timestamp series to integer nanoseconds since the Unix epoch (UTC).

    Values are parsed as UTC first, so series with different timezones or
    timestamp precisions become directly comparable.
    """
    as_utc = pd.to_datetime(series, errors="coerce", utc=True)
    # Drop the tz marker (the values are already UTC) before the casts.
    without_tz = as_utc.dt.tz_localize(None)
    as_nanoseconds = without_tz.astype("datetime64[ns]")
    return as_nanoseconds.astype("int64", copy=False)
def normalize_prediction_df(pred_df):
    """Bring a prediction-history frame to the current schema.

    Renames a legacy ``timestamp`` column, normalizes time columns to
    Copenhagen time, drops rows without a target timestamp, guarantees a
    boolean ``verified`` column and keeps one row per target timestamp.
    ``None`` or an empty frame is returned as-is.
    """
    if pred_df is None or len(pred_df) == 0:
        return pred_df

    has_legacy_name = "timestamp" in pred_df.columns
    if has_legacy_name and "target_timestamp" not in pred_df.columns:
        pred_df = pred_df.rename(columns={"timestamp": "target_timestamp"})

    for time_column in ("target_timestamp", "reference_time", "prediction_made_at"):
        pred_df = ensure_copenhagen_time(pred_df, time_column)

    if "target_timestamp" in pred_df.columns:
        has_target = pred_df["target_timestamp"].notna()
        pred_df = pred_df[has_target].copy()

    if "verified" not in pred_df.columns:
        pred_df["verified"] = False
    pred_df["verified"] = pred_df["verified"].fillna(False).astype(bool)

    # With keep="last", the row that sorts last on these keys wins per target hour.
    return dedupe_rows(
        pred_df,
        PREDICTION_DEDUP_KEYS,
        sort_keys=["target_timestamp", "_merge_priority", "prediction_made_at", "reference_time"],
        keep="last",
    )
def merge_prediction_history(existing_df, new_df):
    """Merge new predictions into the stored history, one row per target hour.

    For a target timestamp present in both frames the incoming row wins;
    rows that exist only in the history are kept.
    """
    if existing_df is None or len(existing_df) == 0:
        return normalize_prediction_df(new_df)
    if new_df is None or len(new_df) == 0:
        return normalize_prediction_df(existing_df)

    # Tag each side with a priority so incoming rows sort after stored ones.
    tagged_frames = []
    for priority, frame in enumerate((existing_df, new_df)):
        normalized = normalize_prediction_df(frame).copy()
        normalized["_merge_priority"] = priority
        tagged_frames.append(normalized)

    combined = pd.concat(tagged_frames, ignore_index=True, sort=False)
    combined = normalize_prediction_df(combined)

    candidate_order = ("target_timestamp", "_merge_priority", "prediction_made_at", "reference_time")
    ordering = [name for name in candidate_order if name in combined.columns]
    if ordering:
        combined = combined.sort_values(ordering).reset_index(drop=True)

    if "target_timestamp" in combined.columns:
        combined = combined.drop_duplicates(subset=["target_timestamp"], keep="last").reset_index(drop=True)

    if "_merge_priority" not in combined.columns:
        return combined
    return combined.drop(columns=["_merge_priority"])
# =============================================================================
# FORECAST FETCHING
# =============================================================================
def fetch_forecasts_for_period(start_date, end_date):
    """
    Fetch forecast rows for Aarhus from Open-Meteo for every day in a period.

    For each day the hourly forecast series (the day plus the two following
    days) is requested once. It is then expanded into one row per
    (run hour, target hour) pair for the run hours 00, 03, ..., 21 that are
    not in the future, keeping lead times in (0, 48] hours.

    Note that the request itself does not select a specific model run: every
    run hour of a day is built from the same hourly response, only
    ``reference_time`` and the derived lead time differ. The request is
    therefore issued once per day instead of once per run hour.

    Args:
        start_date: First day (``datetime.date``) to fetch, inclusive.
        end_date: Last day (``datetime.date``) to fetch, inclusive.

    Returns:
        A de-duplicated DataFrame with ``target_timestamp``,
        ``reference_time``, lead-time columns, ``dmi_<feature>_pred`` columns
        and forecast wind components, or ``None`` when nothing was fetched.
    """
    log_event("fetch_forecasts_for_period", start_date=str(start_date), end_date=str(end_date))
    all_forecasts = []
    run_hours = [0, 3, 6, 9, 12, 15, 18, 21]
    url = "https://api.open-meteo.com/v1/forecast"
    cph_now = now_cph()
    current_date = start_date
    while current_date <= end_date:
        midnight = datetime.combine(current_date, datetime.min.time())
        # Forecast runs of this day that have already happened.
        reference_times = [
            reference_time
            for reference_time in (
                (midnight + timedelta(hours=hour)).replace(tzinfo=COPENHAGEN_TZ)
                for hour in run_hours
            )
            if reference_time <= cph_now
        ]
        if reference_times:
            params = {
                "latitude": AARHUS_LAT,
                "longitude": AARHUS_LON,
                "start_date": current_date.strftime("%Y-%m-%d"),
                "end_date": (current_date + timedelta(days=2)).strftime("%Y-%m-%d"),
                "models": "dmi_harmonie",
                "hourly": FORECAST_FEATURES,
                "timezone": "Europe/Copenhagen"
            }
            try:
                resp = requests.get(url, params=params, timeout=30)
                if resp.status_code != 200:
                    # Retry once without the explicit model selection.
                    del params['models']
                    resp = requests.get(url, params=params, timeout=30)
                if resp.status_code == 200:
                    data = resp.json()
                    if 'hourly' in data:
                        hourly = data['hourly']
                        times = pd.to_datetime(hourly['time'])
                        times = times.tz_localize('Europe/Copenhagen', ambiguous='infer')
                        for reference_time in reference_times:
                            for i, target_time in enumerate(times):
                                lead_hours = (target_time - reference_time).total_seconds() / 3600
                                if not 0 < lead_hours <= 48:
                                    continue
                                row = {
                                    'target_timestamp': target_time,
                                    'reference_time': reference_time,
                                    'lead_time_hours': int(lead_hours),
                                    'lead_bucket': get_lead_bucket(int(lead_hours)),
                                    'latitude': AARHUS_LAT,
                                    'longitude': AARHUS_LON,
                                }
                                # Add all forecast features with dmi_ prefix
                                for feat in FORECAST_FEATURES:
                                    col_name = f"dmi_{feat}_pred"
                                    row[col_name] = hourly.get(feat, [None] * len(times))[i]
                                # Compute wind components (missing values count as 0)
                                wind_speed = row.get('dmi_windspeed_10m_pred', 0) or 0
                                wind_dir = row.get('dmi_winddirection_10m_pred', 0) or 0
                                row['forecast_wind_u'] = -wind_speed * np.sin(np.radians(wind_dir))
                                row['forecast_wind_v'] = -wind_speed * np.cos(np.radians(wind_dir))
                                all_forecasts.append(row)
            except Exception as e:
                # Best effort: a failed day is logged and skipped so the rest
                # of the period is still collected.
                log_exception(f"fetch_forecasts_for_period[{current_date}]", e)
        current_date += timedelta(days=1)
        # Small pause between days to stay gentle on the API.
        time.sleep(0.1)
    if not all_forecasts:
        log_event("fetch_forecasts_for_period no_data", start_date=str(start_date), end_date=str(end_date))
        return None
    df = pd.DataFrame(all_forecasts)
    df = dedupe_rows(df, TRAINING_DEDUP_KEYS, sort_keys=["target_timestamp", "reference_time"], keep="last")
    log_event("fetch_forecasts_for_period done", rows=len(df))
    return df
def fetch_future_forecasts():
    """Fetch the forecast for the coming hours (lead time up to 48 h).

    The reference time is the latest 3-hourly run hour of today that is not
    in the future. Returns a DataFrame sorted by ``target_timestamp`` with
    one row per future target hour, or ``None`` when the request fails or
    yields no usable rows.
    """
    log_event("fetch_future_forecasts started")
    now = now_cph()
    today = now.date()
    run_hours = [0, 3, 6, 9, 12, 15, 18, 21]
    latest_run = max((h for h in run_hours if h <= now.hour), default=0)
    midnight = datetime.combine(today, datetime.min.time())
    reference_time = (midnight + timedelta(hours=latest_run)).replace(tzinfo=COPENHAGEN_TZ)

    url = "https://api.open-meteo.com/v1/forecast"
    params = {
        "latitude": AARHUS_LAT,
        "longitude": AARHUS_LON,
        "start_date": today.strftime("%Y-%m-%d"),
        "end_date": (today + timedelta(days=3)).strftime("%Y-%m-%d"),
        "models": "dmi_harmonie",
        "hourly": FORECAST_FEATURES,
        "timezone": "Europe/Copenhagen"
    }
    try:
        resp = requests.get(url, params=params, timeout=30)
        if resp.status_code != 200:
            # Retry once without the explicit model selection.
            del params['models']
            resp = requests.get(url, params=params, timeout=30)
        if resp.status_code != 200:
            return None

        data = resp.json()
        if 'hourly' not in data:
            return None
        hourly = data['hourly']
        times = pd.to_datetime(hourly['time'])
        times = times.tz_localize('Europe/Copenhagen', ambiguous='infer')

        forecasts = []
        for i, target_time in enumerate(times):
            # Only hours that are still ahead of "now" are of interest.
            if not target_time > now:
                continue
            lead_hours = (target_time - reference_time).total_seconds() / 3600
            if not 0 < lead_hours <= 48:
                continue
            whole_lead = int(lead_hours)
            row = {
                'target_timestamp': target_time,
                'reference_time': reference_time,
                'lead_time_hours': whole_lead,
                'lead_bucket': get_lead_bucket(whole_lead),
            }
            # Add all forecast features
            for feat in FORECAST_FEATURES:
                row[f"dmi_{feat}_pred"] = hourly.get(feat, [None] * len(times))[i]
            # Compute wind components (missing values count as 0)
            wind_speed = row.get('dmi_windspeed_10m_pred', 0) or 0
            wind_dir = row.get('dmi_winddirection_10m_pred', 0) or 0
            row['forecast_wind_u'] = -wind_speed * np.sin(np.radians(wind_dir))
            row['forecast_wind_v'] = -wind_speed * np.cos(np.radians(wind_dir))
            forecasts.append(row)

        if not forecasts:
            log_event("fetch_future_forecasts no_data")
            return None

        df = pd.DataFrame(forecasts)
        df = df.drop_duplicates(subset=['target_timestamp'], keep='first')
        df = df.sort_values('target_timestamp').reset_index(drop=True)
        log_event("fetch_future_forecasts done", rows=len(df))
        return df
    except Exception as e:
        log_exception("fetch_future_forecasts", e)
        return None
# =============================================================================
# OBSERVATION FETCHING
# =============================================================================
def fetch_observations_for_period(start_date, end_date):
    """
    Fetch hourly observations for Aarhus from the Open-Meteo archive API.

    ``end_date`` is capped at today's Copenhagen date. Returns a DataFrame
    with ``actual_*`` columns plus derived wind components and rain targets,
    limited to hours up to the current hour, or ``None`` on any failure.
    """
    log_event("fetch_observations_for_period", start_date=str(start_date), end_date=str(end_date))
    url = "https://archive-api.open-meteo.com/v1/archive"
    cph_today = now_cph().date()
    if end_date > cph_today:
        end_date = cph_today
    params = {
        "latitude": AARHUS_LAT,
        "longitude": AARHUS_LON,
        "start_date": start_date.strftime("%Y-%m-%d"),
        "end_date": end_date.strftime("%Y-%m-%d"),
        "hourly": OBSERVATION_FEATURES,
        "timezone": "Europe/Copenhagen"
    }
    # Output column -> hourly variable name in the API response.
    column_sources = {
        'actual_temp': 'temperature_2m',
        'actual_humidity': 'relative_humidity_2m',
        'actual_pressure': 'pressure_msl',
        'actual_precipitation': 'precipitation',
        'actual_rain': 'rain',
        'actual_wind_speed': 'windspeed_10m',
        'actual_wind_direction': 'winddirection_10m',
        'actual_wind_gust': 'windgusts_10m',
    }
    try:
        resp = requests.get(url, params=params, timeout=60)
        if resp.status_code != 200:
            return None
        data = resp.json()
        if 'hourly' not in data:
            return None
        hourly = data['hourly']
        times = pd.to_datetime(hourly['time'])
        times = times.tz_localize('Europe/Copenhagen', ambiguous='infer')

        frame_data = {'target_timestamp': times}
        for column, variable in column_sources.items():
            frame_data[column] = hourly.get(variable)
        df = pd.DataFrame(frame_data)

        # Derived targets
        direction_rad = np.radians(df['actual_wind_direction'])
        df['actual_wind_u'] = -df['actual_wind_speed'] * np.sin(direction_rad)
        df['actual_wind_v'] = -df['actual_wind_speed'] * np.cos(np.radians(df['actual_wind_direction']))
        df['rain_event'] = (df['actual_precipitation'] > 0.1).astype(int)
        df['rain_amount'] = df['actual_precipitation']

        # Filter future times
        current_hour = now_cph().replace(minute=0, second=0, microsecond=0)
        not_future = df['target_timestamp'] <= current_hour
        df = df[not_future]
        log_event("fetch_observations_for_period done", rows=len(df))
        return df
    except Exception as e:
        log_exception("fetch_observations_for_period", e)
        return None
# =============================================================================
# FEATURE ENGINEERING
# =============================================================================
def add_cyclical_features(df, col, period):
    """Add ``<col>_sin`` and ``<col>_cos`` columns encoding ``col`` on a circle of length ``period``.

    The frame is modified in place and returned.
    """
    angle = 2 * np.pi * df[col] / period
    df[f'{col}_sin'] = np.sin(angle)
    df[f'{col}_cos'] = np.cos(angle)
    return df
def add_temporal_features(df):
    """Add hour, month and day-of-year of ``reference_time`` plus cyclical encodings.

    ``hour`` and ``month`` additionally get sin/cos columns. The frame is
    modified in place and returned.
    """
    reference = df['reference_time'].dt
    df['hour'] = reference.hour
    df['month'] = reference.month
    df['day_of_year'] = reference.dayofyear
    for column, period in (('hour', 24), ('month', 12)):
        df = add_cyclical_features(df, column, period)
    return df
def add_run_delta_features(df):
    """Add ``*_run_delta`` columns: change of a forecast value versus the previous run.

    Rows are sorted by ``reference_time`` then ``target_timestamp``; within
    each target timestamp the difference to the preceding run is computed.
    The first run of a target gets 0. Features whose column is missing are
    skipped.
    """
    tracked_features = (
        'temperature_2m',
        'windspeed_10m',
        'windgusts_10m',
        'precipitation',
        'pressure_msl',
        'relative_humidity_2m',
    )
    df = df.sort_values(['reference_time', 'target_timestamp'])
    # For each target timestamp, compute delta from previous run
    for feature in tracked_features:
        source = f'dmi_{feature}_pred'
        if source not in df.columns:
            continue
        per_target = df.groupby('target_timestamp')[source]
        df[f'dmi_{feature}_pred_run_delta'] = per_target.diff().fillna(0)
    return df
def build_observation_context_frame(observations_df):
    """Build lagged and rolling observation features, one row per observation hour.

    Every feature is computed from values shifted by one row, so the row for
    a timestamp only contains information from earlier rows. Returns a frame
    with the context timestamp column followed by the context feature
    columns, sorted by timestamp; an empty frame with that schema is
    returned when there are no observations.
    """
    columns = [OBSERVATION_CONTEXT_TIMESTAMP_COL, *OBSERVATION_CONTEXT_COLUMNS]
    if observations_df is None or len(observations_df) == 0:
        return pd.DataFrame(columns=columns)

    observations = ensure_copenhagen_time(observations_df.copy(), "target_timestamp")
    observations = dedupe_rows(observations, ["target_timestamp"], sort_keys=["target_timestamp"], keep="last")
    observations = observations.sort_values("target_timestamp").reset_index(drop=True)

    # Fill in derived columns that the input frame may not carry.
    if {"actual_wind_speed", "actual_wind_direction"}.issubset(observations.columns):
        if "actual_wind_u" not in observations.columns:
            observations["actual_wind_u"] = -observations["actual_wind_speed"] * np.sin(np.radians(observations["actual_wind_direction"]))
        if "actual_wind_v" not in observations.columns:
            observations["actual_wind_v"] = -observations["actual_wind_speed"] * np.cos(np.radians(observations["actual_wind_direction"]))
    if "actual_precipitation" in observations.columns:
        if "rain_event" not in observations.columns:
            observations["rain_event"] = (observations["actual_precipitation"].fillna(0.0) > 0.1).astype(int)
        if "rain_amount" not in observations.columns:
            observations["rain_amount"] = observations["actual_precipitation"]

    # Source columns shifted by one row so a row never sees its own value.
    source_columns = {
        "temp": "actual_temp",
        "wind": "actual_wind_speed",
        "wind_u": "actual_wind_u",
        "wind_v": "actual_wind_v",
        "pressure": "actual_pressure",
        "humidity": "actual_humidity",
        "precip": "actual_precipitation",
    }
    lagged = {
        key: get_optional_series(observations, column).shift(1)
        for key, column in source_columns.items()
    }

    # (output column, lagged source, rolling window or None, aggregation)
    feature_plan = (
        ("obs_temp_lag_1h", "temp", None, None),
        ("obs_temp_mean_3h", "temp", 3, "mean"),
        ("obs_temp_mean_6h", "temp", 6, "mean"),
        ("obs_wind_lag_1h", "wind", None, None),
        ("obs_wind_mean_3h", "wind", 3, "mean"),
        ("obs_wind_mean_6h", "wind", 6, "mean"),
        ("obs_wind_u_lag_1h", "wind_u", None, None),
        ("obs_wind_u_mean_3h", "wind_u", 3, "mean"),
        ("obs_wind_v_lag_1h", "wind_v", None, None),
        ("obs_wind_v_mean_3h", "wind_v", 3, "mean"),
        ("obs_pressure_lag_1h", "pressure", None, None),
        ("obs_pressure_mean_3h", "pressure", 3, "mean"),
        ("obs_humidity_lag_1h", "humidity", None, None),
        ("obs_humidity_mean_3h", "humidity", 3, "mean"),
        ("obs_precip_lag_1h", "precip", None, None),
        ("obs_precip_sum_3h", "precip", 3, "sum"),
        ("obs_precip_sum_6h", "precip", 6, "sum"),
        ("obs_precip_sum_12h", "precip", 12, "sum"),
    )

    context = pd.DataFrame({OBSERVATION_CONTEXT_TIMESTAMP_COL: observations["target_timestamp"]})
    for output_column, source_key, window, aggregation in feature_plan:
        series = lagged[source_key]
        if window is None:
            context[output_column] = series
            continue
        rolling = series.rolling(window, min_periods=1)
        context[output_column] = rolling.mean() if aggregation == "mean" else rolling.sum()

    context = ensure_observation_context_columns(context)
    return context.sort_values(OBSERVATION_CONTEXT_TIMESTAMP_COL).reset_index(drop=True)
def attach_observation_context(df, observation_context_df):
    """Attach only observations that existed at the forecast reference time.

    Each row of ``df`` receives the observation-context row with the latest
    context timestamp that is not after the row's ``reference_time`` (a
    backward as-of join). Context columns already present on ``df`` are
    replaced. Row order of ``df`` is preserved and the index is reset.

    Args:
        df: Forecast rows; should contain ``reference_time``.
        observation_context_df: Either a frame produced by
            ``build_observation_context_frame`` or raw observations (built on
            the fly when the context timestamp column is missing).

    Returns:
        A new frame with the context timestamp and context feature columns.
        When no context can be attached the columns are present but empty.
        Internal helper columns are never part of the result.
    """
    if df is None or len(df) == 0:
        return ensure_observation_context_columns(df)

    enriched = df.copy()
    context_columns = [OBSERVATION_CONTEXT_TIMESTAMP_COL, *OBSERVATION_CONTEXT_COLUMNS]
    stale_columns = [name for name in context_columns if name in enriched.columns]
    if stale_columns:
        enriched = enriched.drop(columns=stale_columns)
    if "reference_time" not in enriched.columns:
        return ensure_observation_context_columns(enriched)

    enriched = ensure_copenhagen_time(enriched, "reference_time")

    if observation_context_df is None or len(observation_context_df) == 0:
        # Nothing to join: return the rows in their original order with empty
        # context columns. Returning before the join keys are created keeps
        # the internal "_reference_time_key" column out of the result.
        enriched = ensure_observation_context_columns(enriched)
        return enriched.reset_index(drop=True)

    # merge_asof needs both sides sorted on the join key; remember the
    # original order so it can be restored afterwards.
    enriched["_row_order"] = np.arange(len(enriched))
    left = enriched.copy()
    left["_reference_time_key"] = datetime_series_to_epoch_ns(left["reference_time"])
    left = left.sort_values("_reference_time_key").reset_index(drop=True)

    if OBSERVATION_CONTEXT_TIMESTAMP_COL not in observation_context_df.columns:
        observation_context_df = build_observation_context_frame(observation_context_df)
    else:
        observation_context_df = ensure_observation_context_columns(observation_context_df.copy())
    observation_context_df = ensure_copenhagen_time(observation_context_df, OBSERVATION_CONTEXT_TIMESTAMP_COL)
    observation_context_df = observation_context_df.dropna(subset=[OBSERVATION_CONTEXT_TIMESTAMP_COL]).copy()
    # Join on integer epoch nanoseconds so differing timestamp precisions
    # on the two sides cannot break the as-of merge.
    observation_context_df["_observation_context_key"] = datetime_series_to_epoch_ns(
        observation_context_df[OBSERVATION_CONTEXT_TIMESTAMP_COL]
    )
    observation_context_df = observation_context_df.sort_values("_observation_context_key").reset_index(drop=True)

    merged = pd.merge_asof(
        left,
        observation_context_df[
            [OBSERVATION_CONTEXT_TIMESTAMP_COL, "_observation_context_key", *OBSERVATION_CONTEXT_COLUMNS]
        ],
        left_on="_reference_time_key",
        right_on="_observation_context_key",
        direction="backward",
    )
    merged = ensure_observation_context_columns(merged)
    helper_columns = ["_row_order", "_reference_time_key", "_observation_context_key"]
    merged = merged.sort_values("_row_order").drop(columns=helper_columns).reset_index(drop=True)
    return merged
def build_live_feature_frame(forecasts_df):
    """Build inference features for live forecasts, with observation context if available.

    Observations are fetched from two days before the earliest reference
    time up to the date of the latest one. When features, reference times or
    observations are unavailable, the context columns are still present but
    left empty.
    """
    feature_frame = build_model_features(forecasts_df)
    unusable = (
        feature_frame is None
        or len(feature_frame) == 0
        or "reference_time" not in feature_frame.columns
    )
    if unusable:
        return ensure_observation_context_columns(feature_frame)

    reference_times = pd.to_datetime(feature_frame["reference_time"], errors="coerce")
    earliest = reference_times.min()
    latest = reference_times.max()
    if pd.isna(earliest) or pd.isna(latest):
        return ensure_observation_context_columns(feature_frame)

    observation_start = (earliest - timedelta(days=2)).date()
    observation_end = latest.date()
    observations = fetch_observations_for_period(observation_start, observation_end)
    if observations is None or len(observations) == 0:
        log_event("build_live_feature_frame missing_observations", start_date=str(observation_start), end_date=str(observation_end))
        return ensure_observation_context_columns(feature_frame)

    return attach_observation_context(feature_frame, build_observation_context_frame(observations))
def extract_observations_from_training_matrix(df):
    """Extract one observation row per target timestamp from the training matrix.

    Returns ``None`` when the frame is missing, empty or has no
    ``target_timestamp`` column.
    """
    if df is None or len(df) == 0 or "target_timestamp" not in df.columns:
        return None

    available = [name for name in OBSERVATION_SOURCE_COLUMNS if name in df.columns]
    observations = df[["target_timestamp", *available]].copy()
    observations = ensure_copenhagen_time(observations, "target_timestamp")
    observations = dedupe_rows(observations, ["target_timestamp"], sort_keys=["target_timestamp"], keep="last")
    observations = observations.sort_values("target_timestamp")
    return observations.reset_index(drop=True)
def upgrade_training_matrix_with_observation_context(existing_df):
    """Recompute the observation-context columns for every stored training row.

    The observations are recovered from the matrix itself, turned into
    context features and re-attached; the result is de-duplicated on the
    training keys.
    """
    if existing_df is None or len(existing_df) == 0:
        return ensure_observation_context_columns(existing_df)

    upgraded = existing_df.copy()
    for time_column in ("target_timestamp", "reference_time"):
        upgraded = ensure_copenhagen_time(upgraded, time_column)

    recovered = extract_observations_from_training_matrix(upgraded)
    context = build_observation_context_frame(recovered)
    upgraded = attach_observation_context(upgraded, context)
    return dedupe_rows(upgraded, TRAINING_DEDUP_KEYS, sort_keys=["target_timestamp", "reference_time"], keep="last")
def build_training_matrix(forecasts_df, observations_df):
    """
    Join forecasts with observations and derive all training columns.

    Forecast and observation rows are inner-joined on ``target_timestamp``,
    model features and observation context are added, and correction targets
    (actual minus forecast) are computed for temperature, wind speed and
    wind gust. Rows after the current hour are dropped. Returns ``None`` when
    an input is missing or the join is empty.
    """
    if forecasts_df is None or observations_df is None:
        log_event("build_training_matrix missing_inputs")
        return None

    # Merge on target_timestamp
    merged = pd.merge(forecasts_df, observations_df, on='target_timestamp', how='inner')
    if len(merged) == 0:
        log_event("build_training_matrix no_matches")
        return None

    merged = build_model_features(merged)
    merged = attach_observation_context(merged, build_observation_context_frame(observations_df))

    # Add correction targets for temperature and wind
    correction_targets = (
        ('temp_correction_target', 'actual_temp', 'dmi_temperature_2m_pred'),
        ('wind_speed_correction_target', 'actual_wind_speed', 'dmi_windspeed_10m_pred'),
        ('wind_gust_correction_target', 'actual_wind_gust', 'dmi_windgusts_10m_pred'),
    )
    for target_column, actual_column, forecast_column in correction_targets:
        merged[target_column] = merged[actual_column] - merged[forecast_column]

    # Filter out future times
    current_hour = now_cph().replace(minute=0, second=0, microsecond=0)
    not_future = merged['target_timestamp'] <= current_hour
    merged = merged[not_future]
    merged = dedupe_rows(merged, TRAINING_DEDUP_KEYS, sort_keys=["target_timestamp", "reference_time"], keep="last")
    log_event("build_training_matrix done", rows=len(merged), columns=len(merged.columns))
    return merged
# =============================================================================
# DATASET OPERATIONS
# =============================================================================
def upload_to_dataset(local_path, filename, dataset_name, repo_type="dataset"):
    """Upload ``local_path`` to a Hugging Face repo under the name ``filename``.

    Returns ``True`` on success and ``False`` when the upload raised (the
    error is logged, not propagated).
    """
    log_event("upload_to_dataset", local_path=local_path, filename=filename, dataset_name=dataset_name)
    upload_arguments = {
        "path_or_fileobj": local_path,
        "path_in_repo": filename,
        "repo_id": dataset_name,
        "repo_type": repo_type,
        "token": HF_TOKEN,
    }
    try:
        HfApi().upload_file(**upload_arguments)
        log_event("upload_to_dataset done", filename=filename, dataset_name=dataset_name)
        return True
    except Exception as e:
        log_exception("upload_to_dataset", e)
        return False
def load_from_dataset(filename, dataset_name, repo_type="dataset"):
    """Download ``filename`` from a Hugging Face repo and return its local path.

    Returns ``None`` when the download raised (the error is logged, not
    propagated).
    """
    log_event("load_from_dataset", filename=filename, dataset_name=dataset_name)
    try:
        local_path = hf_hub_download(
            repo_id=dataset_name,
            filename=filename,
            repo_type=repo_type,
            token=HF_TOKEN,
        )
        log_event("load_from_dataset done", filename=filename, dataset_name=dataset_name, path=local_path)
    except Exception as e:
        log_exception("load_from_dataset", e)
        return None
    return local_path
def load_first_available_dataset_file(filenames, dataset_name, repo_type="dataset"):
    """Try ``filenames`` in order and return ``(local_path, filename)`` of the first hit.

    Returns ``(None, None)`` when none of the files could be loaded.
    """
    for candidate in filenames:
        local_path = load_from_dataset(candidate, dataset_name, repo_type=repo_type)
        if not local_path:
            continue
        return local_path, candidate
    return None, None
def init_dataset_if_needed():
    """Ensure training_matrix.parquet exists so first runs can start cleanly.

    Looks for ``training_matrix.parquet`` (or the legacy ``data.parquet``) in
    the training dataset. When neither exists, an empty parquet file holding
    only the training key columns is written to the working directory and
    uploaded.

    Returns:
        ``(path, filename)`` of the available or newly created file, or
        ``(None, None)`` when the upload of the empty file failed.
    """
    existing_path, existing_name = load_first_available_dataset_file(
        ["training_matrix.parquet", "data.parquet"],
        DATASET_NAME,
    )
    if existing_path:
        # The status markers were stored as mojibake (UTF-8 emoji decoded
        # with the wrong codec); restored to the intended characters.
        print(f"✅ Training dataset already available as {existing_name}")
        return existing_path, existing_name
    empty_df = pd.DataFrame(columns=TRAINING_DEDUP_KEYS)
    empty_df.to_parquet("training_matrix.parquet")
    if upload_to_dataset("training_matrix.parquet", "training_matrix.parquet", DATASET_NAME):
        print("✅ Initialized empty training_matrix.parquet")
        return "training_matrix.parquet", "training_matrix.parquet"
    print("⚠️ Could not initialize training_matrix.parquet")
    return None, None
def load_existing_training_matrix():
    """Load the stored training matrix, preferring the new filename over the legacy one.

    Returns ``(dataframe, filename)``; ``(None, None)`` when no file is
    available. A legacy ``timestamp`` column is renamed, time columns are
    normalized to Copenhagen time and duplicate training rows are removed.
    """
    candidates = ["training_matrix.parquet", "data.parquet"]
    existing_path, existing_name = load_first_available_dataset_file(candidates, DATASET_NAME)
    if not existing_path:
        return None, None

    existing = pd.read_parquet(existing_path)
    is_legacy_schema = (
        existing_name == "data.parquet"
        and "timestamp" in existing.columns
        and "target_timestamp" not in existing.columns
    )
    if is_legacy_schema:
        existing = existing.rename(columns={"timestamp": "target_timestamp"})

    if existing.empty or "target_timestamp" not in existing.columns:
        return existing, existing_name

    for time_column in ("target_timestamp", "reference_time", OBSERVATION_CONTEXT_TIMESTAMP_COL):
        existing = ensure_copenhagen_time(existing, time_column)
    existing = dedupe_rows(existing, TRAINING_DEDUP_KEYS, sort_keys=["target_timestamp", "reference_time"], keep="last")
    return existing, existing_name
def safe_number(value, digits=3, default=None):
    """Convert pandas/numpy scalars to JSON-safe floats.

    Args:
        value: Any scalar-like value (number, numeric string, numpy/pandas scalar).
        digits: Number of decimals passed to ``round``.
        default: Returned when ``value`` is missing or cannot be represented.

    Returns:
        ``round(float(value), digits)``, or ``default`` when ``value`` is ``None``,
        NA/NaN, not convertible to ``float``, or not finite.
    """
    import math  # local import keeps this block self-contained

    if value is None:
        return default
    try:
        if pd.isna(value):
            return default
    except Exception:
        # pd.isna() on list-likes yields an array whose truth value is ambiguous;
        # fall through and let float() decide.
        pass
    try:
        number = float(value)
        # +/-inf would be serialized by json.dump as the non-standard token
        # "Infinity", which strict JSON parsers reject.
        if not math.isfinite(number):
            return default
        return round(number, digits)
    except Exception:
        return default
def to_iso(value):
    """Turn a timestamp-like value into a string for the frontend snapshot.

    ``None`` and NA-like values map to ``None``. Objects exposing ``isoformat``
    are formatted with it; everything else is passed through ``str``.
    """
    if value is None:
        return None
    try:
        is_missing = bool(pd.isna(value))
    except Exception:
        # Ambiguous truth value (e.g. array-likes): treat as "not missing".
        is_missing = False
    if is_missing:
        return None
    return value.isoformat() if hasattr(value, "isoformat") else str(value)
def load_json_from_dataset(filename, dataset_name=DATASET_NAME):
    """Fetch ``filename`` via ``load_from_dataset`` and parse it as UTF-8 JSON.

    Returns the decoded object, or ``None`` when the file is unavailable or
    reading/parsing fails (the failure is reported through ``log_exception``).
    """
    local_path = load_from_dataset(filename, dataset_name)
    if local_path:
        try:
            with open(local_path, "r", encoding="utf-8") as stream:
                return json.load(stream)
        except Exception as exc:
            log_exception(f"load_json_from_dataset[{filename}]", exc)
    return None
def extract_feature_importance_from_bundle(bundle, target_name, top_n=8):
    """Average ``feature_importances_`` per feature over the bundle's bucket models.

    Models without ``feature_importances_`` or without feature columns (looked up
    on the model entry first, then on the bundle) are skipped.

    Returns:
        Up to ``top_n`` dicts with ``target``, ``feature`` and ``importance``
        (rounded to 6 decimals), ordered by descending importance.
    """
    if bundle is None or "models" not in bundle:
        return []

    # feature name -> [running total, number of contributing models]
    tally = {}
    for entry in bundle.get("models", {}).values():
        estimator = entry.get("model")
        columns = entry.get("feature_columns") or bundle.get("feature_columns", [])
        weights = getattr(estimator, "feature_importances_", None)
        if weights is None or not columns:
            continue
        for column, weight in zip(columns, weights):
            slot = tally.setdefault(column, [0.0, 0])
            slot[0] += float(weight)
            slot[1] += 1

    ranked = sorted(
        (
            {
                "target": target_name,
                "feature": column,
                "importance": round(total / max(seen, 1), 6),
            }
            for column, (total, seen) in tally.items()
        ),
        key=lambda item: item["importance"],
        reverse=True,
    )
    return ranked[:top_n]
def build_recent_backtest(training_df):
    """Recreate recent ML-vs-DMI backtest from the stored training matrix.

    Rows whose ``target_timestamp`` lies within the last ``TRAINING_HOLDOUT_DAYS``
    days (up to and including now) are selected, ML columns are seeded from the
    DMI forecast columns and then overwritten wherever a model bundle yields a
    prediction.

    Returns:
        A frame with one row per ``target_timestamp`` carrying ``ml_temp``,
        ``ml_wind_speed``, ``ml_wind_gust``, ``ml_rain_prob`` and
        ``ml_rain_amount``, or ``None`` when there is nothing to evaluate.
    """
    if training_df is None or len(training_df) == 0 or "target_timestamp" not in training_df.columns:
        return None
    current_time = now_cph()
    # Keep only targets inside the holdout window that are not in the future.
    history = training_df[
        (training_df["target_timestamp"] >= current_time - timedelta(days=TRAINING_HOLDOUT_DAYS))
        & (training_df["target_timestamp"] <= current_time)
    ].copy()
    if len(history) == 0:
        return None
    if "lead_time_hours" in history.columns:
        # Missing lead times become 0 and therefore fall outside the accepted range.
        history = history[
            history["lead_time_hours"].fillna(0).between(0.0001, FUTURE_FORECAST_HOURS, inclusive="both")
        ].copy()
        if len(history) == 0:
            return None
    # Seed the ML columns with the DMI values so rows without a model prediction
    # still carry a value (NaN / 0.0 when the DMI column itself is absent).
    history["ml_temp"] = history.get("dmi_temperature_2m_pred", pd.Series(np.nan, index=history.index))
    history["ml_wind_speed"] = history.get("dmi_windspeed_10m_pred", pd.Series(np.nan, index=history.index))
    history["ml_wind_gust"] = history.get("dmi_windgusts_10m_pred", pd.Series(np.nan, index=history.index))
    # DMI probability is clipped to 0-100 and rescaled to a 0-1 fraction.
    history["ml_rain_prob"] = (
        history.get("dmi_precipitation_probability_pred", pd.Series(0.0, index=history.index))
        .fillna(0.0)
        .clip(0.0, 100.0)
        / 100.0
    )
    history["ml_rain_amount"] = (
        history.get("dmi_precipitation_pred", pd.Series(0.0, index=history.index))
        .fillna(0.0)
        .clip(0.0, None)
    )
    # (target name, output column, DMI baseline column, is_correction)
    # For is_correction targets the model output is added to the baseline column.
    target_specs = [
        ("temperature", "ml_temp", "dmi_temperature_2m_pred", True),
        ("wind_speed", "ml_wind_speed", "dmi_windspeed_10m_pred", True),
        ("wind_gust", "ml_wind_gust", "dmi_windgusts_10m_pred", True),
        ("rain_event", "ml_rain_prob", None, False),
        ("rain_amount", "ml_rain_amount", None, False),
    ]
    for target_name, output_col, baseline_col, is_correction in target_specs:
        bundle = load_model_bundle(target_name)
        if not bundle:
            continue
        target_pred = predict_with_bundle(bundle, history)
        if target_pred is None:
            continue
        prediction_series = pd.Series(target_pred, index=history.index, dtype="float64")
        # Only rows with an actual (non-NaN) prediction are overwritten.
        prediction_mask = prediction_series.notna()
        if not prediction_mask.any():
            continue
        if is_correction:
            # NOTE(review): assumes baseline_col exists in history; the seeding above
            # tolerates a missing DMI column but this lookup does not.
            history.loc[prediction_mask, output_col] = (
                history.loc[prediction_mask, baseline_col] + prediction_series[prediction_mask]
            )
        elif target_name == "rain_event":
            history.loc[prediction_mask, output_col] = prediction_series[prediction_mask].clip(0.0, 1.0)
        else:
            history.loc[prediction_mask, output_col] = prediction_series[prediction_mask].clip(0.0, None)
    # Sort so that, per target timestamp, the row with the largest lead time (then
    # the latest reference time) comes first and survives the de-duplication.
    sort_columns = ["target_timestamp"]
    ascending = [True]
    if "lead_time_hours" in history.columns:
        sort_columns.append("lead_time_hours")
        ascending.append(False)
    if "reference_time" in history.columns:
        sort_columns.append("reference_time")
        ascending.append(False)
    history = history.sort_values(sort_columns, ascending=ascending)
    history = history.drop_duplicates(subset=["target_timestamp"], keep="first").reset_index(drop=True)
    return history
def rebuild_future_ml_columns(predictions_df, registry_revision=None):
    """Recompute live ML columns from stored forecast features before publishing the frontend snapshot.

    Only rows whose ``target_timestamp`` is later than the current Copenhagen
    time are touched; the input frame itself is not modified.

    Args:
        predictions_df: Stored predictions frame (may be ``None`` or empty).
        registry_revision: Forwarded to ``load_model_bundle`` as ``cache_revision``.

    Returns:
        The input unchanged when it is ``None``, empty or lacks
        ``target_timestamp``; otherwise a copy with refreshed ML columns on the
        future rows.
    """
    if predictions_df is None or len(predictions_df) == 0 or "target_timestamp" not in predictions_df.columns:
        return predictions_df
    current_time = now_cph()
    repaired = predictions_df.copy()
    future_mask = repaired["target_timestamp"] > current_time
    if not future_mask.any():
        return repaired
    future_df = repaired.loc[future_mask].copy()
    # Keep an existing ML column when present, otherwise fall back to the DMI column.
    future_df["ml_temp"] = future_df.get("ml_temp", future_df.get("dmi_temperature_2m_pred"))
    future_df["ml_wind_speed"] = future_df.get("ml_wind_speed", future_df.get("dmi_windspeed_10m_pred"))
    future_df["ml_wind_gust"] = future_df.get("ml_wind_gust", future_df.get("dmi_windgusts_10m_pred"))
    # Fallback probability: DMI percentage clipped to 0-100 and rescaled to 0-1.
    future_df["ml_rain_prob"] = future_df.get(
        "ml_rain_prob",
        future_df.get("dmi_precipitation_probability_pred", pd.Series(0.0, index=future_df.index))
        .fillna(0.0)
        .clip(0.0, 100.0)
        / 100.0,
    )
    future_df["ml_rain_amount"] = future_df.get(
        "ml_rain_amount",
        future_df.get("dmi_precipitation_pred", pd.Series(0.0, index=future_df.index)).fillna(0.0).clip(0.0, None),
    )
    # (target name, output column, DMI baseline column, is_correction)
    target_specs = [
        ("temperature", "ml_temp", "dmi_temperature_2m_pred", True),
        ("wind_speed", "ml_wind_speed", "dmi_windspeed_10m_pred", True),
        ("wind_gust", "ml_wind_gust", "dmi_windgusts_10m_pred", True),
        ("rain_event", "ml_rain_prob", None, False),
        ("rain_amount", "ml_rain_amount", None, False),
    ]
    for target_name, output_col, baseline_col, is_correction in target_specs:
        bundle = load_model_bundle(target_name, cache_revision=registry_revision)
        # predict_with_bundle returns None for a missing bundle, so no extra guard here.
        target_pred = predict_with_bundle(bundle, future_df)
        if target_pred is None:
            continue
        target_series = pd.Series(target_pred, index=future_df.index, dtype="float64")
        # Rows without a model prediction keep the value seeded above.
        target_mask = target_series.notna()
        if not target_mask.any():
            continue
        if is_correction:
            # Correction targets: model output is added to the DMI baseline.
            future_df.loc[target_mask, output_col] = future_df.loc[target_mask, baseline_col] + target_series[target_mask]
        elif target_name == "rain_event":
            future_df.loc[target_mask, output_col] = target_series[target_mask].clip(0.0, 1.0)
        else:
            future_df.loc[target_mask, output_col] = target_series[target_mask].clip(0.0, None)
    future_df["ml_rain_prob"] = future_df["ml_rain_prob"].fillna(0.0).clip(0.0, 1.0)
    future_df["ml_rain_amount"] = future_df["ml_rain_amount"].fillna(0.0).clip(0.0, None)
    # Columns created above must exist on the full frame before the row-wise write-back.
    for column_name in future_df.columns:
        if column_name not in repaired.columns:
            repaired[column_name] = np.nan
    repaired.loc[future_mask, future_df.columns] = future_df
    return repaired
def build_recent_verified_history(predictions_df):
    """Return verified prediction rows from the last ``TRAINING_HOLDOUT_DAYS`` days.

    Args:
        predictions_df: Predictions frame; may be ``None`` or empty.

    Returns:
        Rows flagged ``verified`` whose ``target_timestamp`` lies in the window
        ending now and (when ``lead_time_hours`` exists) whose lead time is in
        ``(0, FUTURE_FORECAST_HOURS]``, sorted by ``target_timestamp``; ``None``
        when no such rows exist or a required column is missing.
    """
    if predictions_df is None or len(predictions_df) == 0:
        return None
    # Both columns are indexed below; a frame without a "verified" column has no
    # verified history rather than being an error.
    if not {"target_timestamp", "verified"}.issubset(predictions_df.columns):
        return None

    current_time = now_cph()
    verified = predictions_df[predictions_df["verified"].fillna(False).astype(bool)].copy()
    if len(verified) == 0:
        return None

    window_start = current_time - timedelta(days=TRAINING_HOLDOUT_DAYS)
    verified = verified[
        (verified["target_timestamp"] >= window_start)
        & (verified["target_timestamp"] <= current_time)
    ].copy()
    if len(verified) == 0:
        return None

    if "lead_time_hours" in verified.columns:
        # Missing lead times become 0 and are therefore excluded.
        verified = verified[
            verified["lead_time_hours"].fillna(0).between(0.0001, FUTURE_FORECAST_HOURS, inclusive="both")
        ].copy()
        if len(verified) == 0:
            return None
    return verified.sort_values("target_timestamp").reset_index(drop=True)
def merge_recent_history_sources(backtest_df=None, predictions_df=None):
    """Merge the backtest frame with recently verified predictions.

    When both sources contain a row for the same ``target_timestamp`` the
    verified row is the one that is kept.

    Returns:
        ``None`` when neither source has rows, the single available source when
        only one has rows, otherwise the merged frame with one row per
        ``target_timestamp``.
    """
    has_backtest = backtest_df is not None and len(backtest_df) > 0
    verified_rows = build_recent_verified_history(predictions_df)

    if not has_backtest:
        # Covers both "nothing at all" (None) and "verified only".
        return verified_rows
    if verified_rows is None:
        return backtest_df.copy().sort_values("target_timestamp").reset_index(drop=True)

    # Tag the origin so verified rows sort last within a timestamp and survive
    # drop_duplicates(keep="last").
    stacked = pd.concat(
        [
            backtest_df.copy().assign(_history_priority=0),
            verified_rows.copy().assign(_history_priority=1),
        ],
        ignore_index=True,
        sort=False,
    )
    order_by = ["target_timestamp", "_history_priority"]
    directions = [True, True]
    for tie_breaker in ("lead_time_hours", "reference_time"):
        if tie_breaker in stacked.columns:
            order_by.append(tie_breaker)
            directions.append(False)

    stacked = stacked.sort_values(order_by, ascending=directions)
    stacked = stacked.drop_duplicates(subset=["target_timestamp"], keep="last").reset_index(drop=True)
    return stacked.drop(columns=["_history_priority"])
def calculate_verification_metrics(predictions_df=None, backtest_df=None):
    """Summarize temperature forecast skill (DMI vs ML) for the frontend.

    Returns:
        Dict with ``target``, ``periodLabel``, RMSE/MAE for DMI and ML,
        ``winRate`` (percentage of rows where the ML absolute error is smaller
        than the DMI one) and ``totalPredictions``. Metric fields stay ``None``
        when no row has all of ``actual_temp``, ``dmi_temperature_2m_pred`` and
        ``ml_temp``.
    """
    source_df = merge_recent_history_sources(backtest_df=backtest_df, predictions_df=predictions_df)
    has_history = source_df is not None and len(source_df) > 0

    if not has_history:
        period_label = "Ingen verificeret historik endnu"
    else:
        verified = build_recent_verified_history(predictions_df)
        if verified is None or len(verified) == 0:
            period_label = "Seneste 7 dages backtest"
        else:
            period_label = "Seneste 7 dages backtest og verificerede predictioner"

    result = {
        "target": "temperature",
        "periodLabel": period_label,
        "rmseDmi": None,
        "rmseMl": None,
        "maeDmi": None,
        "maeMl": None,
        "winRate": None,
        "totalPredictions": int(len(source_df)) if source_df is not None else 0,
    }
    if not has_history:
        return result

    score_columns = ["actual_temp", "dmi_temperature_2m_pred", "ml_temp"]
    if not set(score_columns).issubset(source_df.columns):
        return result
    valid = source_df.dropna(subset=score_columns).copy()
    if len(valid) == 0:
        return result

    dmi_error = valid["actual_temp"] - valid["dmi_temperature_2m_pred"]
    ml_error = valid["actual_temp"] - valid["ml_temp"]
    ml_wins = np.abs(ml_error) < np.abs(dmi_error)
    result["rmseDmi"] = safe_number(np.sqrt(np.mean(dmi_error**2)), digits=3)
    result["rmseMl"] = safe_number(np.sqrt(np.mean(ml_error**2)), digits=3)
    result["maeDmi"] = safe_number(np.mean(np.abs(dmi_error)), digits=3)
    result["maeMl"] = safe_number(np.mean(np.abs(ml_error)), digits=3)
    result["winRate"] = round(float(ml_wins.mean() * 100), 1)
    # Once metrics exist, the count reflects only the rows that were scored.
    result["totalPredictions"] = int(len(valid))
    return result
def build_lead_bucket_rows(registry):
    """Translate the registry's ``summary_rows`` into frontend lead-bucket rows.

    Each output row carries the bucket id, a display label, the baseline and ML
    metric (6 decimals) and the relative improvement in percent (``None`` when
    the baseline is missing/zero or the ML metric is missing).
    """
    if not registry:
        return []

    bucket_labels = {
        "1-6": "1-6 timer",
        "7-12": "7-12 timer",
        "13-24": "13-24 timer",
        "25-48": "25-48 timer",
    }

    def _to_frontend_row(entry):
        bucket = entry.get("lead_bucket")
        baseline = safe_number(entry.get("baseline_metric"), digits=6)
        ml_metric = safe_number(entry.get("ml_metric"), digits=6)
        if baseline in (None, 0) or ml_metric is None:
            improvement = None
        else:
            improvement = round(((baseline - ml_metric) / baseline) * 100, 2)
        return {
            "bucket": bucket,
            "label": bucket_labels.get(bucket, bucket),
            "baselineMetric": baseline,
            "mlMetric": ml_metric,
            "improvementPct": improvement,
            "target": entry.get("target"),
        }

    return [_to_frontend_row(entry) for entry in registry.get("summary_rows", [])]
# Display label per prediction target; keys match the target names in MODEL_FILES.
TARGET_LABELS = {
    "temperature": "Temperatur",
    "wind_speed": "Vindhastighed",
    "wind_gust": "Vindstรธd",
    "rain_event": "Regnrisiko",
    "rain_amount": "Regnmรฆngde",
}
# Canonical ordering of lead-time buckets, used to order a target's active buckets.
LEAD_BUCKET_ORDER = ["1-6", "7-12", "13-24", "25-48"]
def build_target_labels():
    """Return a fresh copy of ``TARGET_LABELS`` so callers cannot mutate the constant."""
    return {target: label for target, label in TARGET_LABELS.items()}
def build_explanations():
    """Return the static explanatory texts shown by the frontend, keyed by section."""
    forecast_text = "Du ser DMI's prognose side om side med vores ML-justering, nรฅr der er en aktiv model."
    performance_text = "Her kan du sammenligne, hvad DMI sagde, hvad ML sagde, og hvad vejret faktisk endte med at blive."
    sources_text = "DMI er grundprognosen. ML er vores lokale justering. Hvis en ML-model ikke er aktiv, viser vi DMI direkte."
    return dict(forecast=forecast_text, performance=performance_text, sources=sources_text)
def build_target_status(registry):
    """Describe, per target in ``MODEL_FILES``, whether an ML model is active.

    A target counts as active when the registry lists at least one of the known
    ``LEAD_BUCKET_ORDER`` buckets under ``targets[<name>]["active_buckets"]``.

    Returns:
        Dict keyed by target name with ``hasActiveModel``, ``activeBuckets``
        (in canonical bucket order), ``statusLabel`` and ``statusDescription``.
    """
    registry_targets = registry.get("targets", {}) if registry else {}

    def _status_for(target_name):
        enabled = registry_targets.get(target_name, {}).get("active_buckets") or []
        ordered_buckets = [bucket for bucket in LEAD_BUCKET_ORDER if bucket in enabled]
        has_active_model = bool(ordered_buckets)
        target_label = TARGET_LABELS.get(target_name, target_name)
        if not has_active_model:
            status_label = "Vises som DMI-prognose"
            status_description = (
                f"Vi viser DMI direkte for {target_label.lower()}, fordi der ikke er en aktiv ML-model for dette signal endnu."
            )
        else:
            status_label = "ML aktiv"
            status_description = (
                f"Vi viser baade DMI og ML for {target_label.lower()}, og ML bruges som den aktive prognose, nรฅr den findes."
            )
        return {
            "hasActiveModel": has_active_model,
            "activeBuckets": ordered_buckets,
            "statusLabel": status_label,
            "statusDescription": status_description,
        }

    return {target_name: _status_for(target_name) for target_name in MODEL_FILES}
def choose_effective_value(ml_value, dmi_value, has_active_model):
    """Pick the value to display and report which source it came from.

    The ML value wins when it exists and either a model is active or there is no
    DMI value to fall back on; otherwise the DMI value (possibly ``None``) is
    returned with the source ``"dmi"``.
    """
    ml_available = ml_value is not None
    if ml_available and (has_active_model or dmi_value is None):
        return ml_value, "ml"
    return dmi_value, "dmi"
def build_history_payload(predictions_df=None, backtest_df=None):
    """Build the per-timestamp history series (temperature, wind, rain) for the frontend.

    Returns:
        Dict with the lists ``temperature``, ``wind`` and ``rain``; each merged
        history row contributes one entry to every list. All lists are empty
        when no history is available.
    """
    source_df = merge_recent_history_sources(backtest_df=backtest_df, predictions_df=predictions_df)
    temperature_points, wind_points, rain_points = [], [], []
    history = {
        "temperature": temperature_points,
        "wind": wind_points,
        "rain": rain_points,
    }
    if source_df is None or len(source_df) == 0:
        return history

    for _, row in source_df.iterrows():
        stamp = to_iso(row.get("target_timestamp"))
        observed_temp = safe_number(row.get("actual_temp"))

        temperature_points.append(
            {
                "timestamp": stamp,
                "dmiTemp": safe_number(row.get("dmi_temperature_2m_pred")),
                "mlTemp": safe_number(row.get("ml_temp")),
                "actual": observed_temp,
                "actualTemp": observed_temp,
                "verified": True,
            }
        )
        wind_points.append(
            {
                "timestamp": stamp,
                "dmiWindSpeed": safe_number(row.get("dmi_windspeed_10m_pred")),
                "mlWindSpeed": safe_number(row.get("ml_wind_speed")),
                "actualWindSpeed": safe_number(row.get("actual_wind_speed")),
                "dmiWindGust": safe_number(row.get("dmi_windgusts_10m_pred")),
                "mlWindGust": safe_number(row.get("ml_wind_gust")),
                "actualWindGust": safe_number(row.get("actual_wind_gust")),
                "verified": True,
            }
        )

        # ML probability is stored as a 0-1 fraction; the payload uses percent.
        ml_fraction = safe_number(row.get("ml_rain_prob"), digits=4, default=0.0) or 0.0
        observed_amount = safe_number(
            row.get("actual_rain_amount", row.get("actual_precipitation")),
            digits=3,
            default=None,
        )
        if observed_amount is None:
            # actual_rain_amount may exist but be empty; retry with precipitation.
            observed_amount = safe_number(row.get("actual_precipitation"), digits=3, default=None)

        observed_event = row.get("actual_rain_event")
        event_missing = observed_event is None or pd.isna(observed_event)
        if not event_missing:
            observed_event = int(observed_event)
        elif observed_amount is None:
            observed_event = None
        else:
            # Derive the event flag from the amount using a 0.1 threshold.
            observed_event = 1 if observed_amount > 0.1 else 0

        rain_points.append(
            {
                "timestamp": stamp,
                "dmiRainProb": safe_number(row.get("dmi_precipitation_probability_pred"), digits=2, default=0.0) or 0.0,
                "mlRainProb": round(ml_fraction * 100, 2),
                "actualRainEvent": observed_event,
                "dmiRainAmount": safe_number(row.get("dmi_precipitation_pred"), digits=3, default=0.0) or 0.0,
                "mlRainAmount": safe_number(row.get("ml_rain_amount"), digits=3, default=0.0) or 0.0,
                "actualRainAmount": observed_amount,
                "verified": True,
            }
        )
    return history
def build_forecast_row(row, target_status):
    """Convert one predictions row into the frontend forecast-row dict.

    Args:
        row: Mapping-like row (e.g. a pandas Series) with ``dmi_*_pred`` and
            ``ml_*`` fields; missing fields are tolerated.
        target_status: Output of ``build_target_status``; its ``hasActiveModel``
            flags decide whether the ML or the DMI value becomes "effective".

    Returns:
        Dict with DMI, ML and effective values (plus their source) for
        temperature, wind speed, wind gust, rain probability (percent) and rain
        amount, together with auxiliary DMI fields. ``leadTimeHours`` falls back
        to ``0`` when the lead time is missing, NaN or not convertible.
    """
    dmi_temp = safe_number(row.get("dmi_temperature_2m_pred"))
    ml_temp = safe_number(row.get("ml_temp"))
    dmi_wind_speed = safe_number(row.get("dmi_windspeed_10m_pred"))
    ml_wind_speed = safe_number(row.get("ml_wind_speed"))
    dmi_wind_gust = safe_number(row.get("dmi_windgusts_10m_pred"))
    ml_wind_gust = safe_number(row.get("ml_wind_gust"))
    dmi_rain_prob = safe_number(row.get("dmi_precipitation_probability_pred"), digits=2, default=0.0) or 0.0
    # ML probability is stored as a 0-1 fraction; the payload uses percent.
    ml_rain_prob = round((safe_number(row.get("ml_rain_prob"), digits=4, default=0.0) or 0.0) * 100, 2)
    dmi_rain_amount = safe_number(row.get("dmi_precipitation_pred"), digits=3, default=0.0) or 0.0
    ml_rain_amount = safe_number(row.get("ml_rain_amount"), digits=3, default=0.0) or 0.0

    effective_temp, effective_temp_source = choose_effective_value(
        ml_temp,
        dmi_temp,
        target_status["temperature"]["hasActiveModel"],
    )
    effective_wind_speed, effective_wind_speed_source = choose_effective_value(
        ml_wind_speed,
        dmi_wind_speed,
        target_status["wind_speed"]["hasActiveModel"],
    )
    effective_wind_gust, effective_wind_gust_source = choose_effective_value(
        ml_wind_gust,
        dmi_wind_gust,
        target_status["wind_gust"]["hasActiveModel"],
    )
    effective_rain_prob, effective_rain_prob_source = choose_effective_value(
        ml_rain_prob,
        dmi_rain_prob,
        target_status["rain_event"]["hasActiveModel"],
    )
    effective_rain_amount, effective_rain_amount_source = choose_effective_value(
        ml_rain_amount,
        dmi_rain_amount,
        target_status["rain_amount"]["hasActiveModel"],
    )

    # NaN is truthy, so "value or 0" does not protect int() from NaN; check for
    # missing values explicitly and fall back to 0 on anything unconvertible.
    raw_lead_time = row.get("lead_time_hours")
    try:
        if raw_lead_time is None or pd.isna(raw_lead_time):
            lead_time_hours = 0
        else:
            lead_time_hours = int(raw_lead_time)
    except (TypeError, ValueError, OverflowError):
        lead_time_hours = 0

    raw_weather_code = row.get("dmi_weather_code_pred")
    weather_code = int(raw_weather_code) if safe_number(raw_weather_code, digits=0) is not None else None

    timestamp = to_iso(row.get("target_timestamp"))
    return {
        "timestamp": timestamp,
        "hour": timestamp,
        "leadTimeHours": lead_time_hours,
        "dmiTemp": dmi_temp,
        "mlTemp": ml_temp,
        "effectiveTemp": effective_temp,
        "effectiveTempSource": effective_temp_source,
        "apparentTemp": safe_number(row.get("dmi_apparent_temperature_pred")),
        "dmiWindSpeed": dmi_wind_speed,
        "mlWindSpeed": ml_wind_speed,
        "effectiveWindSpeed": effective_wind_speed,
        "effectiveWindSpeedSource": effective_wind_speed_source,
        "dmiWindGust": dmi_wind_gust,
        "mlWindGust": ml_wind_gust,
        "effectiveWindGust": effective_wind_gust,
        "effectiveWindGustSource": effective_wind_gust_source,
        "windDirection": safe_number(row.get("dmi_winddirection_10m_pred")),
        "dmiRainProb": dmi_rain_prob,
        "mlRainProb": ml_rain_prob,
        "effectiveRainProb": effective_rain_prob or 0.0,
        "effectiveRainProbSource": effective_rain_prob_source,
        "dmiRainAmount": dmi_rain_amount,
        "mlRainAmount": ml_rain_amount,
        "effectiveRainAmount": effective_rain_amount or 0.0,
        "effectiveRainAmountSource": effective_rain_amount_source,
        "weatherCode": weather_code,
        "cloudCover": safe_number(row.get("dmi_cloud_cover_pred")),
        "humidity": safe_number(row.get("dmi_relative_humidity_2m_pred")),
        "pressure": safe_number(row.get("dmi_pressure_msl_pred")),
    }
def build_current_payload(current_row):
    """Build the "current conditions" payload from the first forecast row.

    Args:
        current_row: A dict produced by ``build_forecast_row``, or a falsy value
            when no forecast is available.

    Returns:
        Dict with the effective/DMI/ML values of the row under the frontend's
        "current" key names. Without a row a placeholder is returned: the
        current time as timestamp, ``"dmi"`` for every source, ``0.0`` for the
        rain values and ``None`` elsewhere.
    """
    # (payload key, forecast-row key) in the order the payload is emitted.
    field_map = (
        ("timestamp", "timestamp"),
        ("temp", "effectiveTemp"),
        ("dmiTemp", "dmiTemp"),
        ("mlTemp", "mlTemp"),
        ("tempSource", "effectiveTempSource"),
        ("apparentTemp", "apparentTemp"),
        ("windSpeed", "effectiveWindSpeed"),
        ("dmiWindSpeed", "dmiWindSpeed"),
        ("mlWindSpeed", "mlWindSpeed"),
        ("windSpeedSource", "effectiveWindSpeedSource"),
        ("windGust", "effectiveWindGust"),
        ("dmiWindGust", "dmiWindGust"),
        ("mlWindGust", "mlWindGust"),
        ("windGustSource", "effectiveWindGustSource"),
        ("windDirection", "windDirection"),
        ("rainProb", "effectiveRainProb"),
        ("dmiRainProb", "dmiRainProb"),
        ("mlRainProb", "mlRainProb"),
        ("rainProbSource", "effectiveRainProbSource"),
        ("rainAmount", "effectiveRainAmount"),
        ("dmiRainAmount", "dmiRainAmount"),
        ("mlRainAmount", "mlRainAmount"),
        ("rainAmountSource", "effectiveRainAmountSource"),
        ("humidity", "humidity"),
        ("pressure", "pressure"),
        ("cloudCover", "cloudCover"),
        ("weatherCode", "weatherCode"),
    )
    if current_row:
        return {payload_key: current_row[row_key] for payload_key, row_key in field_map}

    # Placeholder: start from all-None, then fill in the non-None defaults.
    # Updating existing keys keeps the key order of field_map.
    placeholder = {payload_key: None for payload_key, _ in field_map}
    placeholder["timestamp"] = to_iso(now_cph())
    for source_key in ("tempSource", "windSpeedSource", "windGustSource", "rainProbSource", "rainAmountSource"):
        placeholder[source_key] = "dmi"
    for rain_key in ("rainProb", "dmiRainProb", "mlRainProb", "rainAmount", "dmiRainAmount", "mlRainAmount"):
        placeholder[rain_key] = 0.0
    return placeholder
def build_alert_rows(forecast_rows):
    """Derive lightweight alert messages from the live forecast.

    Each metric (wind speed, wind gust, rain probability, rain amount) is
    maximised independently over the forecast window, so a threshold exceeded in
    any row raises the alert and the "up to" figures in the message are the true
    peaks.

    Args:
        forecast_rows: List of dicts as produced by ``build_forecast_row``.

    Returns:
        List of alert dicts (``type``, ``severity``, ``title``, ``message``):
        a data warning when there is no forecast, wind/rain warnings when a
        threshold is reached, otherwise a single informational entry.
    """
    if not forecast_rows:
        return [
            {
                "type": "data",
                "severity": "warning",
                "title": "Ingen forecast-data",
                "message": "Collector kunne ikke bygge et forecast-snapshot.",
            }
        ]

    def _peak(value_key, source_key):
        """Return (largest value of value_key, whether that row's source is ML)."""
        top_row = max(forecast_rows, key=lambda row: row.get(value_key) or 0.0)
        return (top_row.get(value_key) or 0.0), top_row.get(source_key) == "ml"

    max_wind_speed, wind_speed_is_ml = _peak("effectiveWindSpeed", "effectiveWindSpeedSource")
    max_wind_gust, wind_gust_is_ml = _peak("effectiveWindGust", "effectiveWindGustSource")
    max_rain_prob, rain_prob_is_ml = _peak("effectiveRainProb", "effectiveRainProbSource")
    max_rain_amount, rain_amount_is_ml = _peak("effectiveRainAmount", "effectiveRainAmountSource")

    # Attribute the alert to ML as soon as one of the reported peaks comes from ML.
    wind_source = "ML" if wind_speed_is_ml or wind_gust_is_ml else "DMI"
    rain_source = "ML" if rain_prob_is_ml or rain_amount_is_ml else "DMI"

    alerts = []
    if max_wind_speed >= 15 or max_wind_gust >= 20:
        alerts.append(
            {
                "type": "wind",
                "severity": "warning",
                "title": "Kraftig vind",
                "message": f"{wind_source} forventer op til {max_wind_speed:.1f} m/s og vindstรธd op til {max_wind_gust:.1f} m/s i forecast-vinduet.",
            }
        )
    if max_rain_prob >= 70 or max_rain_amount >= 5:
        alerts.append(
            {
                "type": "rain",
                "severity": "warning",
                "title": "Vรฅd periode",
                "message": f"{rain_source} forventer op til {max_rain_prob:.0f}% regnrisiko og {max_rain_amount:.1f} mm/time i forecast-vinduet.",
            }
        )
    if not alerts:
        alerts.append(
            {
                "type": "data",
                "severity": "info",
                "title": "Roligt forecast",
                "message": "Snapshot viser ingen kraftige regn- eller vindsignaler lige nu.",
            }
        )
    return alerts
def build_frontend_snapshot():
    """Assemble the JSON document that the frontend reads.

    Loads predictions, the training matrix, the model registry and model
    metadata, refreshes the ML columns of future rows, and derives forecast
    rows, current conditions, history, verification metrics, lead-bucket rows,
    feature importances, model info and alerts.

    Returns:
        The snapshot as a plain dict.
    """
    predictions_df = load_predictions_snapshot()
    training_df, _ = load_existing_training_matrix()
    registry = load_json_from_dataset("model_registry.json", DATASET_NAME) or {}
    model_meta = load_json_from_dataset("model_meta.json", DATASET_NAME) or {}

    registry_revision = registry.get("generated_at")
    if registry_revision:
        clear_model_bundle_cache(registry_revision)

    predictions_df = rebuild_future_ml_columns(predictions_df, registry_revision=registry_revision)
    target_status = build_target_status(registry)
    backtest_df = build_recent_backtest(training_df)
    verification = calculate_verification_metrics(predictions_df=predictions_df, backtest_df=backtest_df)
    history = build_history_payload(predictions_df=predictions_df, backtest_df=backtest_df)

    forecast_rows = []
    if predictions_df is not None and len(predictions_df) > 0:
        upcoming = predictions_df[predictions_df["target_timestamp"] > now_cph()].copy()
        # Earliest 48 future rows, in chronological order.
        upcoming = upcoming.sort_values("target_timestamp").head(48).reset_index(drop=True)
        forecast_rows = [build_forecast_row(row, target_status) for _, row in upcoming.iterrows()]

    # The first (earliest) forecast row doubles as the "current" conditions.
    current = build_current_payload(forecast_rows[0] if forecast_rows else None)

    feature_importance = sorted(
        (
            item
            for target_name in MODEL_FILES
            for item in extract_feature_importance_from_bundle(
                load_model_bundle(target_name, cache_revision=registry_revision),
                target_name,
            )
        ),
        key=lambda item: item["importance"],
        reverse=True,
    )

    return {
        "location": {
            "name": "Aarhus",
            "timezone": "Europe/Copenhagen",
        },
        "generatedAt": to_iso(now_cph()),
        "targetLabels": build_target_labels(),
        "explanations": build_explanations(),
        "targetStatus": target_status,
        "current": current,
        "forecast": forecast_rows,
        "history": history,
        "verification": verification,
        "leadBuckets": build_lead_bucket_rows(registry),
        "featureImportance": feature_importance[:24],
        "modelInfo": {
            "trainedAt": model_meta.get("trained_at"),
            "trainingSamples": model_meta.get("n_samples"),
            "targets": model_meta.get("targets", list(MODEL_FILES.keys())),
            "registryGeneratedAt": registry.get("generated_at"),
        },
        "alerts": build_alert_rows(forecast_rows),
    }
def publish_frontend_snapshot():
    """Build the frontend snapshot, write it to disk and upload it.

    Returns:
        ``(ok, message)``. Any exception is reported through ``log_exception``
        and turned into ``(False, str(exc))`` instead of propagating.
    """
    try:
        payload = build_frontend_snapshot()
        with open(FRONTEND_SNAPSHOT_FILE, "w", encoding="utf-8") as stream:
            json.dump(payload, stream, indent=2, ensure_ascii=False)
        uploaded = upload_to_dataset(FRONTEND_SNAPSHOT_FILE, FRONTEND_SNAPSHOT_FILE, PREDICTIONS_DATASET)
        if not uploaded:
            return False, f"Failed to upload {FRONTEND_SNAPSHOT_FILE}"
        return True, f"Uploaded {FRONTEND_SNAPSHOT_FILE}"
    except Exception as exc:
        log_exception("publish_frontend_snapshot", exc)
        return False, str(exc)
# =============================================================================
# BACKFILL OPERATIONS
# =============================================================================
def backfill_historical_data():
    """Rebuild the training matrix month by month from ``HISTORICAL_BACKFILL_START`` to today.

    For every calendar month, forecasts are fetched, observations are fetched
    for the forecast target range padded by two days on each side, and both are
    joined with ``build_training_matrix``. The de-duplicated result is written
    to ``training_matrix.parquet`` and uploaded.

    Returns:
        A status message string.
    """
    log_event("backfill_historical_data entered")
    init_dataset_if_needed()
    start_date = HISTORICAL_BACKFILL_START
    end_date = now_cph().date()
    print(f"๐Ÿ”„ Fetching from {start_date} to {end_date}")

    def _collect_month(first_day, last_day):
        """Return the merged training rows for one month, or None when unavailable."""
        forecasts = fetch_forecasts_for_period(first_day, last_day)
        if forecasts is None or len(forecasts) == 0:
            return None
        min_target = forecasts['target_timestamp'].min().date()
        max_target = forecasts['target_timestamp'].max().date()
        observations = fetch_observations_for_period(
            min_target - timedelta(days=2),
            max_target + timedelta(days=2)
        )
        if observations is None:
            return None
        month_rows = build_training_matrix(forecasts, observations)
        if month_rows is None or len(month_rows) == 0:
            return None
        return month_rows

    all_data = []
    current_month_start = start_date
    while current_month_start <= end_date:
        wraps_year = current_month_start.month == 12
        next_month = datetime(
            current_month_start.year + 1 if wraps_year else current_month_start.year,
            1 if wraps_year else current_month_start.month + 1,
            1,
        ).date()
        # Last day of this month, capped at today.
        month_end = min(next_month - timedelta(days=1), end_date)
        print(f"๐Ÿ”„ Fetching {current_month_start.strftime('%Y-%m')}...")
        merged = _collect_month(current_month_start, month_end)
        if merged is not None:
            all_data.append(merged)
            print(f"โœ… {len(merged)} rows")
        current_month_start = next_month

    if not all_data:
        return "โŒ No data collected"

    final_df = pd.concat(all_data, ignore_index=True)
    final_df = dedupe_rows(final_df, TRAINING_DEDUP_KEYS, sort_keys=["target_timestamp", "reference_time"], keep="last")
    # Persist locally, then push to the dataset repo.
    final_df.to_parquet("training_matrix.parquet")
    if not upload_to_dataset("training_matrix.parquet", "training_matrix.parquet", DATASET_NAME):
        return "โŒ Failed to upload"
    return f"โœ… Backfilled {len(final_df)} rows to training_matrix.parquet"
# =============================================================================
# DAILY UPDATE
# =============================================================================
def update_daily():
    """Daily update - fetch the last 7 days and merge new rows into the dataset.

    Forecasts for the last seven days are joined with observations (starting two
    days before the earliest forecast target). Rows not yet present in the
    stored training matrix are appended, the result is uploaded as
    ``training_matrix.parquet`` and the frontend snapshot is republished.

    Returns:
        A status message string describing the outcome.
    """
    log_event("update_daily entered")
    init_dataset_if_needed()
    end_date = now_cph().date()
    start_date = end_date - timedelta(days=7)
    print(f"โฐ Copenhagen time: {now_cph()}")
    forecasts = fetch_forecasts_for_period(start_date, end_date)
    # An empty frame must be rejected as well (as backfill_historical_data does):
    # min()/max() of an empty column is NaT and NaT.date() raises.
    if forecasts is None or len(forecasts) == 0:
        return "โŒ No forecasts fetched"
    min_target = forecasts['target_timestamp'].min().date()
    max_target = forecasts['target_timestamp'].max().date()
    observations = fetch_observations_for_period(min_target - timedelta(days=2), max_target)
    if observations is None:
        return "โŒ No observations fetched"
    merged = build_training_matrix(forecasts, observations)
    if merged is None or len(merged) == 0:
        return "โŒ No matching data"
    # Try to load existing data
    try:
        existing, existing_name = load_existing_training_matrix()
        if existing is not None and not existing.empty and "target_timestamp" in existing.columns:
            new_data = find_new_rows(merged, existing, TRAINING_DEDUP_KEYS)
            if len(new_data) == 0:
                return f"โ„น๏ธ No new data ({existing_name} already up to date)"
            combined = pd.concat([existing, new_data], ignore_index=True)
            combined = dedupe_rows(combined, TRAINING_DEDUP_KEYS, sort_keys=["target_timestamp", "reference_time"], keep="last")
            status_msg = f"โœ… Added {len(new_data)} new rows"
        else:
            combined = merged
            status_msg = f"โœ… Created new dataset with {len(merged)} rows"
    except Exception as e:
        # NOTE(review): on any failure here the stored matrix is replaced by the
        # freshly fetched rows only - confirm this fallback is intended.
        print(f"Info: {e}")
        combined = merged
        status_msg = f"โœ… Created dataset with {len(merged)} rows"
    combined.to_parquet("training_matrix.parquet")
    if upload_to_dataset("training_matrix.parquet", "training_matrix.parquet", DATASET_NAME):
        snapshot_ok, snapshot_msg = publish_frontend_snapshot()
        if snapshot_ok:
            return f"{status_msg}\nSnapshot: {snapshot_msg}"
        # The data upload succeeded; a snapshot failure is reported as a warning only.
        return f"{status_msg}\nSnapshot warning: {snapshot_msg}"
    else:
        return "โŒ Failed to upload"
# =============================================================================
# PREDICTIONS
# =============================================================================
def load_model_bundle(target_name, cache_revision=None):
    """Return the model bundle for ``target_name``, downloading it on a cache miss.

    Bundles are cached in ``APP_STATE.model_bundle_cache``. Passing a
    ``cache_revision`` that differs from the stored one empties the cache first.

    Returns:
        The loaded bundle, or ``None`` when download or loading fails (the error
        is reported through ``log_exception``).
    """
    log_event("load_model_bundle", target_name=target_name, cache_revision=cache_revision)
    with APP_STATE.lock:
        revision_changed = bool(cache_revision) and APP_STATE.cache_revision != cache_revision
        if revision_changed:
            APP_STATE.model_bundle_cache = {}
            APP_STATE.cache_revision = cache_revision
        cached_bundle = APP_STATE.model_bundle_cache.get(target_name)
    if cached_bundle is not None:
        return cached_bundle

    # Download and load outside the lock so other callers are not blocked.
    try:
        local_path = hf_hub_download(
            repo_id=DATASET_NAME,
            filename=f"{target_name}_models.pkl",
            repo_type="dataset",
            token=HF_TOKEN
        )
        # NOTE(review): joblib.load deserializes pickle data - only safe as long
        # as the dataset repo is trusted.
        loaded = joblib.load(local_path)
        with APP_STATE.lock:
            APP_STATE.model_bundle_cache[target_name] = loaded
        return loaded
    except Exception as e:
        log_exception(f"load_model_bundle[{target_name}]", e)
        return None
def predict_with_bundle(bundle, df):
    """Make predictions using model bundle.

    Rows are routed to the model registered for their ``lead_bucket``. Feature
    columns are taken from the model entry and, when absent there, from the
    bundle itself (the same fallback ``extract_feature_importance_from_bundle``
    uses).

    Args:
        bundle: Dict with a ``models`` mapping of bucket -> ``{"model": ...,
            "feature_columns": [...]}`` and optionally a bundle-level
            ``feature_columns`` list.
        df: Feature frame containing ``lead_bucket`` and the feature columns.

    Returns:
        A float array aligned with ``df`` rows (NaN where no model applied), or
        ``None`` when the bundle is unusable or ``df`` has no ``lead_bucket``
        column.
    """
    if bundle is None or 'models' not in bundle:
        return None
    # Without bucket information no row can be routed to a model; callers treat
    # None as "no prediction available".
    if 'lead_bucket' not in df.columns:
        return None

    models = bundle['models']
    predictions = np.full(len(df), np.nan)
    for bucket in df['lead_bucket'].unique():
        if bucket not in models:
            continue
        model_info = models[bucket]
        model = model_info['model']

        feature_cols = model_info.get('feature_columns')
        if feature_cols is None or len(feature_cols) == 0:
            feature_cols = bundle.get('feature_columns')
        if feature_cols is None or len(feature_cols) == 0:
            continue
        feature_cols = list(feature_cols)

        missing_cols = [col for col in feature_cols if col not in df.columns]
        if missing_cols:
            log_event("predict_with_bundle missing_features", bucket=bucket, missing_columns=missing_cols)
            continue

        # Positional boolean mask so the write-back does not depend on df's index.
        bucket_mask = (df['lead_bucket'] == bucket).to_numpy()
        X = df.loc[bucket_mask, feature_cols].fillna(0.0)
        if hasattr(model, "predict_proba"):
            # Classifiers: use column 1 of predict_proba as the score.
            bucket_pred = model.predict_proba(X)[:, 1]
        else:
            bucket_pred = model.predict(X)
        predictions[bucket_mask] = bucket_pred
    return predictions
# =============================================================================
# STARTUP CATCH-UP
# =============================================================================
def generate_predictions():
    """Produce predictions for every target and merge them into stored history.

    Steps performed here:

    1. Fetch future forecasts; return an error message if none are available.
    2. Read ``model_registry.json`` from the dataset repo and use its
       ``generated_at`` value as the model-cache revision. Failures are
       logged and leave the revision as ``None``.
    3. Build the live feature frame and seed the ``ml_*`` columns with the
       DMI forecast values so every row has a baseline.
    4. For each target with usable model output, either add the model output
       to the baseline (temperature, wind speed, wind gust) or replace the
       baseline with it (rain event, rain amount).
    5. Merge with the stored snapshot, write ``predictions_latest.parquet``,
       upload it and publish the frontend snapshot.

    Returns:
        A human-readable status string.
    """
    log_event("generate_predictions entered")
    current_time = now_cph()
    log_event("generate_predictions clock", current_time=str(current_time))

    future_forecasts = fetch_future_forecasts()
    if future_forecasts is None or len(future_forecasts) == 0:
        return "Could not fetch future forecasts"

    registry_revision = None
    try:
        registry_path = hf_hub_download(
            repo_id=DATASET_NAME,
            filename="model_registry.json",
            repo_type="dataset",
            token=HF_TOKEN,
        )
        with open(registry_path, "r") as handle:
            registry = json.load(handle)
        registry_revision = registry.get("generated_at")
        if registry_revision:
            clear_model_bundle_cache(registry_revision)
    except Exception as exc:
        log_exception("generate_predictions model_registry", exc)

    feature_frame = build_live_feature_frame(future_forecasts)
    results = feature_frame.copy()
    results["prediction_made_at"] = current_time
    results["city"] = "aarhus"
    results["verified"] = False

    # Observed values are unknown at prediction time.
    for actual_col in (
        "actual_temp",
        "actual_wind_speed",
        "actual_wind_gust",
        "actual_precipitation",
        "actual_rain",
        "actual_rain_event",
        "actual_rain_amount",
    ):
        results[actual_col] = None

    # Baseline: start from the DMI forecast; models below refine it.
    results["ml_temp"] = results["dmi_temperature_2m_pred"]
    results["ml_wind_speed"] = results["dmi_windspeed_10m_pred"]
    results["ml_wind_gust"] = results["dmi_windgusts_10m_pred"]
    rain_probability_pct = results["dmi_precipitation_probability_pred"].fillna(0.0).clip(0.0, 100.0)
    results["ml_rain_prob"] = rain_probability_pct / 100.0
    results["ml_rain_amount"] = results["dmi_precipitation_pred"].fillna(0.0).clip(0.0, None)

    # (target, output column, baseline column, model output is a correction?)
    target_specs = [
        ("temperature", "ml_temp", "dmi_temperature_2m_pred", True),
        ("wind_speed", "ml_wind_speed", "dmi_windspeed_10m_pred", True),
        ("wind_gust", "ml_wind_gust", "dmi_windgusts_10m_pred", True),
        ("rain_event", "ml_rain_prob", None, False),
        ("rain_amount", "ml_rain_amount", None, False),
    ]
    for target_name, output_col, baseline_col, is_correction in target_specs:
        bundle = load_model_bundle(target_name, cache_revision=registry_revision)
        raw_prediction = predict_with_bundle(bundle, feature_frame)
        if raw_prediction is None:
            continue

        predicted = pd.Series(raw_prediction, index=results.index, dtype="float64")
        has_prediction = predicted.notna()
        if not has_prediction.any():
            continue

        new_values = predicted[has_prediction]
        if is_correction:
            new_values = results.loc[has_prediction, baseline_col] + new_values
        results.loc[has_prediction, output_col] = new_values

    results["ml_rain_prob"] = results["ml_rain_prob"].clip(0.0, 1.0)
    results["ml_rain_amount"] = results["ml_rain_amount"].clip(0.0, None)

    results = merge_prediction_history(load_predictions_snapshot(), results)
    results.to_parquet("predictions_latest.parquet")

    uploaded = upload_to_dataset("predictions_latest.parquet", "predictions_latest.parquet", PREDICTIONS_DATASET)
    if not uploaded:
        return "Failed to upload predictions"

    future_count = int((results["target_timestamp"] > current_time).sum())
    verified_count = int(results["verified"].fillna(False).astype(bool).sum())
    status_msg = (
        f"Generated/upserted {len(feature_frame)} future predictions. "
        f"Dataset now holds {len(results)} rows, including {future_count} future rows "
        f"and {verified_count} verified rows."
    )
    snapshot_ok, snapshot_msg = publish_frontend_snapshot()
    snapshot_label = "Snapshot" if snapshot_ok else "Snapshot warning"
    return f"{status_msg}\n{snapshot_label}: {snapshot_msg}"
def verify_predictions():
    """Fill in observed values for past predictions and mark them verified.

    A prediction is eligible once its ``target_timestamp`` is more than one
    hour before ``now_cph()`` and it is not yet flagged ``verified``.
    Observations are fetched for the date range covering the eligible rows and
    matched on exact ``target_timestamp``. Rows without a matching observation
    are left untouched.

    Returns:
        A human-readable status string. Any exception is logged and reported
        as ``"Verification error: ..."`` instead of being raised.
    """
    log_event("verify_predictions entered")
    try:
        pred_df = load_predictions_snapshot()
        if pred_df is None or len(pred_df) == 0:
            return "No predictions file found"

        # Snapshots without the flag are treated as fully unverified, matching
        # should_verify_predictions(); otherwise the lookup below would raise.
        if "verified" not in pred_df.columns:
            pred_df["verified"] = False
        # Coerce before negating: `~` on an object/NaN column does not yield
        # a boolean mask.
        already_verified = pred_df["verified"].fillna(False).astype(bool)

        now = now_cph()
        to_verify = pred_df[
            (~already_verified) &
            (pred_df["target_timestamp"] < now - timedelta(hours=1))
        ]
        if len(to_verify) == 0:
            return "No predictions to verify"

        start_date = to_verify["target_timestamp"].min().date()
        end_date = to_verify["target_timestamp"].max().date()
        observations = fetch_observations_for_period(start_date, end_date)
        if observations is None or len(observations) == 0:
            return "Could not fetch observations"

        # Prediction column -> observation column it is filled from.
        column_map = {
            "actual_temp": "actual_temp",
            "actual_wind_speed": "actual_wind_speed",
            "actual_wind_gust": "actual_wind_gust",
            "actual_precipitation": "actual_precipitation",
            "actual_rain": "actual_rain",
            "actual_rain_event": "rain_event",
            "actual_rain_amount": "rain_amount",
        }
        lookup = observations.set_index("target_timestamp")[list(column_map.values())]
        # A duplicated timestamp would make .loc return a frame instead of a
        # single row and break the per-cell assignment; keep the last entry.
        lookup = lookup[~lookup.index.duplicated(keep="last")]

        verified_count = 0
        for idx, target_timestamp in to_verify["target_timestamp"].items():
            if target_timestamp not in lookup.index:
                continue
            match = lookup.loc[target_timestamp]
            for pred_col, obs_col in column_map.items():
                pred_df.loc[idx, pred_col] = match[obs_col]
            pred_df.loc[idx, "verified"] = True
            verified_count += 1

        pred_df = normalize_prediction_df(pred_df)
        pred_df.to_parquet("predictions_latest.parquet")
        if upload_to_dataset("predictions_latest.parquet", "predictions_latest.parquet", PREDICTIONS_DATASET):
            status_msg = f"Verified {verified_count} predictions"
            snapshot_ok, snapshot_msg = publish_frontend_snapshot()
            if snapshot_ok:
                return f"{status_msg}\nSnapshot: {snapshot_msg}"
            return f"{status_msg}\nSnapshot warning: {snapshot_msg}"
        return "Failed to upload verified predictions"
    except Exception as exc:
        log_exception("verify_predictions", exc)
        return f"Verification error: {exc}"
def load_predictions_snapshot():
    """Load and normalize the stored predictions table.

    Tries ``predictions_latest.parquet`` first, then ``predictions.parquet``,
    in the predictions dataset.

    Returns:
        The normalized frame, or ``None`` when no file is available or the
        normalized result lacks a ``target_timestamp`` column.
    """
    candidate_files = ["predictions_latest.parquet", "predictions.parquet"]
    snapshot_path, _ = load_first_available_dataset_file(candidate_files, PREDICTIONS_DATASET)
    if not snapshot_path:
        return None

    raw_frame = pd.read_parquet(snapshot_path)
    snapshot = normalize_prediction_df(raw_frame)
    usable = snapshot is not None and 'target_timestamp' in snapshot.columns
    return snapshot if usable else None
def build_collector_snapshot_text():
    """Describe the stored training and prediction data as plain text.

    Each of the two sections (training data, predictions) is built
    independently; if one raises, an "unavailable" line carrying the error is
    added instead and the other section is still attempted.

    Returns:
        The summary lines joined with newlines.
    """
    summary = []

    # --- training data -------------------------------------------------
    try:
        matrix, matrix_name = load_existing_training_matrix()
        if matrix is not None and len(matrix) != 0:
            newest_row = matrix["target_timestamp"].max()
            summary.append(f"Training data ({matrix_name}): {len(matrix)} rows through {newest_row}.")
            if "lead_bucket" in matrix.columns:
                per_bucket = matrix["lead_bucket"].value_counts().sort_index().to_dict()
                bucket_text = ", ".join(f"{bucket}={count}" for bucket, count in per_bucket.items())
                summary.append(f"Lead buckets: {bucket_text}")
        else:
            summary.append("Training data: no rows available.")
    except Exception as exc:
        summary.append(f"Training data summary unavailable: {exc}")

    # --- predictions ---------------------------------------------------
    try:
        snapshot = load_predictions_snapshot()
        if snapshot is not None and len(snapshot) != 0:
            now = now_cph()
            future_count = int((snapshot["target_timestamp"] > now).sum())
            verified_count = int(snapshot["verified"].fillna(False).astype(bool).sum())
            if "prediction_made_at" in snapshot.columns:
                latest_prediction = snapshot["prediction_made_at"].max()
            else:
                latest_prediction = "unknown"
            summary.append(f"Predictions: {len(snapshot)} rows, {future_count} future, {verified_count} verified.")
            summary.append(f"Latest prediction made at: {latest_prediction}")
        else:
            summary.append("Predictions: no rows available.")
    except Exception as exc:
        summary.append(f"Prediction summary unavailable: {exc}")

    return "\n".join(summary)
def build_collector_load_outputs():
    """Return the ``(app status text, collector snapshot text)`` pair for page load."""
    app_status_text = build_app_status_text()
    snapshot_text = build_collector_snapshot_text()
    return app_status_text, snapshot_text
def build_action_status(result):
    """Return ``result`` followed by a blank line and the current collector snapshot text."""
    snapshot_text = build_collector_snapshot_text()
    return f"{result}\n\n{snapshot_text}"
def should_run_daily_catch_up():
    """Decide whether the daily update should run during startup catch-up.

    Returns:
        ``True`` when no stored matrix file is found
        (``training_matrix.parquet``, then ``data.parquet``), when it has
        neither a ``target_timestamp`` nor a ``timestamp`` column, when no
        timestamp can be parsed, or when the newest timestamp falls on a date
        earlier than the date of ``now_cph()``. ``False`` otherwise.
    """
    matrix_path = (
        load_from_dataset("training_matrix.parquet", DATASET_NAME)
        or load_from_dataset("data.parquet", DATASET_NAME)
    )
    if not matrix_path:
        return True

    stored = pd.read_parquet(matrix_path)
    # Prefer the newer column name, fall back to the older one.
    timestamp_col = next(
        (name for name in ("target_timestamp", "timestamp") if name in stored.columns),
        None,
    )
    if timestamp_col is None:
        return True

    newest = pd.to_datetime(stored[timestamp_col], errors="coerce").max()
    if pd.isna(newest):
        return True
    return newest.date() < now_cph().date()
def should_generate_predictions():
    """Decide whether predictions should be regenerated during startup catch-up.

    Returns:
        ``True`` when there is no stored snapshot, when it holds no rows with
        a ``target_timestamp`` after ``now_cph()``, or when the furthest such
        timestamp is less than six hours ahead. ``False`` otherwise.
    """
    snapshot = load_predictions_snapshot()
    if snapshot is None or snapshot.empty:
        return True

    upcoming = snapshot[snapshot["target_timestamp"] > now_cph()]
    if upcoming.empty:
        return True

    furthest_target = upcoming["target_timestamp"].max()
    return furthest_target < now_cph() + timedelta(hours=6)
def should_verify_predictions():
    """Report whether the stored snapshot has overdue, unverified predictions.

    A row is overdue when it is not flagged ``verified`` and its
    ``target_timestamp`` is more than one hour before ``now_cph()``. A
    snapshot without a ``verified`` column is treated as entirely unverified.

    Returns:
        ``True`` when at least one overdue row exists; ``False`` otherwise,
        including when there is no snapshot or it is empty.
    """
    pred_df = load_predictions_snapshot()
    if pred_df is None or pred_df.empty:
        return False

    if 'verified' in pred_df.columns:
        # Coerce before negating: `~` on an object-dtype or NaN-containing
        # column does not produce a boolean mask.
        unverified = ~pred_df['verified'].fillna(False).astype(bool)
    else:
        unverified = pd.Series(True, index=pred_df.index)

    cutoff = now_cph() - timedelta(hours=1)
    overdue = unverified & (pred_df['target_timestamp'] < cutoff)
    return bool(overdue.any())
def run_post_start_catch_up():
    """Run whichever maintenance jobs are overdue, once, shortly after startup.

    Sleeps 20 seconds, then claims the catch-up under ``APP_STATE.lock`` so
    only the first caller proceeds. Each job's "is it due?" check is evaluated
    immediately before that job, in the order daily update, prediction
    generation, prediction verification. Whatever happens, the state flags
    are finalized at the end (``active_job``/``warming`` cleared,
    ``ready``/``catch_up_completed`` set).
    """
    time.sleep(20)

    with APP_STATE.lock:
        if APP_STATE.catch_up_started:
            return
        APP_STATE.catch_up_started = True
        APP_STATE.active_job = True

    actions = []
    try:
        # (due check, log event name, job) - order matters: later checks see
        # the effects of earlier jobs.
        catch_up_steps = (
            (should_run_daily_catch_up, "startup_catch_up_daily", update_daily),
            (should_generate_predictions, "startup_catch_up_generate_predictions", generate_predictions),
            (should_verify_predictions, "startup_catch_up_verify_predictions", verify_predictions),
        )
        for is_due, event_name, job in catch_up_steps:
            if is_due():
                actions.append(run_logged(event_name, job))
        log_event("startup_catch_up_completed", action_count=len(actions))
    except Exception as exc:
        note_app_error(exc)
        log_exception("run_post_start_catch_up", exc)
    finally:
        with APP_STATE.lock:
            APP_STATE.active_job = False
            APP_STATE.warming = False
            APP_STATE.ready = True
            APP_STATE.catch_up_completed = True
# =============================================================================
# SCHEDULER
# =============================================================================
def run_scheduler():
    """Register the recurring jobs and poll the scheduler forever.

    Jobs registered with the ``schedule`` library:

    * prediction generation every three hours at minute 35 (00:35 ... 21:35),
    * prediction verification every hour at minute 12,
    * the daily update at 05:45.

    The loop calls ``schedule.run_pending()`` once a minute; an exception
    raised by it is logged and the loop continues. This function never
    returns.
    """
    log_event("scheduler starting")

    def generate_job():
        return run_logged("scheduled_generate_predictions", generate_predictions)

    def verify_job():
        return run_logged("scheduled_verify_predictions", verify_predictions)

    def daily_job():
        return run_logged("scheduled_update_daily", update_daily)

    # "00:35", "03:35", ..., "21:35"
    prediction_times = [f"{hour:02d}:35" for hour in range(0, 24, 3)]
    for at_time in prediction_times:
        schedule.every().day.at(at_time).do(generate_job)
    schedule.every().hour.at(":12").do(verify_job)
    schedule.every().day.at("05:45").do(daily_job)
    log_event("scheduler_registered")

    while True:
        try:
            schedule.run_pending()
        except Exception as exc:
            log_exception("scheduler.run_pending", exc)
        time.sleep(60)
# =============================================================================
# GRADIO UI
# =============================================================================
log_event("building_gradio_ui")
with gr.Blocks(title="DMI Aarhus Collector") as demo:
    gr.Markdown("""
    # 🌤️ DMI Aarhus Weather Collector
    Collects forecast and observation data for Aarhus, generates predictions, and verifies accuracy.
    """)
    app_status = gr.Markdown(build_app_status_text())
    status = gr.Textbox(label="Status", lines=10, value="Loading collector snapshot...")
    with gr.Row():
        btn_backfill = gr.Button("🚀 Backfill Historical Data", variant="primary")
        btn_daily = gr.Button("🔄 Daily Update", variant="secondary")
        btn_predict = gr.Button("🔮 Generate Predictions", variant="primary")
        btn_verify = gr.Button("✅ Verify Predictions", variant="secondary")

    def _run_ui_action(event_name, action, action_label):
        """Run one collector action for a button click.

        Returns the ``(app status text, status box text)`` pair. On success
        the status box shows the action result followed by the collector
        snapshot; on failure the error is recorded with ``note_app_error``
        and the status box shows a failure message naming ``action_label``.
        """
        try:
            result = run_logged(event_name, action)
            set_app_ready()
            return build_app_status_text(), build_action_status(result)
        except Exception as exc:
            note_app_error(exc)
            return build_app_status_text(), f"❌ {action_label} failed: {exc}"

    # The four handlers keep their own names and differ only in the action
    # they run and the labels used for logging and error reporting.
    def backfill_handler():
        return _run_ui_action(
            "gradio_backfill_historical_data", backfill_historical_data, "backfill_historical_data"
        )

    def daily_handler():
        return _run_ui_action("gradio_update_daily", update_daily, "update_daily")

    def predict_handler():
        return _run_ui_action(
            "gradio_generate_predictions", generate_predictions, "generate_predictions"
        )

    def verify_handler():
        return _run_ui_action("gradio_verify_predictions", verify_predictions, "verify_predictions")

    btn_backfill.click(backfill_handler, outputs=[app_status, status])
    btn_daily.click(daily_handler, outputs=[app_status, status])
    btn_predict.click(predict_handler, outputs=[app_status, status])
    btn_verify.click(verify_handler, outputs=[app_status, status])
    # Populate both widgets when the page is opened.
    demo.load(build_collector_load_outputs, outputs=[app_status, status])
    log_event("gradio_ui_ready")
log_event("ui_constructed")
if __name__ == "__main__":
    # Script entry point: start the background workers, then serve the UI.
    log_startup()
    try:
        set_app_ready()

        # Both workers are daemon threads, so they do not keep the process
        # alive once the Gradio server exits.
        scheduler_thread = threading.Thread(
            target=run_scheduler,
            daemon=True,
            name="collector-scheduler",
        )
        scheduler_thread.start()
        catch_up_thread = threading.Thread(
            target=run_post_start_catch_up,
            daemon=True,
            name="collector-startup-catch-up",
        )
        catch_up_thread.start()

        for event_name, worker in (
            ("scheduler_thread_started", scheduler_thread),
            ("catch_up_thread_started", catch_up_thread),
        ):
            log_event(event_name, thread_name=worker.name, is_alive=worker.is_alive())

        # The same options are logged and passed to launch().
        launch_options = {"server_name": "0.0.0.0", "server_port": 7860, "ssr_mode": False}
        log_event("gradio_launch_called", **launch_options)
        demo.launch(**launch_options)
    except Exception as exc:
        log_exception("__main__", exc)
        raise