"""SQLite persistence for benchmark runs and the decisions recorded for them."""

import json
import logging
import os
import sqlite3
import uuid
from contextlib import closing
from datetime import datetime, timezone

from config import DB_PATH

logger = logging.getLogger(__name__)

# Columns of ``runs`` that hold JSON-encoded text and are decoded on read.
_JSON_RUN_FIELDS = ("metrics", "equity_curve", "hodl_curve")

# Failures that mean "stored text is not usable JSON"; readers treat these as
# best-effort and fall back instead of propagating.
_JSON_DECODE_ERRORS = (ValueError, TypeError, RecursionError)

_INSERT_DECISION_SQL = (
    "INSERT INTO decisions (run_id, date, price, action, size, confidence, "
    "reason, agent_outputs, portfolio_value) VALUES (?,?,?,?,?,?,?,?,?)"
)


def _utc_now_iso() -> str:
    """Return the current UTC time as a naive ISO-8601 string.

    The tzinfo is stripped so the text has the same shape (no ``+00:00``
    suffix) as the ``datetime.utcnow().isoformat()`` values this module
    previously wrote, keeping old and new timestamps comparable as strings.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


def _get_conn() -> sqlite3.Connection:
    """Open a connection to ``DB_PATH`` whose rows are ``sqlite3.Row`` objects.

    The parent directory of ``DB_PATH`` is created when missing. Callers are
    responsible for closing the returned connection.
    """
    db_dir = os.path.dirname(DB_PATH)
    # dirname() is "" for a bare filename; os.makedirs("") would raise.
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)
    conn = sqlite3.connect(DB_PATH, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    return conn


def init_db():
    """Create tables if they don't exist."""
    with closing(_get_conn()) as conn:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS runs (
                id TEXT PRIMARY KEY,
                benchmark TEXT NOT NULL,
                model TEXT NOT NULL,
                asset TEXT NOT NULL,
                start_date TEXT,
                end_date TEXT,
                status TEXT DEFAULT 'pending',
                created_at TEXT,
                completed_at TEXT,
                metrics TEXT,
                equity_curve TEXT,
                hodl_curve TEXT
            );
            CREATE TABLE IF NOT EXISTS decisions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                run_id TEXT NOT NULL,
                date TEXT,
                price REAL,
                action TEXT,
                size REAL,
                confidence REAL,
                reason TEXT,
                agent_outputs TEXT,
                portfolio_value REAL,
                FOREIGN KEY (run_id) REFERENCES runs(id)
            );
        """)
        conn.commit()
    logger.info("DB initialized at %s", DB_PATH)


def create_run(benchmark: str, model: str, asset: str, start_date: str, end_date: str) -> str:
    """Insert a new run with status ``"running"`` and return its generated id.

    The id is a random UUID4 string; ``created_at`` is set to the current UTC
    time.
    """
    run_id = str(uuid.uuid4())
    with closing(_get_conn()) as conn:
        conn.execute(
            "INSERT INTO runs (id, benchmark, model, asset, start_date, end_date, status, created_at) VALUES (?,?,?,?,?,?,?,?)",
            (run_id, benchmark, model, asset, start_date, end_date, "running", _utc_now_iso()),
        )
        conn.commit()
    return run_id


def complete_run(run_id: str, result: dict):
    """Mark a run as ``"completed"`` and store its results and decisions.

    Reads ``metrics``, ``equity_curve``, ``hodl_curve`` and ``decisions`` from
    ``result``; the first three are stored as JSON text on the run row and
    each decision becomes one row in ``decisions``. A missing or ``None``
    ``decisions`` entry stores no decision rows.

    Everything is serialized before the connection is opened, and the update
    plus all inserts are committed together; if any statement fails the
    connection is closed without committing, so nothing is persisted.
    """
    metrics_json = json.dumps(result.get("metrics", {}))
    equity_json = json.dumps(result.get("equity_curve", []))
    hodl_json = json.dumps(result.get("hodl_curve", []))
    decision_rows = [
        (
            run_id,
            d.get("date"),
            d.get("price"),
            d.get("action"),
            d.get("size"),
            d.get("confidence"),
            d.get("reason"),
            json.dumps(d.get("agent_outputs", {})),
            d.get("portfolio_value"),
        )
        for d in (result.get("decisions") or [])
    ]

    with closing(_get_conn()) as conn:
        conn.execute(
            "UPDATE runs SET status=?, completed_at=?, metrics=?, equity_curve=?, hodl_curve=? WHERE id=?",
            ("completed", _utc_now_iso(), metrics_json, equity_json, hodl_json, run_id),
        )
        conn.executemany(_INSERT_DECISION_SQL, decision_rows)
        conn.commit()


def fail_run(run_id: str, error: str):
    """Mark a run as ``"failed"``, storing ``{"error": error}`` as its metrics."""
    with closing(_get_conn()) as conn:
        conn.execute(
            "UPDATE runs SET status=?, completed_at=?, metrics=? WHERE id=?",
            ("failed", _utc_now_iso(), json.dumps({"error": error}), run_id),
        )
        conn.commit()


def get_run(run_id: str) -> dict | None:
    """Return the run with the given id as a dict, or ``None`` if not found."""
    with closing(_get_conn()) as conn:
        row = conn.execute("SELECT * FROM runs WHERE id=?", (run_id,)).fetchone()
    if not row:
        return None
    return _row_to_run(row)


def get_leaderboard() -> list:
    """Return all completed runs as dicts, most recently created first."""
    with closing(_get_conn()) as conn:
        rows = conn.execute(
            "SELECT * FROM runs WHERE status='completed' ORDER BY created_at DESC"
        ).fetchall()
    return [_row_to_run(r) for r in rows]


def get_decisions(run_id: str) -> list:
    """Return the decisions of a run as dicts, ordered by ``date`` ascending.

    ``agent_outputs`` is decoded from JSON when possible; if decoding fails
    the stored value is returned unchanged.
    """
    with closing(_get_conn()) as conn:
        rows = conn.execute(
            "SELECT * FROM decisions WHERE run_id=? ORDER BY date ASC", (run_id,)
        ).fetchall()

    decisions = []
    for row in rows:
        decision = dict(row)
        raw_outputs = decision.get("agent_outputs")
        if raw_outputs:
            try:
                decision["agent_outputs"] = json.loads(raw_outputs)
            except _JSON_DECODE_ERRORS:
                # Best effort: keep the raw stored value rather than failing the read.
                logger.warning(
                    "Could not decode agent_outputs for decision %s of run %s",
                    decision.get("id"),
                    run_id,
                )
        decisions.append(decision)
    return decisions


def _row_to_run(row) -> dict:
    """Convert a ``runs`` row to a dict, decoding its JSON text columns.

    Empty/NULL JSON columns are left as they are; a column whose text cannot
    be decoded is replaced with an empty dict.
    """
    run = dict(row)
    for field in _JSON_RUN_FIELDS:
        raw = run.get(field)
        if not raw:
            continue
        try:
            run[field] = json.loads(raw)
        except _JSON_DECODE_ERRORS:
            logger.warning("Could not decode %s for run %s", field, run.get("id"))
            run[field] = {}
    return run