"""
Daily live trading job — run once per day via cron.

Idempotent: skips a day already processed.

Usage:
    python live/daily_job.py
"""
import json
import logging
import os
import sys
import uuid
from contextlib import closing
from datetime import date, datetime, timedelta, timezone

# Make the project root importable when this file is executed as a script.
# This must run before the project-local imports below.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from data.prices import fetch_ohlcv, ohlcv_to_records
from data.indicators import compute_indicators, get_latest_indicators
from data.news import fetch_news
from data.onchain import fetch_onchain_data
from backtest.portfolio import Portfolio  # unused here; kept from the original imports
from agents.pipeline import build_pipeline
from db.store import init_db, _get_conn
from config import FREE_MODELS, ASSETS, BENCHMARKS, INITIAL_CAPITAL

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Evaluated once at import time; the job is expected to be a short-lived
# process started by cron, so the date does not roll over mid-run.
TODAY = date.today().isoformat()

# Number of calendar days of price history requested for indicator computation.
LOOKBACK_DAYS = 60

# Number of most recent OHLCV records handed to the decision pipeline.
RECENT_BARS = 30


def _already_processed(benchmark: str, model: str, asset: str, day: str) -> bool:
    """Return True if a decision for ``day`` exists on a matching live run.

    A run matches when its benchmark, model and asset equal the arguments and
    its status is ``'live'``.
    """
    # closing() guarantees the connection is released even if the query raises.
    with closing(_get_conn()) as conn:
        row = conn.execute(
            "SELECT id FROM decisions WHERE run_id IN "
            "(SELECT id FROM runs WHERE benchmark=? AND model=? AND asset=? "
            "AND status='live') AND date=?",
            (benchmark, model, asset, day),
        ).fetchone()
    return row is not None


def _get_or_create_live_run(benchmark: str, model: str, asset: str) -> str:
    """Return the id of the live run for this combination, creating it if absent.

    When no row with status ``'live'`` exists, a new run is inserted with a
    fresh UUID4 id and a naive-UTC ISO-8601 ``created_at`` timestamp.
    """
    with closing(_get_conn()) as conn:
        row = conn.execute(
            "SELECT id FROM runs WHERE benchmark=? AND model=? AND asset=? "
            "AND status='live'",
            (benchmark, model, asset),
        ).fetchone()
        if row:
            return row["id"]

        run_id = str(uuid.uuid4())
        # Same naive-UTC string that datetime.utcnow().isoformat() produced,
        # without relying on the deprecated utcnow().
        created_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        conn.execute(
            "INSERT INTO runs (id, benchmark, model, asset, status, created_at) "
            "VALUES (?,?,?,?,?,?)",
            (run_id, benchmark, model, asset, "live", created_at),
        )
        conn.commit()
    return run_id


def _load_portfolio_value(run_id: str):
    """Return the portfolio value of the run's latest decision, else INITIAL_CAPITAL."""
    with closing(_get_conn()) as conn:
        last_snap = conn.execute(
            "SELECT portfolio_value FROM decisions WHERE run_id=? "
            "ORDER BY date DESC LIMIT 1",
            (run_id,),
        ).fetchone()
    return last_snap["portfolio_value"] if last_snap else INITIAL_CAPITAL


def _build_market_data(benchmark, asset, price, df, indicators, portfolio_value) -> dict:
    """Assemble the payload passed to the decision pipeline.

    Benchmarks ``"B"`` and ``"C"`` additionally receive news; benchmark ``"C"``
    also receives on-chain data.
    """
    # Simplified snapshot (no persistent position tracking for now)
    portfolio_snapshot = {
        "cash": portfolio_value,
        "position": 0.0,
        "total_value": portfolio_value,
        "drawdown": 0.0,
    }
    market_data = {
        "asset": asset,
        "current_price": price,
        "date": TODAY,
        "recent_ohlcv": ohlcv_to_records(df)[-RECENT_BARS:],
        "indicators": indicators,
        "portfolio": portfolio_snapshot,
    }
    if benchmark in ("B", "C"):
        market_data["news"] = fetch_news(asset, limit=5)
    if benchmark == "C":
        market_data["onchain"] = fetch_onchain_data(asset)
    return market_data


def _save_decision(run_id: str, price: float, decision, agent_outputs, portfolio_value) -> None:
    """Insert today's decision row for ``run_id`` and commit it."""
    with closing(_get_conn()) as conn:
        conn.execute(
            "INSERT INTO decisions (run_id, date, price, action, size, confidence, "
            "reason, agent_outputs, portfolio_value) VALUES (?,?,?,?,?,?,?,?,?)",
            (
                run_id,
                TODAY,
                price,
                decision.get("action"),
                decision.get("size"),
                decision.get("confidence"),
                decision.get("reason"),
                json.dumps(agent_outputs),
                portfolio_value,
            ),
        )
        conn.commit()


def _process_combination(benchmark, model, asset, lookback_start: str) -> None:
    """Fetch data, run the pipeline and persist one decision for one combination.

    Returns without writing anything when the indicator frame is empty.
    Exceptions propagate to the caller, which logs them.
    """
    df_raw = fetch_ohlcv(asset, lookback_start, TODAY)
    df = compute_indicators(df_raw)
    if df.empty:
        logger.warning("No data for %s", asset)
        return

    # Most recent row supplies the price recorded with the decision.
    price = float(df.iloc[-1]["close"])
    indicators = get_latest_indicators(df)

    run_id = _get_or_create_live_run(benchmark, model, asset)
    # Load portfolio state from DB
    portfolio_value = _load_portfolio_value(run_id)

    market_data = _build_market_data(
        benchmark, asset, price, df, indicators, portfolio_value
    )

    pipeline = build_pipeline(benchmark, model)
    result = pipeline.decide(market_data)
    decision = result["decision"]
    agent_outputs = result.get("agent_outputs", {})

    _save_decision(run_id, price, decision, agent_outputs, portfolio_value)
    logger.info(
        "Decision saved: %s/%s/%s/%s -> %s",
        benchmark, model, asset, TODAY, decision.get("action"),
    )


def run_daily() -> None:
    """Run today's decision for every model/benchmark/asset combination.

    Combinations that already have a decision for TODAY are skipped. A failure
    while processing one combination is logged with its traceback and does not
    stop the remaining ones.
    """
    init_db()
    logger.info("Daily live job — %s", TODAY)

    # Look back 60 days for indicators
    lookback_start = (
        date.fromisoformat(TODAY) - timedelta(days=LOOKBACK_DAYS)
    ).isoformat()

    for model in FREE_MODELS:
        for benchmark in BENCHMARKS:
            for asset in ASSETS:
                logger.info("Processing %s/%s/%s", benchmark, model, asset)

                if _already_processed(benchmark, model, asset, TODAY):
                    logger.info(
                        "Already processed %s/%s/%s/%s, skipping",
                        benchmark, model, asset, TODAY,
                    )
                    continue

                try:
                    _process_combination(benchmark, model, asset, lookback_start)
                except Exception as e:
                    # Top-level boundary: keep the job going for the other
                    # combinations, but record the full traceback.
                    logger.exception(
                        "Error for %s/%s/%s: %s", benchmark, model, asset, e
                    )


if __name__ == "__main__":
    run_daily()