Mehdi
feat: backend
68025ee
Raw
History Blame Contribute Delete
4.45 kB
import sqlite3
import json
import os
import uuid
import logging
from datetime import datetime
from config import DB_PATH
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def _get_conn() -> sqlite3.Connection:
    """Open a new SQLite connection to ``DB_PATH``.

    The parent directory of ``DB_PATH`` is created first when the path has
    one. Rows come back as ``sqlite3.Row`` objects, so columns can be read
    by name and converted with ``dict(row)``.

    Returns:
        A fresh connection. The caller is responsible for closing it.
    """
    # A bare filename (or ":memory:") has no directory component, and
    # os.makedirs("") raises FileNotFoundError, so only create the parent
    # directory when there actually is one.
    parent_dir = os.path.dirname(DB_PATH)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    conn = sqlite3.connect(DB_PATH, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    return conn
def init_db() -> None:
    """Create the ``runs`` and ``decisions`` tables if they don't exist.

    Safe to call repeatedly: both statements use ``CREATE TABLE IF NOT
    EXISTS``. The connection is closed even when schema creation fails.
    """
    conn = _get_conn()
    try:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS runs (
                id TEXT PRIMARY KEY,
                benchmark TEXT NOT NULL,
                model TEXT NOT NULL,
                asset TEXT NOT NULL,
                start_date TEXT,
                end_date TEXT,
                status TEXT DEFAULT 'pending',
                created_at TEXT,
                completed_at TEXT,
                metrics TEXT,
                equity_curve TEXT,
                hodl_curve TEXT
            );
            CREATE TABLE IF NOT EXISTS decisions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                run_id TEXT NOT NULL,
                date TEXT,
                price REAL,
                action TEXT,
                size REAL,
                confidence REAL,
                reason TEXT,
                agent_outputs TEXT,
                portfolio_value REAL,
                FOREIGN KEY (run_id) REFERENCES runs(id)
            );
        """)
        conn.commit()
    finally:
        # Release the handle even if the script raised.
        conn.close()
    # Lazy %-style args: the message is only formatted if INFO is enabled.
    logger.info("DB initialized at %s", DB_PATH)
def create_run(benchmark: str, model: str, asset: str, start_date: str, end_date: str) -> str:
    """Insert a new run row with status ``"running"`` and return its id.

    Args:
        benchmark: Value stored in the ``benchmark`` column.
        model: Value stored in the ``model`` column.
        asset: Value stored in the ``asset`` column.
        start_date: Value stored in the ``start_date`` column.
        end_date: Value stored in the ``end_date`` column.

    Returns:
        The generated run id (a random UUID4 rendered as a string).
    """
    run_id = str(uuid.uuid4())
    # Naive UTC timestamp in ISO-8601 form, matching the other timestamps
    # written by this module.
    created_at = datetime.utcnow().isoformat()

    conn = _get_conn()
    try:
        # `with conn` commits on success and rolls back if the insert raises.
        with conn:
            conn.execute(
                "INSERT INTO runs (id, benchmark, model, asset, start_date, end_date, status, created_at) VALUES (?,?,?,?,?,?,?,?)",
                (run_id, benchmark, model, asset, start_date, end_date, "running", created_at),
            )
    finally:
        # The context manager above does not close the connection.
        conn.close()
    return run_id
def complete_run(run_id: str, result: dict) -> None:
    """Mark a run as completed and persist its results and decisions.

    The ``metrics``, ``equity_curve`` and ``hodl_curve`` entries of *result*
    are stored as JSON text on the run row; each entry of
    ``result["decisions"]`` becomes one row in the ``decisions`` table. The
    run update and all decision inserts are committed together, or not at
    all.

    Args:
        run_id: Id of the run to update.
        result: Mapping that may contain ``metrics``, ``equity_curve``,
            ``hodl_curve`` and ``decisions`` (an iterable of dicts).
            Missing keys fall back to empty values.
    """
    # Serialize everything before touching the database so that a payload
    # that is not JSON-serializable fails without a half-written run.
    metrics_json = json.dumps(result.get("metrics", {}))
    equity_json = json.dumps(result.get("equity_curve", []))
    hodl_json = json.dumps(result.get("hodl_curve", []))
    decision_rows = [
        (
            run_id,
            d.get("date"),
            d.get("price"),
            d.get("action"),
            d.get("size"),
            d.get("confidence"),
            d.get("reason"),
            json.dumps(d.get("agent_outputs", {})),
            d.get("portfolio_value"),
        )
        # `or []` also covers an explicit None stored under "decisions".
        for d in (result.get("decisions") or [])
    ]
    completed_at = datetime.utcnow().isoformat()

    conn = _get_conn()
    try:
        # `with conn` commits on success and rolls back on any exception.
        with conn:
            conn.execute(
                "UPDATE runs SET status=?, completed_at=?, metrics=?, equity_curve=?, hodl_curve=? WHERE id=?",
                ("completed", completed_at, metrics_json, equity_json, hodl_json, run_id),
            )
            conn.executemany(
                "INSERT INTO decisions (run_id, date, price, action, size, confidence, reason, agent_outputs, portfolio_value) VALUES (?,?,?,?,?,?,?,?,?)",
                decision_rows,
            )
    finally:
        # The context manager above does not close the connection.
        conn.close()
def fail_run(run_id: str, error: str) -> None:
    """Mark a run as failed and record the error message.

    Sets ``status`` to ``"failed"``, stamps ``completed_at`` with the current
    UTC time, and overwrites ``metrics`` with the JSON object
    ``{"error": error}``.

    Args:
        run_id: Id of the run to update.
        error: Error description to store.
    """
    failure_json = json.dumps({"error": error})
    failed_at = datetime.utcnow().isoformat()

    conn = _get_conn()
    try:
        # `with conn` commits on success and rolls back if the update raises.
        with conn:
            conn.execute(
                "UPDATE runs SET status=?, completed_at=?, metrics=? WHERE id=?",
                ("failed", failed_at, failure_json, run_id),
            )
    finally:
        # The context manager above does not close the connection.
        conn.close()
def get_run(run_id: str) -> dict | None:
    """Fetch a single run by id.

    Args:
        run_id: Id of the run to look up.

    Returns:
        The run as a dict with its JSON columns decoded (see
        ``_row_to_run``), or ``None`` when no run has that id.
    """
    conn = _get_conn()
    try:
        row = conn.execute("SELECT * FROM runs WHERE id=?", (run_id,)).fetchone()
    finally:
        # Close the handle even if the query raised.
        conn.close()

    if row is None:
        return None
    return _row_to_run(row)
def get_leaderboard() -> list:
    """Return every completed run, ordered by ``created_at`` descending.

    Returns:
        A list of run dicts with their JSON columns decoded (see
        ``_row_to_run``); empty when no run has status ``'completed'``.
    """
    conn = _get_conn()
    try:
        rows = conn.execute(
            "SELECT * FROM runs WHERE status='completed' ORDER BY created_at DESC"
        ).fetchall()
    finally:
        # Close the handle even if the query raised.
        conn.close()
    return [_row_to_run(row) for row in rows]
def get_decisions(run_id: str) -> list:
    """Return the decisions recorded for a run, ordered by date.

    Rows sharing the same ``date`` are returned in insertion order (by
    ``id``), so the result is deterministic.

    Args:
        run_id: Id of the run whose decisions are wanted.

    Returns:
        A list of dicts, one per decision row. ``agent_outputs`` is decoded
        from JSON when possible; a value that cannot be decoded is left as
        the raw stored text.
    """
    conn = _get_conn()
    try:
        rows = conn.execute(
            "SELECT * FROM decisions WHERE run_id=? ORDER BY date ASC, id ASC", (run_id,)
        ).fetchall()
    finally:
        # Close the handle even if the query raised.
        conn.close()

    decisions = []
    for row in rows:
        decision = dict(row)
        raw_outputs = decision.get("agent_outputs")
        if raw_outputs:
            try:
                decision["agent_outputs"] = json.loads(raw_outputs)
            except (TypeError, ValueError):
                # Best effort: keep the raw stored value when it is not
                # valid JSON (JSONDecodeError is a ValueError subclass).
                pass
        decisions.append(decision)
    return decisions
def _row_to_run(row) -> dict:
d = dict(row)
for field in ("metrics", "equity_curve", "hodl_curve"):
if d.get(field):
try:
d[field] = json.loads(d[field])
except Exception:
d[field] = {}
return d