Spaces:
Running
Running
| import sqlite3 | |
| import json | |
| import os | |
| import uuid | |
| import logging | |
| from datetime import datetime | |
| from config import DB_PATH | |
| logger = logging.getLogger(__name__) | |
| def _get_conn(): | |
| os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) | |
| conn = sqlite3.connect(DB_PATH, check_same_thread=False) | |
| conn.row_factory = sqlite3.Row | |
| return conn | |
| def init_db(): | |
| """Create tables if they don't exist.""" | |
| conn = _get_conn() | |
| c = conn.cursor() | |
| c.executescript(""" | |
| CREATE TABLE IF NOT EXISTS runs ( | |
| id TEXT PRIMARY KEY, | |
| benchmark TEXT NOT NULL, | |
| model TEXT NOT NULL, | |
| asset TEXT NOT NULL, | |
| start_date TEXT, | |
| end_date TEXT, | |
| status TEXT DEFAULT 'pending', | |
| created_at TEXT, | |
| completed_at TEXT, | |
| metrics TEXT, | |
| equity_curve TEXT, | |
| hodl_curve TEXT | |
| ); | |
| CREATE TABLE IF NOT EXISTS decisions ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| run_id TEXT NOT NULL, | |
| date TEXT, | |
| price REAL, | |
| action TEXT, | |
| size REAL, | |
| confidence REAL, | |
| reason TEXT, | |
| agent_outputs TEXT, | |
| portfolio_value REAL, | |
| FOREIGN KEY (run_id) REFERENCES runs(id) | |
| ); | |
| """) | |
| conn.commit() | |
| conn.close() | |
| logger.info(f"DB initialized at {DB_PATH}") | |
| def create_run(benchmark: str, model: str, asset: str, start_date: str, end_date: str) -> str: | |
| run_id = str(uuid.uuid4()) | |
| conn = _get_conn() | |
| conn.execute( | |
| "INSERT INTO runs (id, benchmark, model, asset, start_date, end_date, status, created_at) VALUES (?,?,?,?,?,?,?,?)", | |
| (run_id, benchmark, model, asset, start_date, end_date, "running", datetime.utcnow().isoformat()), | |
| ) | |
| conn.commit() | |
| conn.close() | |
| return run_id | |
| def complete_run(run_id: str, result: dict): | |
| metrics_json = json.dumps(result.get("metrics", {})) | |
| equity_json = json.dumps(result.get("equity_curve", [])) | |
| hodl_json = json.dumps(result.get("hodl_curve", [])) | |
| conn = _get_conn() | |
| conn.execute( | |
| "UPDATE runs SET status=?, completed_at=?, metrics=?, equity_curve=?, hodl_curve=? WHERE id=?", | |
| ("completed", datetime.utcnow().isoformat(), metrics_json, equity_json, hodl_json, run_id), | |
| ) | |
| for d in result.get("decisions", []): | |
| conn.execute( | |
| "INSERT INTO decisions (run_id, date, price, action, size, confidence, reason, agent_outputs, portfolio_value) VALUES (?,?,?,?,?,?,?,?,?)", | |
| ( | |
| run_id, | |
| d.get("date"), | |
| d.get("price"), | |
| d.get("action"), | |
| d.get("size"), | |
| d.get("confidence"), | |
| d.get("reason"), | |
| json.dumps(d.get("agent_outputs", {})), | |
| d.get("portfolio_value"), | |
| ), | |
| ) | |
| conn.commit() | |
| conn.close() | |
| def fail_run(run_id: str, error: str): | |
| conn = _get_conn() | |
| conn.execute( | |
| "UPDATE runs SET status=?, completed_at=?, metrics=? WHERE id=?", | |
| ("failed", datetime.utcnow().isoformat(), json.dumps({"error": error}), run_id), | |
| ) | |
| conn.commit() | |
| conn.close() | |
| def get_run(run_id: str) -> dict | None: | |
| conn = _get_conn() | |
| row = conn.execute("SELECT * FROM runs WHERE id=?", (run_id,)).fetchone() | |
| conn.close() | |
| if not row: | |
| return None | |
| return _row_to_run(row) | |
| def get_leaderboard() -> list: | |
| conn = _get_conn() | |
| rows = conn.execute( | |
| "SELECT * FROM runs WHERE status='completed' ORDER BY created_at DESC" | |
| ).fetchall() | |
| conn.close() | |
| return [_row_to_run(r) for r in rows] | |
| def get_decisions(run_id: str) -> list: | |
| conn = _get_conn() | |
| rows = conn.execute( | |
| "SELECT * FROM decisions WHERE run_id=? ORDER BY date ASC", (run_id,) | |
| ).fetchall() | |
| conn.close() | |
| result = [] | |
| for row in rows: | |
| d = dict(row) | |
| if d.get("agent_outputs"): | |
| try: | |
| d["agent_outputs"] = json.loads(d["agent_outputs"]) | |
| except Exception: | |
| pass | |
| result.append(d) | |
| return result | |
| def _row_to_run(row) -> dict: | |
| d = dict(row) | |
| for field in ("metrics", "equity_curve", "hodl_curve"): | |
| if d.get(field): | |
| try: | |
| d[field] = json.loads(d[field]) | |
| except Exception: | |
| d[field] = {} | |
| return d | |