File size: 4,454 Bytes
68025ee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import sqlite3
import json
import os
import uuid
import logging
from datetime import datetime
from config import DB_PATH

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)


def _get_conn():
    """Open a new SQLite connection to ``DB_PATH``.

    The parent directory of ``DB_PATH`` is created first when it is missing.

    Returns:
        A ``sqlite3.Connection`` whose ``row_factory`` is ``sqlite3.Row``, so
        rows can be converted with ``dict(row)``. The caller must close it.
    """
    # A bare filename (or ":memory:") has an empty dirname, and
    # os.makedirs("") raises FileNotFoundError, so only create the directory
    # when there actually is one.
    parent_dir = os.path.dirname(DB_PATH)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    conn = sqlite3.connect(DB_PATH, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    return conn


def init_db():
    """Create the ``runs`` and ``decisions`` tables if they don't exist.

    The connection is closed even if the schema script fails.
    """
    conn = _get_conn()
    try:
        conn.executescript("""
        CREATE TABLE IF NOT EXISTS runs (
            id TEXT PRIMARY KEY,
            benchmark TEXT NOT NULL,
            model TEXT NOT NULL,
            asset TEXT NOT NULL,
            start_date TEXT,
            end_date TEXT,
            status TEXT DEFAULT 'pending',
            created_at TEXT,
            completed_at TEXT,
            metrics TEXT,
            equity_curve TEXT,
            hodl_curve TEXT
        );

        CREATE TABLE IF NOT EXISTS decisions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_id TEXT NOT NULL,
            date TEXT,
            price REAL,
            action TEXT,
            size REAL,
            confidence REAL,
            reason TEXT,
            agent_outputs TEXT,
            portfolio_value REAL,
            FOREIGN KEY (run_id) REFERENCES runs(id)
        );
    """)
        conn.commit()
    finally:
        # Always release the connection, including when the script raises.
        conn.close()
    logger.info("DB initialized at %s", DB_PATH)


def create_run(benchmark: str, model: str, asset: str, start_date: str, end_date: str) -> str:
    """Insert a new run row with status ``"running"`` and return its id.

    Args:
        benchmark: Value stored in the ``benchmark`` column.
        model: Value stored in the ``model`` column.
        asset: Value stored in the ``asset`` column.
        start_date: Value stored in the ``start_date`` column.
        end_date: Value stored in the ``end_date`` column.

    Returns:
        The generated run id (a UUID4 string).
    """
    run_id = str(uuid.uuid4())
    # NOTE(review): utcnow() yields a naive timestamp; kept as-is so newly
    # stored values keep the same text format as existing rows.
    created_at = datetime.utcnow().isoformat()

    conn = _get_conn()
    try:
        conn.execute(
            "INSERT INTO runs (id, benchmark, model, asset, start_date, end_date, status, created_at) VALUES (?,?,?,?,?,?,?,?)",
            (run_id, benchmark, model, asset, start_date, end_date, "running", created_at),
        )
        conn.commit()
    finally:
        # Close the connection even when the insert fails.
        conn.close()
    return run_id


def complete_run(run_id: str, result: dict):
    """Mark a run as completed and persist its results and decisions.

    Args:
        run_id: Id of the run row to update.
        result: Mapping read for the keys ``"metrics"``, ``"equity_curve"``,
            ``"hodl_curve"`` and ``"decisions"``. Missing keys default to an
            empty dict or list. Each decision is a mapping read with ``.get``.

    Raises:
        TypeError: If any value cannot be serialised to JSON. Everything is
            serialised before the database is touched, so nothing is written
            in that case.
    """
    metrics_json = json.dumps(result.get("metrics", {}))
    equity_json = json.dumps(result.get("equity_curve", []))
    hodl_json = json.dumps(result.get("hodl_curve", []))

    # Build every decision row up front so a serialisation error surfaces
    # before a connection is opened or any statement is executed.
    decision_rows = [
        (
            run_id,
            d.get("date"),
            d.get("price"),
            d.get("action"),
            d.get("size"),
            d.get("confidence"),
            d.get("reason"),
            json.dumps(d.get("agent_outputs", {})),
            d.get("portfolio_value"),
        )
        for d in result.get("decisions", [])
    ]

    conn = _get_conn()
    try:
        conn.execute(
            "UPDATE runs SET status=?, completed_at=?, metrics=?, equity_curve=?, hodl_curve=? WHERE id=?",
            ("completed", datetime.utcnow().isoformat(), metrics_json, equity_json, hodl_json, run_id),
        )
        conn.executemany(
            "INSERT INTO decisions (run_id, date, price, action, size, confidence, reason, agent_outputs, portfolio_value) VALUES (?,?,?,?,?,?,?,?,?)",
            decision_rows,
        )
        # Single commit: the status update and all decisions land together.
        conn.commit()
    finally:
        # Close the connection even when a statement fails.
        conn.close()


def fail_run(run_id: str, error: str):
    """Mark a run as failed and record the error message.

    The message is stored in the ``metrics`` column as the JSON object
    ``{"error": <error>}``.

    Args:
        run_id: Id of the run row to update.
        error: Error text to record.
    """
    error_json = json.dumps({"error": error})
    conn = _get_conn()
    try:
        conn.execute(
            "UPDATE runs SET status=?, completed_at=?, metrics=? WHERE id=?",
            ("failed", datetime.utcnow().isoformat(), error_json, run_id),
        )
        conn.commit()
    finally:
        # Close the connection even when the update fails.
        conn.close()


def get_run(run_id: str) -> dict | None:
    """Fetch one run by id.

    Args:
        run_id: Id of the run to look up.

    Returns:
        The run as a dict with its JSON columns decoded by ``_row_to_run``,
        or ``None`` when no row has that id.
    """
    conn = _get_conn()
    try:
        row = conn.execute("SELECT * FROM runs WHERE id=?", (run_id,)).fetchone()
    finally:
        # Close the connection even when the query fails.
        conn.close()
    if not row:
        return None
    return _row_to_run(row)


def get_leaderboard() -> list:
    """Return all completed runs, newest first by ``created_at``.

    Returns:
        A list of run dicts with their JSON columns decoded by
        ``_row_to_run``; empty when there are no completed runs.
    """
    conn = _get_conn()
    try:
        rows = conn.execute(
            "SELECT * FROM runs WHERE status='completed' ORDER BY created_at DESC"
        ).fetchall()
    finally:
        # Close the connection even when the query fails.
        conn.close()
    return [_row_to_run(r) for r in rows]


def get_decisions(run_id: str) -> list:
    """Return the decisions recorded for a run, ordered by date ascending.

    Args:
        run_id: Id of the run whose decisions are wanted.

    Returns:
        A list of dicts, one per decision row. ``agent_outputs`` is decoded
        from JSON when possible; otherwise the stored value is left as-is.
    """
    conn = _get_conn()
    try:
        # The id tie-breaker keeps rows that share a date (or have a NULL
        # date) in insertion order instead of an unspecified order.
        rows = conn.execute(
            "SELECT * FROM decisions WHERE run_id=? ORDER BY date ASC, id ASC", (run_id,)
        ).fetchall()
    finally:
        # Close the connection even when the query fails.
        conn.close()

    decisions = []
    for row in rows:
        decision = dict(row)
        raw_outputs = decision.get("agent_outputs")
        if raw_outputs:
            try:
                decision["agent_outputs"] = json.loads(raw_outputs)
            except (TypeError, ValueError):
                # Best effort: keep the raw stored value when it is not
                # valid JSON (json.JSONDecodeError is a ValueError).
                pass
        decisions.append(decision)
    return decisions


def _row_to_run(row) -> dict:
    d = dict(row)
    for field in ("metrics", "equity_curve", "hodl_curve"):
        if d.get(field):
            try:
                d[field] = json.loads(d[field])
            except Exception:
                d[field] = {}
    return d