Ciroc0 commited on
Commit
97db48a
·
1 Parent(s): a66d87c

Fix collector data flow and space metadata

Browse files
Files changed (2) hide show
  1. README.md +3 -0
  2. app.py +340 -84
README.md CHANGED
@@ -1,5 +1,8 @@
1
  ---
2
  title: DMI Aarhus Collector
 
 
 
3
  sdk: gradio
4
  sdk_version: 5.47.2
5
  python_version: "3.10"
 
1
  ---
2
  title: DMI Aarhus Collector
3
+ emoji: "🌤️"
4
+ colorFrom: blue
5
+ colorTo: green
6
  sdk: gradio
7
  sdk_version: 5.47.2
8
  python_version: "3.10"
app.py CHANGED
@@ -68,6 +68,9 @@ OBSERVATION_FEATURES = [
68
  "windgusts_10m",
69
  ]
70
 
 
 
 
71
 
72
  class LazyModule:
73
  def __init__(self, module_name):
@@ -213,6 +216,94 @@ def get_lead_bucket(lead_hours):
213
  return "25-48"
214
 
215
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
216
  # =============================================================================
217
  # FORECAST FETCHING
218
  # =============================================================================
@@ -295,8 +386,7 @@ def fetch_forecasts_for_period(start_date, end_date):
295
  return None
296
 
297
  df = pd.DataFrame(all_forecasts)
298
- df = df.drop_duplicates(subset=['target_timestamp'], keep='first')
299
- df = df.sort_values('target_timestamp').reset_index(drop=True)
300
  log_event("fetch_forecasts_for_period done", rows=len(df))
301
  return df
302
 
@@ -527,14 +617,7 @@ def build_training_matrix(forecasts_df, observations_df):
527
  log_event("build_training_matrix no_matches")
528
  return None
529
 
530
- # Add temporal features
531
- merged = add_temporal_features(merged)
532
-
533
- # Add run delta features
534
- merged = add_run_delta_features(merged)
535
-
536
- # Add observation lags (requires sorting by time)
537
- merged = add_observation_lags(merged)
538
 
539
  # Add correction targets for temperature and wind
540
  merged['temp_correction_target'] = merged['actual_temp'] - merged['dmi_temperature_2m_pred']
@@ -544,6 +627,7 @@ def build_training_matrix(forecasts_df, observations_df):
544
  # Filter out future times
545
  current_hour = now_cph().replace(minute=0, second=0, microsecond=0)
546
  merged = merged[merged['target_timestamp'] <= current_hour]
 
547
 
548
  log_event("build_training_matrix done", rows=len(merged), columns=len(merged.columns))
549
  return merged
@@ -607,7 +691,7 @@ def init_dataset_if_needed():
607
  print(f"✅ Training dataset already available as {existing_name}")
608
  return existing_path, existing_name
609
 
610
- empty_df = pd.DataFrame(columns=["target_timestamp"])
611
  empty_df.to_parquet("training_matrix.parquet")
612
 
613
  if upload_to_dataset("training_matrix.parquet", "training_matrix.parquet", DATASET_NAME):
@@ -634,14 +718,9 @@ def load_existing_training_matrix():
634
  if existing.empty or "target_timestamp" not in existing.columns:
635
  return existing, existing_name
636
 
637
- if existing["target_timestamp"].dt.tz is None:
638
- existing["target_timestamp"] = existing["target_timestamp"].dt.tz_localize(
639
- "Europe/Copenhagen",
640
- ambiguous="infer",
641
- )
642
- else:
643
- existing["target_timestamp"] = existing["target_timestamp"].dt.tz_convert("Europe/Copenhagen")
644
-
645
  return existing, existing_name
646
 
647
 
@@ -695,8 +774,7 @@ def backfill_historical_data():
695
  return "❌ No data collected"
696
 
697
  final_df = pd.concat(all_data, ignore_index=True)
698
- final_df = final_df.drop_duplicates(subset=['target_timestamp'], keep='first')
699
- final_df = final_df.sort_values('target_timestamp').reset_index(drop=True)
700
 
701
  # Save to parquet
702
  final_df.to_parquet("training_matrix.parquet")
@@ -740,15 +818,13 @@ def update_daily():
740
  try:
741
  existing, existing_name = load_existing_training_matrix()
742
  if existing is not None and not existing.empty and "target_timestamp" in existing.columns:
743
- existing_ts = set(existing["target_timestamp"])
744
- mask = ~merged["target_timestamp"].isin(existing_ts)
745
- new_data = merged[mask]
746
 
747
  if len(new_data) == 0:
748
  return f"ℹ️ No new data ({existing_name} already up to date)"
749
 
750
  combined = pd.concat([existing, new_data], ignore_index=True)
751
- combined = combined.drop_duplicates(subset=["target_timestamp"], keep="first")
752
  status_msg = f"✅ Added {len(new_data)} new rows"
753
  else:
754
  combined = merged
@@ -799,24 +875,27 @@ def predict_with_bundle(bundle, df):
799
  """Make predictions using model bundle."""
800
  if bundle is None or 'models' not in bundle:
801
  return None
802
-
803
- results = df.copy()
804
- predictions = np.zeros(len(df))
805
-
806
  for bucket in df['lead_bucket'].unique():
807
  if bucket in bundle['models']:
808
  bucket_mask = df['lead_bucket'] == bucket
809
  bucket_df = df[bucket_mask]
810
-
811
  model_info = bundle['models'][bucket]
812
  model = model_info['model']
813
  feature_cols = model_info.get('feature_columns', [])
814
-
815
  if feature_cols:
 
 
 
 
816
  X = bucket_df[feature_cols].fillna(0.0)
817
  bucket_pred = model.predict(X)
818
  predictions[bucket_mask] = bucket_pred
819
-
820
  return predictions
821
 
822
 
@@ -848,45 +927,60 @@ def generate_predictions():
848
  log_exception("generate_predictions model_registry", e)
849
  registry = None
850
 
851
- results = future_forecasts.copy()
 
852
  results['prediction_made_at'] = current_time
853
  results['city'] = 'aarhus'
854
-
 
 
 
 
 
855
  # Temperature prediction
856
  temp_bundle = load_model_bundle('temperature', cache_revision=registry_revision)
857
  if temp_bundle:
858
- temp_pred = predict_with_bundle(temp_bundle, future_forecasts)
859
  if temp_pred is not None:
860
- # Correction model: ml_pred = dmi_pred + correction
861
- results['ml_temp'] = results['dmi_temperature_2m_pred'] + temp_pred
 
862
 
863
  # Wind speed prediction
864
  wind_bundle = load_model_bundle('wind_speed', cache_revision=registry_revision)
865
  if wind_bundle:
866
- wind_pred = predict_with_bundle(wind_bundle, future_forecasts)
867
  if wind_pred is not None:
868
- results['ml_wind_speed'] = results['dmi_windspeed_10m_pred'] + wind_pred
 
 
869
 
870
  # Wind gust prediction
871
  gust_bundle = load_model_bundle('wind_gust', cache_revision=registry_revision)
872
  if gust_bundle:
873
- gust_pred = predict_with_bundle(gust_bundle, future_forecasts)
874
  if gust_pred is not None:
875
- results['ml_wind_gust'] = results['dmi_windgusts_10m_pred'] + gust_pred
 
 
876
 
877
  # Rain event prediction
878
  rain_event_bundle = load_model_bundle('rain_event', cache_revision=registry_revision)
879
  if rain_event_bundle:
880
- rain_event_pred = predict_with_bundle(rain_event_bundle, future_forecasts)
881
  if rain_event_pred is not None:
882
- results['ml_rain_prob'] = rain_event_pred
 
 
883
 
884
  # Rain amount prediction
885
  rain_amount_bundle = load_model_bundle('rain_amount', cache_revision=registry_revision)
886
  if rain_amount_bundle:
887
- rain_amount_pred = predict_with_bundle(rain_amount_bundle, future_forecasts)
888
  if rain_amount_pred is not None:
889
- results['ml_rain_amount'] = np.clip(rain_amount_pred, 0, None)
 
 
890
 
891
  # Add verification fields
892
  results['verified'] = False
@@ -894,6 +988,12 @@ def generate_predictions():
894
  results['actual_wind_speed'] = None
895
  results['actual_wind_gust'] = None
896
  results['actual_precipitation'] = None
 
 
 
 
 
 
897
 
898
  # Save to parquet
899
  results.to_parquet("predictions_latest.parquet")
@@ -971,6 +1071,150 @@ def verify_predictions():
971
  # =============================================================================
972
  # STARTUP CATCH-UP
973
  # =============================================================================
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
974
  def load_predictions_snapshot():
975
  pred_path, _ = load_first_available_dataset_file(
976
  ["predictions_latest.parquet", "predictions.parquet"],
@@ -979,18 +1223,55 @@ def load_predictions_snapshot():
979
  if not pred_path:
980
  return None
981
 
982
- pred_df = pd.read_parquet(pred_path)
983
- if 'timestamp' in pred_df.columns and 'target_timestamp' not in pred_df.columns:
984
- pred_df = pred_df.rename(columns={'timestamp': 'target_timestamp'})
985
- if 'target_timestamp' not in pred_df.columns:
986
  return None
987
- if pred_df['target_timestamp'].dt.tz is None:
988
- pred_df['target_timestamp'] = pred_df['target_timestamp'].dt.tz_localize('Europe/Copenhagen', ambiguous='infer')
989
- else:
990
- pred_df['target_timestamp'] = pred_df['target_timestamp'].dt.tz_convert('Europe/Copenhagen')
991
  return pred_df
992
 
993
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
994
  def should_run_daily_catch_up():
995
  matrix_path = load_from_dataset("training_matrix.parquet", DATASET_NAME)
996
  if not matrix_path:
@@ -1065,33 +1346,8 @@ def run_post_start_catch_up():
1065
  def run_scheduler():
1066
  """Background scheduler for automated tasks."""
1067
  log_event("scheduler starting")
1068
- # Every 3 hours: fetch new forecast run
1069
- schedule.every(3).hours.do(
1070
- lambda: run_logged(
1071
- "scheduled_fetch_forecasts",
1072
- fetch_forecasts_for_period,
1073
- now_cph().date() - timedelta(days=1),
1074
- now_cph().date(),
1075
- )
1076
- )
1077
-
1078
- # Every hour: fetch new observations
1079
- schedule.every().hour.do(
1080
- lambda: run_logged(
1081
- "scheduled_fetch_observations",
1082
- fetch_observations_for_period,
1083
- now_cph().date() - timedelta(days=1),
1084
- now_cph().date(),
1085
- )
1086
- )
1087
-
1088
- # Every 3 hours: generate new predictions
1089
  schedule.every(3).hours.do(lambda: run_logged("scheduled_generate_predictions", generate_predictions))
1090
-
1091
- # Every hour: verify past predictions
1092
  schedule.every().hour.do(lambda: run_logged("scheduled_verify_predictions", verify_predictions))
1093
-
1094
- # Daily: rebuild training matrix
1095
  schedule.every().day.at("06:00").do(lambda: run_logged("scheduled_update_daily", update_daily))
1096
  log_event("scheduler_registered")
1097
 
@@ -1115,7 +1371,7 @@ with gr.Blocks(title="DMI Aarhus Collector") as demo:
1115
  """)
1116
 
1117
  app_status = gr.Markdown(build_app_status_text())
1118
- status = gr.Textbox(label="Status", lines=10)
1119
 
1120
  with gr.Row():
1121
  btn_backfill = gr.Button("🚀 Backfill Historical Data", variant="primary")
@@ -1127,7 +1383,7 @@ with gr.Blocks(title="DMI Aarhus Collector") as demo:
1127
  try:
1128
  result = run_logged("gradio_backfill_historical_data", backfill_historical_data)
1129
  set_app_ready()
1130
- return build_app_status_text(), result
1131
  except Exception as exc:
1132
  note_app_error(exc)
1133
  return build_app_status_text(), f"❌ backfill_historical_data failed: {exc}"
@@ -1136,7 +1392,7 @@ with gr.Blocks(title="DMI Aarhus Collector") as demo:
1136
  try:
1137
  result = run_logged("gradio_update_daily", update_daily)
1138
  set_app_ready()
1139
- return build_app_status_text(), result
1140
  except Exception as exc:
1141
  note_app_error(exc)
1142
  return build_app_status_text(), f"❌ update_daily failed: {exc}"
@@ -1145,7 +1401,7 @@ with gr.Blocks(title="DMI Aarhus Collector") as demo:
1145
  try:
1146
  result = run_logged("gradio_generate_predictions", generate_predictions)
1147
  set_app_ready()
1148
- return build_app_status_text(), result
1149
  except Exception as exc:
1150
  note_app_error(exc)
1151
  return build_app_status_text(), f"❌ generate_predictions failed: {exc}"
@@ -1154,7 +1410,7 @@ with gr.Blocks(title="DMI Aarhus Collector") as demo:
1154
  try:
1155
  result = run_logged("gradio_verify_predictions", verify_predictions)
1156
  set_app_ready()
1157
- return build_app_status_text(), result
1158
  except Exception as exc:
1159
  note_app_error(exc)
1160
  return build_app_status_text(), f"❌ verify_predictions failed: {exc}"
@@ -1163,7 +1419,7 @@ with gr.Blocks(title="DMI Aarhus Collector") as demo:
1163
  btn_daily.click(daily_handler, outputs=[app_status, status])
1164
  btn_predict.click(predict_handler, outputs=[app_status, status])
1165
  btn_verify.click(verify_handler, outputs=[app_status, status])
1166
- demo.load(lambda: build_app_status_text(), outputs=app_status)
1167
 
1168
  log_event("gradio_ui_ready")
1169
  log_event("ui_constructed")
 
68
  "windgusts_10m",
69
  ]
70
 
71
+ TRAINING_DEDUP_KEYS = ["reference_time", "target_timestamp"]
72
+ PREDICTION_DEDUP_KEYS = ["target_timestamp"]
73
+
74
 
75
  class LazyModule:
76
  def __init__(self, module_name):
 
216
  return "25-48"
217
 
218
 
219
+ def ensure_copenhagen_time(df, column_name):
220
+ """Ensure a datetime column is timezone-aware in Europe/Copenhagen."""
221
+ if column_name not in df.columns:
222
+ return df
223
+ series = pd.to_datetime(df[column_name], errors="coerce")
224
+ if getattr(series.dt, "tz", None) is None:
225
+ df[column_name] = series.dt.tz_localize(COPENHAGEN_TZ, ambiguous="infer", nonexistent="shift_forward")
226
+ else:
227
+ df[column_name] = series.dt.tz_convert(COPENHAGEN_TZ)
228
+ return df
229
+
230
+
231
+ def dedupe_rows(df, dedup_keys, sort_keys=None, keep="last"):
232
+ """Sort and drop duplicate rows using the keys available on the dataframe."""
233
+ if df is None or len(df) == 0:
234
+ return df
235
+
236
+ available_sort_keys = [key for key in (sort_keys or dedup_keys) if key in df.columns]
237
+ if available_sort_keys:
238
+ df = df.sort_values(available_sort_keys).reset_index(drop=True)
239
+
240
+ available_dedup_keys = [key for key in dedup_keys if key in df.columns]
241
+ if available_dedup_keys:
242
+ df = df.drop_duplicates(subset=available_dedup_keys, keep=keep).reset_index(drop=True)
243
+ return df
244
+
245
+
246
+ def find_new_rows(candidate_df, existing_df, dedup_keys):
247
+ """Return rows that are new compared with an existing dataframe."""
248
+ if existing_df is None or len(existing_df) == 0:
249
+ return candidate_df
250
+
251
+ keys = [key for key in dedup_keys if key in candidate_df.columns and key in existing_df.columns]
252
+ if not keys:
253
+ return candidate_df
254
+
255
+ existing_keys = existing_df[keys].drop_duplicates().copy()
256
+ existing_keys["_existing"] = True
257
+ merged = candidate_df.merge(existing_keys, on=keys, how="left")
258
+ merged = merged[merged["_existing"] != True].drop(columns=["_existing"])
259
+ return merged.reset_index(drop=True)
260
+
261
+
262
+ def build_model_features(df):
263
+ """Build the causal feature set used by both training and live inference."""
264
+ if df is None or len(df) == 0:
265
+ return df
266
+ features_df = df.copy()
267
+ features_df = add_temporal_features(features_df)
268
+ features_df = add_run_delta_features(features_df)
269
+ return features_df
270
+
271
+
272
+ def normalize_prediction_df(pred_df):
273
+ """Normalize prediction history to the current schema."""
274
+ if pred_df is None or len(pred_df) == 0:
275
+ return pred_df
276
+
277
+ if "timestamp" in pred_df.columns and "target_timestamp" not in pred_df.columns:
278
+ pred_df = pred_df.rename(columns={"timestamp": "target_timestamp"})
279
+
280
+ for column_name in ["target_timestamp", "reference_time", "prediction_made_at"]:
281
+ pred_df = ensure_copenhagen_time(pred_df, column_name)
282
+
283
+ if "verified" not in pred_df.columns:
284
+ pred_df["verified"] = False
285
+ pred_df["verified"] = pred_df["verified"].fillna(False).astype(bool)
286
+
287
+ pred_df = dedupe_rows(
288
+ pred_df,
289
+ PREDICTION_DEDUP_KEYS,
290
+ sort_keys=["target_timestamp", "prediction_made_at", "reference_time"],
291
+ keep="last",
292
+ )
293
+ return pred_df
294
+
295
+
296
+ def merge_prediction_history(existing_df, new_df):
297
+ """Upsert future rows while preserving historical prediction rows."""
298
+ if existing_df is None or len(existing_df) == 0:
299
+ return normalize_prediction_df(new_df)
300
+ if new_df is None or len(new_df) == 0:
301
+ return normalize_prediction_df(existing_df)
302
+
303
+ combined = pd.concat([existing_df, new_df], ignore_index=True)
304
+ return normalize_prediction_df(combined)
305
+
306
+
307
  # =============================================================================
308
  # FORECAST FETCHING
309
  # =============================================================================
 
386
  return None
387
 
388
  df = pd.DataFrame(all_forecasts)
389
+ df = dedupe_rows(df, TRAINING_DEDUP_KEYS, sort_keys=["target_timestamp", "reference_time"], keep="last")
 
390
  log_event("fetch_forecasts_for_period done", rows=len(df))
391
  return df
392
 
 
617
  log_event("build_training_matrix no_matches")
618
  return None
619
 
620
+ merged = build_model_features(merged)
 
 
 
 
 
 
 
621
 
622
  # Add correction targets for temperature and wind
623
  merged['temp_correction_target'] = merged['actual_temp'] - merged['dmi_temperature_2m_pred']
 
627
  # Filter out future times
628
  current_hour = now_cph().replace(minute=0, second=0, microsecond=0)
629
  merged = merged[merged['target_timestamp'] <= current_hour]
630
+ merged = dedupe_rows(merged, TRAINING_DEDUP_KEYS, sort_keys=["target_timestamp", "reference_time"], keep="last")
631
 
632
  log_event("build_training_matrix done", rows=len(merged), columns=len(merged.columns))
633
  return merged
 
691
  print(f"✅ Training dataset already available as {existing_name}")
692
  return existing_path, existing_name
693
 
694
+ empty_df = pd.DataFrame(columns=TRAINING_DEDUP_KEYS)
695
  empty_df.to_parquet("training_matrix.parquet")
696
 
697
  if upload_to_dataset("training_matrix.parquet", "training_matrix.parquet", DATASET_NAME):
 
718
  if existing.empty or "target_timestamp" not in existing.columns:
719
  return existing, existing_name
720
 
721
+ existing = ensure_copenhagen_time(existing, "target_timestamp")
722
+ existing = ensure_copenhagen_time(existing, "reference_time")
723
+ existing = dedupe_rows(existing, TRAINING_DEDUP_KEYS, sort_keys=["target_timestamp", "reference_time"], keep="last")
 
 
 
 
 
724
  return existing, existing_name
725
 
726
 
 
774
  return "❌ No data collected"
775
 
776
  final_df = pd.concat(all_data, ignore_index=True)
777
+ final_df = dedupe_rows(final_df, TRAINING_DEDUP_KEYS, sort_keys=["target_timestamp", "reference_time"], keep="last")
 
778
 
779
  # Save to parquet
780
  final_df.to_parquet("training_matrix.parquet")
 
818
  try:
819
  existing, existing_name = load_existing_training_matrix()
820
  if existing is not None and not existing.empty and "target_timestamp" in existing.columns:
821
+ new_data = find_new_rows(merged, existing, TRAINING_DEDUP_KEYS)
 
 
822
 
823
  if len(new_data) == 0:
824
  return f"ℹ️ No new data ({existing_name} already up to date)"
825
 
826
  combined = pd.concat([existing, new_data], ignore_index=True)
827
+ combined = dedupe_rows(combined, TRAINING_DEDUP_KEYS, sort_keys=["target_timestamp", "reference_time"], keep="last")
828
  status_msg = f"✅ Added {len(new_data)} new rows"
829
  else:
830
  combined = merged
 
875
  """Make predictions using model bundle."""
876
  if bundle is None or 'models' not in bundle:
877
  return None
878
+
879
+ predictions = np.full(len(df), np.nan)
880
+
 
881
  for bucket in df['lead_bucket'].unique():
882
  if bucket in bundle['models']:
883
  bucket_mask = df['lead_bucket'] == bucket
884
  bucket_df = df[bucket_mask]
885
+
886
  model_info = bundle['models'][bucket]
887
  model = model_info['model']
888
  feature_cols = model_info.get('feature_columns', [])
889
+
890
  if feature_cols:
891
+ missing_cols = [col for col in feature_cols if col not in bucket_df.columns]
892
+ if missing_cols:
893
+ log_event("predict_with_bundle missing_features", bucket=bucket, missing_columns=missing_cols)
894
+ continue
895
  X = bucket_df[feature_cols].fillna(0.0)
896
  bucket_pred = model.predict(X)
897
  predictions[bucket_mask] = bucket_pred
898
+
899
  return predictions
900
 
901
 
 
927
  log_exception("generate_predictions model_registry", e)
928
  registry = None
929
 
930
+ feature_frame = build_model_features(future_forecasts)
931
+ results = feature_frame.copy()
932
  results['prediction_made_at'] = current_time
933
  results['city'] = 'aarhus'
934
+ results['verified'] = False
935
+ results['ml_temp'] = results['dmi_temperature_2m_pred']
936
+ results['ml_wind_speed'] = results['dmi_windspeed_10m_pred']
937
+ results['ml_wind_gust'] = results['dmi_windgusts_10m_pred']
938
+ results['ml_rain_prob'] = results['dmi_precipitation_probability_pred'].fillna(0.0).clip(0.0, 100.0) / 100.0
939
+ results['ml_rain_amount'] = results['dmi_precipitation_pred'].fillna(0.0).clip(0.0, None)
940
  # Temperature prediction
941
  temp_bundle = load_model_bundle('temperature', cache_revision=registry_revision)
942
  if temp_bundle:
943
+ temp_pred = predict_with_bundle(temp_bundle, feature_frame)
944
  if temp_pred is not None:
945
+ temp_series = pd.Series(temp_pred, index=results.index, dtype="float64")
946
+ temp_mask = temp_series.notna()
947
+ results.loc[temp_mask, 'ml_temp'] = results.loc[temp_mask, 'dmi_temperature_2m_pred'] + temp_series[temp_mask]
948
 
949
  # Wind speed prediction
950
  wind_bundle = load_model_bundle('wind_speed', cache_revision=registry_revision)
951
  if wind_bundle:
952
+ wind_pred = predict_with_bundle(wind_bundle, feature_frame)
953
  if wind_pred is not None:
954
+ wind_series = pd.Series(wind_pred, index=results.index, dtype="float64")
955
+ wind_mask = wind_series.notna()
956
+ results.loc[wind_mask, 'ml_wind_speed'] = results.loc[wind_mask, 'dmi_windspeed_10m_pred'] + wind_series[wind_mask]
957
 
958
  # Wind gust prediction
959
  gust_bundle = load_model_bundle('wind_gust', cache_revision=registry_revision)
960
  if gust_bundle:
961
+ gust_pred = predict_with_bundle(gust_bundle, feature_frame)
962
  if gust_pred is not None:
963
+ gust_series = pd.Series(gust_pred, index=results.index, dtype="float64")
964
+ gust_mask = gust_series.notna()
965
+ results.loc[gust_mask, 'ml_wind_gust'] = results.loc[gust_mask, 'dmi_windgusts_10m_pred'] + gust_series[gust_mask]
966
 
967
  # Rain event prediction
968
  rain_event_bundle = load_model_bundle('rain_event', cache_revision=registry_revision)
969
  if rain_event_bundle:
970
+ rain_event_pred = predict_with_bundle(rain_event_bundle, feature_frame)
971
  if rain_event_pred is not None:
972
+ rain_event_series = pd.Series(rain_event_pred, index=results.index, dtype="float64")
973
+ rain_event_mask = rain_event_series.notna()
974
+ results.loc[rain_event_mask, 'ml_rain_prob'] = rain_event_series[rain_event_mask]
975
 
976
  # Rain amount prediction
977
  rain_amount_bundle = load_model_bundle('rain_amount', cache_revision=registry_revision)
978
  if rain_amount_bundle:
979
+ rain_amount_pred = predict_with_bundle(rain_amount_bundle, feature_frame)
980
  if rain_amount_pred is not None:
981
+ rain_amount_series = pd.Series(rain_amount_pred, index=results.index, dtype="float64")
982
+ rain_amount_mask = rain_amount_series.notna()
983
+ results.loc[rain_amount_mask, 'ml_rain_amount'] = np.clip(rain_amount_series[rain_amount_mask], 0, None)
984
 
985
  # Add verification fields
986
  results['verified'] = False
 
988
  results['actual_wind_speed'] = None
989
  results['actual_wind_gust'] = None
990
  results['actual_precipitation'] = None
991
+ results['actual_rain'] = None
992
+ results['actual_rain_event'] = None
993
+ results['actual_rain_amount'] = None
994
+ results['ml_rain_prob'] = results['ml_rain_prob'].clip(0.0, 1.0)
995
+ results['ml_rain_amount'] = results['ml_rain_amount'].clip(0.0, None)
996
+ results = merge_prediction_history(load_predictions_snapshot(), results)
997
 
998
  # Save to parquet
999
  results.to_parquet("predictions_latest.parquet")
 
1071
  # =============================================================================
1072
  # STARTUP CATCH-UP
1073
  # =============================================================================
1074
+ def generate_predictions():
1075
+ """Generate predictions for all targets and preserve verified history."""
1076
+ log_event("generate_predictions entered")
1077
+ current_time = now_cph()
1078
+ log_event("generate_predictions clock", current_time=str(current_time))
1079
+
1080
+ future_forecasts = fetch_future_forecasts()
1081
+ if future_forecasts is None or len(future_forecasts) == 0:
1082
+ return "Could not fetch future forecasts"
1083
+
1084
+ registry_revision = None
1085
+ try:
1086
+ registry_path = hf_hub_download(
1087
+ repo_id=DATASET_NAME,
1088
+ filename="model_registry.json",
1089
+ repo_type="dataset",
1090
+ token=HF_TOKEN,
1091
+ )
1092
+ with open(registry_path, "r") as handle:
1093
+ registry = json.load(handle)
1094
+ registry_revision = registry.get("generated_at")
1095
+ if registry_revision:
1096
+ clear_model_bundle_cache(registry_revision)
1097
+ except Exception as exc:
1098
+ log_exception("generate_predictions model_registry", exc)
1099
+
1100
+ feature_frame = build_model_features(future_forecasts)
1101
+ results = feature_frame.copy()
1102
+ results["prediction_made_at"] = current_time
1103
+ results["city"] = "aarhus"
1104
+ results["verified"] = False
1105
+ results["actual_temp"] = None
1106
+ results["actual_wind_speed"] = None
1107
+ results["actual_wind_gust"] = None
1108
+ results["actual_precipitation"] = None
1109
+ results["actual_rain"] = None
1110
+ results["actual_rain_event"] = None
1111
+ results["actual_rain_amount"] = None
1112
+ results["ml_temp"] = results["dmi_temperature_2m_pred"]
1113
+ results["ml_wind_speed"] = results["dmi_windspeed_10m_pred"]
1114
+ results["ml_wind_gust"] = results["dmi_windgusts_10m_pred"]
1115
+ results["ml_rain_prob"] = results["dmi_precipitation_probability_pred"].fillna(0.0).clip(0.0, 100.0) / 100.0
1116
+ results["ml_rain_amount"] = results["dmi_precipitation_pred"].fillna(0.0).clip(0.0, None)
1117
+
1118
+ target_specs = [
1119
+ ("temperature", "ml_temp", "dmi_temperature_2m_pred", True),
1120
+ ("wind_speed", "ml_wind_speed", "dmi_windspeed_10m_pred", True),
1121
+ ("wind_gust", "ml_wind_gust", "dmi_windgusts_10m_pred", True),
1122
+ ("rain_event", "ml_rain_prob", None, False),
1123
+ ("rain_amount", "ml_rain_amount", None, False),
1124
+ ]
1125
+
1126
+ for target_name, output_col, baseline_col, is_correction in target_specs:
1127
+ bundle = load_model_bundle(target_name, cache_revision=registry_revision)
1128
+ target_pred = predict_with_bundle(bundle, feature_frame)
1129
+ if target_pred is None:
1130
+ continue
1131
+
1132
+ target_series = pd.Series(target_pred, index=results.index, dtype="float64")
1133
+ target_mask = target_series.notna()
1134
+ if not target_mask.any():
1135
+ continue
1136
+
1137
+ if is_correction:
1138
+ results.loc[target_mask, output_col] = results.loc[target_mask, baseline_col] + target_series[target_mask]
1139
+ else:
1140
+ results.loc[target_mask, output_col] = target_series[target_mask]
1141
+
1142
+ results["ml_rain_prob"] = results["ml_rain_prob"].clip(0.0, 1.0)
1143
+ results["ml_rain_amount"] = results["ml_rain_amount"].clip(0.0, None)
1144
+
1145
+ results = merge_prediction_history(load_predictions_snapshot(), results)
1146
+ results.to_parquet("predictions_latest.parquet")
1147
+
1148
+ if upload_to_dataset("predictions_latest.parquet", "predictions_latest.parquet", PREDICTIONS_DATASET):
1149
+ future_count = int((results["target_timestamp"] > current_time).sum())
1150
+ verified_count = int(results["verified"].fillna(False).astype(bool).sum())
1151
+ return (
1152
+ f"Generated/upserted {len(feature_frame)} future predictions. "
1153
+ f"Dataset now holds {len(results)} rows, including {future_count} future rows "
1154
+ f"and {verified_count} verified rows."
1155
+ )
1156
+ return "Failed to upload predictions"
1157
+
1158
+
1159
+ def verify_predictions():
1160
+ """Verify past predictions with actual observations."""
1161
+ log_event("verify_predictions entered")
1162
+ try:
1163
+ pred_df = load_predictions_snapshot()
1164
+ if pred_df is None or len(pred_df) == 0:
1165
+ return "No predictions file found"
1166
+
1167
+ now = now_cph()
1168
+ to_verify = pred_df[
1169
+ (~pred_df["verified"]) &
1170
+ (pred_df["target_timestamp"] < now - timedelta(hours=1))
1171
+ ]
1172
+ if len(to_verify) == 0:
1173
+ return "No predictions to verify"
1174
+
1175
+ start_date = to_verify["target_timestamp"].min().date()
1176
+ end_date = to_verify["target_timestamp"].max().date()
1177
+ observations = fetch_observations_for_period(start_date, end_date)
1178
+ if observations is None or len(observations) == 0:
1179
+ return "Could not fetch observations"
1180
+
1181
+ observation_cols = [
1182
+ "actual_temp",
1183
+ "actual_wind_speed",
1184
+ "actual_wind_gust",
1185
+ "actual_precipitation",
1186
+ "actual_rain",
1187
+ "rain_event",
1188
+ "rain_amount",
1189
+ ]
1190
+ lookup = observations.set_index("target_timestamp")[observation_cols]
1191
+
1192
+ verified_count = 0
1193
+ for idx, row in to_verify.iterrows():
1194
+ target_timestamp = row["target_timestamp"]
1195
+ if target_timestamp not in lookup.index:
1196
+ continue
1197
+ match = lookup.loc[target_timestamp]
1198
+ pred_df.loc[idx, "actual_temp"] = match["actual_temp"]
1199
+ pred_df.loc[idx, "actual_wind_speed"] = match["actual_wind_speed"]
1200
+ pred_df.loc[idx, "actual_wind_gust"] = match["actual_wind_gust"]
1201
+ pred_df.loc[idx, "actual_precipitation"] = match["actual_precipitation"]
1202
+ pred_df.loc[idx, "actual_rain"] = match["actual_rain"]
1203
+ pred_df.loc[idx, "actual_rain_event"] = match["rain_event"]
1204
+ pred_df.loc[idx, "actual_rain_amount"] = match["rain_amount"]
1205
+ pred_df.loc[idx, "verified"] = True
1206
+ verified_count += 1
1207
+
1208
+ pred_df = normalize_prediction_df(pred_df)
1209
+ pred_df.to_parquet("predictions_latest.parquet")
1210
+ if upload_to_dataset("predictions_latest.parquet", "predictions_latest.parquet", PREDICTIONS_DATASET):
1211
+ return f"Verified {verified_count} predictions"
1212
+ return "Failed to upload verified predictions"
1213
+ except Exception as exc:
1214
+ log_exception("verify_predictions", exc)
1215
+ return f"Verification error: {exc}"
1216
+
1217
+
1218
  def load_predictions_snapshot():
1219
  pred_path, _ = load_first_available_dataset_file(
1220
  ["predictions_latest.parquet", "predictions.parquet"],
 
1223
  if not pred_path:
1224
  return None
1225
 
1226
+ pred_df = normalize_prediction_df(pd.read_parquet(pred_path))
1227
+ if pred_df is None or 'target_timestamp' not in pred_df.columns:
 
 
1228
  return None
 
 
 
 
1229
  return pred_df
1230
 
1231
 
1232
+ def build_collector_snapshot_text():
1233
+ """Summarize stored training and prediction data for the collector landing view."""
1234
+ lines = []
1235
+
1236
+ try:
1237
+ training_df, training_name = load_existing_training_matrix()
1238
+ if training_df is None or len(training_df) == 0:
1239
+ lines.append("Training data: no rows available.")
1240
+ else:
1241
+ latest_training = training_df["target_timestamp"].max()
1242
+ lines.append(f"Training data ({training_name}): {len(training_df)} rows through {latest_training}.")
1243
+ if "lead_bucket" in training_df.columns:
1244
+ bucket_counts = training_df["lead_bucket"].value_counts().sort_index().to_dict()
1245
+ bucket_text = ", ".join(f"{bucket}={count}" for bucket, count in bucket_counts.items())
1246
+ lines.append(f"Lead buckets: {bucket_text}")
1247
+ except Exception as exc:
1248
+ lines.append(f"Training data summary unavailable: {exc}")
1249
+
1250
+ try:
1251
+ pred_df = load_predictions_snapshot()
1252
+ if pred_df is None or len(pred_df) == 0:
1253
+ lines.append("Predictions: no rows available.")
1254
+ else:
1255
+ now = now_cph()
1256
+ future_count = int((pred_df["target_timestamp"] > now).sum())
1257
+ verified_count = int(pred_df["verified"].fillna(False).astype(bool).sum())
1258
+ latest_prediction = pred_df["prediction_made_at"].max() if "prediction_made_at" in pred_df.columns else "unknown"
1259
+ lines.append(f"Predictions: {len(pred_df)} rows, {future_count} future, {verified_count} verified.")
1260
+ lines.append(f"Latest prediction made at: {latest_prediction}")
1261
+ except Exception as exc:
1262
+ lines.append(f"Prediction summary unavailable: {exc}")
1263
+
1264
+ return "\n".join(lines)
1265
+
1266
+
1267
+ def build_collector_load_outputs():
1268
+ return build_app_status_text(), build_collector_snapshot_text()
1269
+
1270
+
1271
+ def build_action_status(result):
1272
+ return f"{result}\n\n{build_collector_snapshot_text()}"
1273
+
1274
+
1275
  def should_run_daily_catch_up():
1276
  matrix_path = load_from_dataset("training_matrix.parquet", DATASET_NAME)
1277
  if not matrix_path:
 
1346
  def run_scheduler():
1347
  """Background scheduler for automated tasks."""
1348
  log_event("scheduler starting")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1349
  schedule.every(3).hours.do(lambda: run_logged("scheduled_generate_predictions", generate_predictions))
 
 
1350
  schedule.every().hour.do(lambda: run_logged("scheduled_verify_predictions", verify_predictions))
 
 
1351
  schedule.every().day.at("06:00").do(lambda: run_logged("scheduled_update_daily", update_daily))
1352
  log_event("scheduler_registered")
1353
 
 
1371
  """)
1372
 
1373
  app_status = gr.Markdown(build_app_status_text())
1374
+ status = gr.Textbox(label="Status", lines=10, value="Loading collector snapshot...")
1375
 
1376
  with gr.Row():
1377
  btn_backfill = gr.Button("🚀 Backfill Historical Data", variant="primary")
 
1383
  try:
1384
  result = run_logged("gradio_backfill_historical_data", backfill_historical_data)
1385
  set_app_ready()
1386
+ return build_app_status_text(), build_action_status(result)
1387
  except Exception as exc:
1388
  note_app_error(exc)
1389
  return build_app_status_text(), f"❌ backfill_historical_data failed: {exc}"
 
1392
  try:
1393
  result = run_logged("gradio_update_daily", update_daily)
1394
  set_app_ready()
1395
+ return build_app_status_text(), build_action_status(result)
1396
  except Exception as exc:
1397
  note_app_error(exc)
1398
  return build_app_status_text(), f"❌ update_daily failed: {exc}"
 
1401
  try:
1402
  result = run_logged("gradio_generate_predictions", generate_predictions)
1403
  set_app_ready()
1404
+ return build_app_status_text(), build_action_status(result)
1405
  except Exception as exc:
1406
  note_app_error(exc)
1407
  return build_app_status_text(), f"❌ generate_predictions failed: {exc}"
 
1410
  try:
1411
  result = run_logged("gradio_verify_predictions", verify_predictions)
1412
  set_app_ready()
1413
+ return build_app_status_text(), build_action_status(result)
1414
  except Exception as exc:
1415
  note_app_error(exc)
1416
  return build_app_status_text(), f"❌ verify_predictions failed: {exc}"
 
1419
  btn_daily.click(daily_handler, outputs=[app_status, status])
1420
  btn_predict.click(predict_handler, outputs=[app_status, status])
1421
  btn_verify.click(verify_handler, outputs=[app_status, status])
1422
+ demo.load(build_collector_load_outputs, outputs=[app_status, status])
1423
 
1424
  log_event("gradio_ui_ready")
1425
  log_event("ui_constructed")