Spaces:
Running
Running
File size: 4,454 Bytes
68025ee | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 | import sqlite3
import json
import os
import uuid
import logging
from datetime import datetime
from config import DB_PATH
logger = logging.getLogger(__name__)
def _get_conn():
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.row_factory = sqlite3.Row
return conn
def init_db():
"""Create tables if they don't exist."""
conn = _get_conn()
c = conn.cursor()
c.executescript("""
CREATE TABLE IF NOT EXISTS runs (
id TEXT PRIMARY KEY,
benchmark TEXT NOT NULL,
model TEXT NOT NULL,
asset TEXT NOT NULL,
start_date TEXT,
end_date TEXT,
status TEXT DEFAULT 'pending',
created_at TEXT,
completed_at TEXT,
metrics TEXT,
equity_curve TEXT,
hodl_curve TEXT
);
CREATE TABLE IF NOT EXISTS decisions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT NOT NULL,
date TEXT,
price REAL,
action TEXT,
size REAL,
confidence REAL,
reason TEXT,
agent_outputs TEXT,
portfolio_value REAL,
FOREIGN KEY (run_id) REFERENCES runs(id)
);
""")
conn.commit()
conn.close()
logger.info(f"DB initialized at {DB_PATH}")
def create_run(benchmark: str, model: str, asset: str, start_date: str, end_date: str) -> str:
run_id = str(uuid.uuid4())
conn = _get_conn()
conn.execute(
"INSERT INTO runs (id, benchmark, model, asset, start_date, end_date, status, created_at) VALUES (?,?,?,?,?,?,?,?)",
(run_id, benchmark, model, asset, start_date, end_date, "running", datetime.utcnow().isoformat()),
)
conn.commit()
conn.close()
return run_id
def complete_run(run_id: str, result: dict):
metrics_json = json.dumps(result.get("metrics", {}))
equity_json = json.dumps(result.get("equity_curve", []))
hodl_json = json.dumps(result.get("hodl_curve", []))
conn = _get_conn()
conn.execute(
"UPDATE runs SET status=?, completed_at=?, metrics=?, equity_curve=?, hodl_curve=? WHERE id=?",
("completed", datetime.utcnow().isoformat(), metrics_json, equity_json, hodl_json, run_id),
)
for d in result.get("decisions", []):
conn.execute(
"INSERT INTO decisions (run_id, date, price, action, size, confidence, reason, agent_outputs, portfolio_value) VALUES (?,?,?,?,?,?,?,?,?)",
(
run_id,
d.get("date"),
d.get("price"),
d.get("action"),
d.get("size"),
d.get("confidence"),
d.get("reason"),
json.dumps(d.get("agent_outputs", {})),
d.get("portfolio_value"),
),
)
conn.commit()
conn.close()
def fail_run(run_id: str, error: str):
conn = _get_conn()
conn.execute(
"UPDATE runs SET status=?, completed_at=?, metrics=? WHERE id=?",
("failed", datetime.utcnow().isoformat(), json.dumps({"error": error}), run_id),
)
conn.commit()
conn.close()
def get_run(run_id: str) -> dict | None:
conn = _get_conn()
row = conn.execute("SELECT * FROM runs WHERE id=?", (run_id,)).fetchone()
conn.close()
if not row:
return None
return _row_to_run(row)
def get_leaderboard() -> list:
conn = _get_conn()
rows = conn.execute(
"SELECT * FROM runs WHERE status='completed' ORDER BY created_at DESC"
).fetchall()
conn.close()
return [_row_to_run(r) for r in rows]
def get_decisions(run_id: str) -> list:
conn = _get_conn()
rows = conn.execute(
"SELECT * FROM decisions WHERE run_id=? ORDER BY date ASC", (run_id,)
).fetchall()
conn.close()
result = []
for row in rows:
d = dict(row)
if d.get("agent_outputs"):
try:
d["agent_outputs"] = json.loads(d["agent_outputs"])
except Exception:
pass
result.append(d)
return result
def _row_to_run(row) -> dict:
d = dict(row)
for field in ("metrics", "equity_curve", "hodl_curve"):
if d.get(field):
try:
d[field] = json.loads(d[field])
except Exception:
d[field] = {}
return d
|