Ciroc0 commited on
Commit
f3496a6
·
1 Parent(s): 863d586

Change for frontend

Browse files
Files changed (1) hide show
  1. app.py +99 -11
app.py CHANGED
@@ -30,6 +30,9 @@ FRONTEND_SNAPSHOT_FILE = "frontend_snapshot.json"
30
 
31
  COPENHAGEN_TZ = ZoneInfo("Europe/Copenhagen")
32
  APP_NAME = "dmi-collector"
 
 
 
33
 
34
  faulthandler.enable()
35
 
@@ -296,7 +299,7 @@ def normalize_prediction_df(pred_df):
296
  pred_df = dedupe_rows(
297
  pred_df,
298
  PREDICTION_DEDUP_KEYS,
299
- sort_keys=["target_timestamp", "prediction_made_at", "reference_time"],
300
  keep="last",
301
  )
302
  return pred_df
@@ -309,8 +312,27 @@ def merge_prediction_history(existing_df, new_df):
309
  if new_df is None or len(new_df) == 0:
310
  return normalize_prediction_df(existing_df)
311
 
312
- combined = pd.concat([existing_df, new_df], ignore_index=True)
313
- return normalize_prediction_df(combined)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
314
 
315
 
316
  # =============================================================================
@@ -815,7 +837,7 @@ def build_recent_backtest(training_df):
815
 
816
  current_time = now_cph()
817
  history = training_df[
818
- (training_df["target_timestamp"] >= current_time - timedelta(days=7))
819
  & (training_df["target_timestamp"] <= current_time)
820
  ].copy()
821
  if len(history) == 0:
@@ -823,7 +845,7 @@ def build_recent_backtest(training_df):
823
 
824
  if "lead_time_hours" in history.columns:
825
  history = history[
826
- history["lead_time_hours"].fillna(0).between(0.0001, 48, inclusive="both")
827
  ].copy()
828
  if len(history) == 0:
829
  return None
@@ -888,6 +910,68 @@ def build_recent_backtest(training_df):
888
  return history
889
 
890
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
891
  def calculate_verification_metrics(predictions_df=None, backtest_df=None):
892
  """Compute frontend-facing verification summary."""
893
  source_df = None
@@ -1314,6 +1398,10 @@ def build_frontend_snapshot():
1314
  training_df, _ = load_existing_training_matrix()
1315
  registry = load_json_from_dataset("model_registry.json", DATASET_NAME) or {}
1316
  model_meta = load_json_from_dataset("model_meta.json", DATASET_NAME) or {}
 
 
 
 
1317
  target_status = build_target_status(registry)
1318
 
1319
  backtest_df = build_recent_backtest(training_df)
@@ -1334,9 +1422,6 @@ def build_frontend_snapshot():
1334
  current = build_current_payload(current_row)
1335
 
1336
  feature_importance = []
1337
- registry_revision = registry.get("generated_at")
1338
- if registry_revision:
1339
- clear_model_bundle_cache(registry_revision)
1340
  for target_name in MODEL_FILES:
1341
  feature_importance.extend(
1342
  extract_feature_importance_from_bundle(
@@ -1391,11 +1476,11 @@ def publish_frontend_snapshot():
1391
  # BACKFILL OPERATIONS
1392
  # =============================================================================
1393
  def backfill_historical_data():
1394
- """Backfill historical data from 2025-11-01 to now."""
1395
  log_event("backfill_historical_data entered")
1396
  init_dataset_if_needed()
1397
 
1398
- start_date = datetime(2025, 11, 1).date()
1399
  end_date = now_cph().date()
1400
 
1401
  print(f"🔄 Fetching from {start_date} to {end_date}")
@@ -1559,7 +1644,10 @@ def predict_with_bundle(bundle, df):
1559
  log_event("predict_with_bundle missing_features", bucket=bucket, missing_columns=missing_cols)
1560
  continue
1561
  X = bucket_df[feature_cols].fillna(0.0)
1562
- bucket_pred = model.predict(X)
 
 
 
1563
  predictions[bucket_mask] = bucket_pred
1564
 
1565
  return predictions
 
30
 
31
  COPENHAGEN_TZ = ZoneInfo("Europe/Copenhagen")
32
  APP_NAME = "dmi-collector"
33
+ HISTORICAL_BACKFILL_START = datetime(2025, 11, 1).date()
34
+ TRAINING_HOLDOUT_DAYS = 7
35
+ FUTURE_FORECAST_HOURS = 48
36
 
37
  faulthandler.enable()
38
 
 
299
  pred_df = dedupe_rows(
300
  pred_df,
301
  PREDICTION_DEDUP_KEYS,
302
+ sort_keys=["target_timestamp", "_merge_priority", "prediction_made_at", "reference_time"],
303
  keep="last",
304
  )
305
  return pred_df
 
312
  if new_df is None or len(new_df) == 0:
313
  return normalize_prediction_df(existing_df)
314
 
315
+ existing = normalize_prediction_df(existing_df).copy()
316
+ incoming = normalize_prediction_df(new_df).copy()
317
+
318
+ existing["_merge_priority"] = 0
319
+ incoming["_merge_priority"] = 1
320
+ combined = pd.concat([existing, incoming], ignore_index=True, sort=False)
321
+ combined = normalize_prediction_df(combined)
322
+
323
+ sort_keys = [
324
+ key
325
+ for key in ["target_timestamp", "_merge_priority", "prediction_made_at", "reference_time"]
326
+ if key in combined.columns
327
+ ]
328
+ if sort_keys:
329
+ combined = combined.sort_values(sort_keys).reset_index(drop=True)
330
+ if "target_timestamp" in combined.columns:
331
+ combined = combined.drop_duplicates(subset=["target_timestamp"], keep="last").reset_index(drop=True)
332
+
333
+ if "_merge_priority" in combined.columns:
334
+ combined = combined.drop(columns=["_merge_priority"])
335
+ return combined
336
 
337
 
338
  # =============================================================================
 
837
 
838
  current_time = now_cph()
839
  history = training_df[
840
+ (training_df["target_timestamp"] >= current_time - timedelta(days=TRAINING_HOLDOUT_DAYS))
841
  & (training_df["target_timestamp"] <= current_time)
842
  ].copy()
843
  if len(history) == 0:
 
845
 
846
  if "lead_time_hours" in history.columns:
847
  history = history[
848
+ history["lead_time_hours"].fillna(0).between(0.0001, FUTURE_FORECAST_HOURS, inclusive="both")
849
  ].copy()
850
  if len(history) == 0:
851
  return None
 
910
  return history
911
 
912
 
913
+ def rebuild_future_ml_columns(predictions_df, registry_revision=None):
914
+ """Recompute live ML columns from stored forecast features before publishing the frontend snapshot."""
915
+ if predictions_df is None or len(predictions_df) == 0 or "target_timestamp" not in predictions_df.columns:
916
+ return predictions_df
917
+
918
+ current_time = now_cph()
919
+ repaired = predictions_df.copy()
920
+ future_mask = repaired["target_timestamp"] > current_time
921
+ if not future_mask.any():
922
+ return repaired
923
+
924
+ future_df = repaired.loc[future_mask].copy()
925
+ future_df["ml_temp"] = future_df.get("ml_temp", future_df.get("dmi_temperature_2m_pred"))
926
+ future_df["ml_wind_speed"] = future_df.get("ml_wind_speed", future_df.get("dmi_windspeed_10m_pred"))
927
+ future_df["ml_wind_gust"] = future_df.get("ml_wind_gust", future_df.get("dmi_windgusts_10m_pred"))
928
+ future_df["ml_rain_prob"] = future_df.get(
929
+ "ml_rain_prob",
930
+ future_df.get("dmi_precipitation_probability_pred", pd.Series(0.0, index=future_df.index))
931
+ .fillna(0.0)
932
+ .clip(0.0, 100.0)
933
+ / 100.0,
934
+ )
935
+ future_df["ml_rain_amount"] = future_df.get(
936
+ "ml_rain_amount",
937
+ future_df.get("dmi_precipitation_pred", pd.Series(0.0, index=future_df.index)).fillna(0.0).clip(0.0, None),
938
+ )
939
+
940
+ target_specs = [
941
+ ("temperature", "ml_temp", "dmi_temperature_2m_pred", True),
942
+ ("wind_speed", "ml_wind_speed", "dmi_windspeed_10m_pred", True),
943
+ ("wind_gust", "ml_wind_gust", "dmi_windgusts_10m_pred", True),
944
+ ("rain_event", "ml_rain_prob", None, False),
945
+ ("rain_amount", "ml_rain_amount", None, False),
946
+ ]
947
+
948
+ for target_name, output_col, baseline_col, is_correction in target_specs:
949
+ bundle = load_model_bundle(target_name, cache_revision=registry_revision)
950
+ target_pred = predict_with_bundle(bundle, future_df)
951
+ if target_pred is None:
952
+ continue
953
+
954
+ target_series = pd.Series(target_pred, index=future_df.index, dtype="float64")
955
+ target_mask = target_series.notna()
956
+ if not target_mask.any():
957
+ continue
958
+
959
+ if is_correction:
960
+ future_df.loc[target_mask, output_col] = future_df.loc[target_mask, baseline_col] + target_series[target_mask]
961
+ elif target_name == "rain_event":
962
+ future_df.loc[target_mask, output_col] = target_series[target_mask].clip(0.0, 1.0)
963
+ else:
964
+ future_df.loc[target_mask, output_col] = target_series[target_mask].clip(0.0, None)
965
+
966
+ future_df["ml_rain_prob"] = future_df["ml_rain_prob"].fillna(0.0).clip(0.0, 1.0)
967
+ future_df["ml_rain_amount"] = future_df["ml_rain_amount"].fillna(0.0).clip(0.0, None)
968
+ for column_name in future_df.columns:
969
+ if column_name not in repaired.columns:
970
+ repaired[column_name] = np.nan
971
+ repaired.loc[future_mask, future_df.columns] = future_df
972
+ return repaired
973
+
974
+
975
  def calculate_verification_metrics(predictions_df=None, backtest_df=None):
976
  """Compute frontend-facing verification summary."""
977
  source_df = None
 
1398
  training_df, _ = load_existing_training_matrix()
1399
  registry = load_json_from_dataset("model_registry.json", DATASET_NAME) or {}
1400
  model_meta = load_json_from_dataset("model_meta.json", DATASET_NAME) or {}
1401
+ registry_revision = registry.get("generated_at")
1402
+ if registry_revision:
1403
+ clear_model_bundle_cache(registry_revision)
1404
+ predictions_df = rebuild_future_ml_columns(predictions_df, registry_revision=registry_revision)
1405
  target_status = build_target_status(registry)
1406
 
1407
  backtest_df = build_recent_backtest(training_df)
 
1422
  current = build_current_payload(current_row)
1423
 
1424
  feature_importance = []
 
 
 
1425
  for target_name in MODEL_FILES:
1426
  feature_importance.extend(
1427
  extract_feature_importance_from_bundle(
 
1476
  # BACKFILL OPERATIONS
1477
  # =============================================================================
1478
  def backfill_historical_data():
1479
+ """Backfill historical data from the agreed historical start date to now."""
1480
  log_event("backfill_historical_data entered")
1481
  init_dataset_if_needed()
1482
 
1483
+ start_date = HISTORICAL_BACKFILL_START
1484
  end_date = now_cph().date()
1485
 
1486
  print(f"🔄 Fetching from {start_date} to {end_date}")
 
1644
  log_event("predict_with_bundle missing_features", bucket=bucket, missing_columns=missing_cols)
1645
  continue
1646
  X = bucket_df[feature_cols].fillna(0.0)
1647
+ if hasattr(model, "predict_proba"):
1648
+ bucket_pred = model.predict_proba(X)[:, 1]
1649
+ else:
1650
+ bucket_pred = model.predict(X)
1651
  predictions[bucket_mask] = bucket_pred
1652
 
1653
  return predictions