Spaces:
Running
Running
| import logging | |
| from datetime import datetime | |
| from data.prices import fetch_ohlcv, ohlcv_to_records | |
| from data.indicators import compute_indicators, get_latest_indicators | |
| from data.news import fetch_news | |
| from data.onchain import fetch_onchain_data | |
| from backtest.portfolio import Portfolio, compute_metrics | |
| from agents.pipeline import build_pipeline | |
| logger = logging.getLogger(__name__) | |
def run_backtest(
    benchmark: str,
    model: str,
    asset: str,
    start_date: str,
    end_date: str,
    progress_callback=None,
) -> dict:
    """
    Full backtest loop. Returns dict with metrics, equity_curve, decisions, hodl_curve.

    For every row of the indicator-enriched price frame the function builds a
    ``market_data`` context from the rows seen so far, asks the agent pipeline
    for a decision, applies it to the strategy portfolio and records the
    outcome. A second portfolio that buys on the first bar and never trades
    again is tracked alongside as a buy-and-hold baseline.

    Args:
        benchmark: Pipeline variant, forwarded to ``build_pipeline``. ``"B"``
            and ``"C"`` add news to the context; ``"C"`` additionally adds
            on-chain data.
        model: Model identifier, forwarded to ``build_pipeline``.
        asset: Asset identifier, forwarded to the data fetchers.
        start_date: Start of the period, forwarded to ``fetch_ohlcv``.
        end_date: End of the period, forwarded to ``fetch_ohlcv``.
        progress_callback: Optional callable invoked after each processed row
            as ``progress_callback(rows_done, total_rows)``.

    Returns:
        A dict with the echoed inputs (``benchmark``, ``model``, ``asset``,
        ``start_date``, ``end_date``) plus ``metrics``, ``equity_curve``,
        ``hodl_curve`` and ``decisions``.

    Raises:
        ValueError: If the prepared price frame is empty or has fewer than
            two rows.
    """
    logger.info(f"Starting backtest: benchmark={benchmark} model={model} asset={asset} {start_date}->{end_date}")

    # Fetch and prepare price data
    df_raw = fetch_ohlcv(asset, start_date, end_date)
    df = compute_indicators(df_raw)
    if df.empty or len(df) < 2:
        raise ValueError(f"Insufficient data for {asset} from {start_date} to {end_date}")
    ohlcv_records = ohlcv_to_records(df)

    # Portfolio for agent strategy
    portfolio = Portfolio()

    # HODL portfolio (buy on day 1, hold)
    hodl_portfolio = Portfolio()
    first_price = float(df.iloc[0]["close"])
    hodl_portfolio.apply_decision({"action": "BUY", "size": 1.0}, first_price, str(df.iloc[0]["date"]))

    decisions_log = []
    pipeline = build_pipeline(benchmark, model)
    total_days = len(df)

    # iterrows() yields the index *label*, which is only equal to the row
    # position when the frame carries a 0-based RangeIndex. The slices and the
    # progress counter below need a position, so count rows explicitly; this
    # keeps the history window free of future rows even if the indicator step
    # returned a filtered or re-indexed frame.
    for pos, (_, row) in enumerate(df.iterrows()):
        date = str(row["date"])
        price = float(row["close"])
        seen = pos + 1  # number of rows available up to and including today

        # Build market_data context (data available up to this day).
        # Assumes ohlcv_records holds one record per df row, in the same order.
        recent_records = ohlcv_records[:seen]
        indicators = get_latest_indicators(df.iloc[:seen])
        # Snapshot is taken BEFORE today's decision is applied; the same value
        # is what ends up in the decisions log and the debug line below.
        portfolio_snapshot = portfolio.snapshot(price)
        market_data = {
            "asset": asset,
            "current_price": price,
            "date": date,
            "recent_ohlcv": recent_records[-30:],  # last 30 days
            "indicators": indicators,
            "portfolio": portfolio_snapshot,
        }

        # Add news for benchmarks B and C. This runs on every bar of the
        # backtest; a failed fetch degrades to an empty list instead of
        # aborting the run.
        if benchmark in ("B", "C"):
            try:
                news = fetch_news(asset, date=date, limit=5)
                market_data["news"] = news
            except Exception as e:
                logger.warning(f"News fetch failed for {date}: {e}")
                market_data["news"] = []

        # Add on-chain data for benchmark C, with the same best-effort policy.
        # NOTE(review): fetch_onchain_data receives no date, so it presumably
        # returns current data for every historical bar - confirm this is not
        # leaking future information into the backtest.
        if benchmark == "C":
            try:
                onchain = fetch_onchain_data(asset)
                market_data["onchain"] = onchain
            except Exception as e:
                logger.warning(f"On-chain fetch failed for {date}: {e}")
                market_data["onchain"] = {}

        # Get decision from pipeline; any failure becomes a no-op HOLD so a
        # single bad day does not abort the whole backtest.
        try:
            result = pipeline.decide(market_data)
            decision = result["decision"]
            agent_outputs = result.get("agent_outputs", {})
        except Exception as e:
            logger.error(f"Pipeline error on {date}: {e}")
            decision = {"action": "HOLD", "size": 0.0, "confidence": 0.0, "reason": f"Error: {e}"}
            agent_outputs = {}

        # Apply to portfolio
        portfolio.apply_decision(decision, price, date)

        # Update HODL: mark the untouched position to today's close.
        # NOTE(review): if Portfolio.apply_decision also appends to
        # equity_history, the initial BUY above already added a first-day
        # point and the HODL curve has one more entry than the strategy
        # curve - verify against Portfolio.
        hodl_portfolio.equity_history.append({
            "date": date,
            "value": round(hodl_portfolio.cash + hodl_portfolio.position * price, 2),
            "price": price,
            "action": "HOLD",
            "trade_executed": False,
            "trade_value": 0.0,
            "cash": hodl_portfolio.cash,
            "position": hodl_portfolio.position,
        })

        decisions_log.append({
            "date": date,
            "price": price,
            "action": decision.get("action"),
            "size": decision.get("size"),
            "confidence": decision.get("confidence"),
            "reason": decision.get("reason"),
            "agent_outputs": agent_outputs,
            "portfolio_value": portfolio_snapshot["total_value"],
        })

        if progress_callback:
            progress_callback(seen, total_days)

        logger.debug(f"{date} | {asset} | {decision.get('action')} | price={price:.2f} | portfolio={portfolio_snapshot['total_value']:.2f}")

    # Final metrics
    hodl_final = hodl_portfolio.equity_history[-1]["value"] if hodl_portfolio.equity_history else portfolio.initial_capital
    metrics = compute_metrics(portfolio.equity_history, portfolio.initial_capital, hodl_final)

    hodl_curve = [{"date": e["date"], "value": e["value"]} for e in hodl_portfolio.equity_history]
    equity_curve = [{"date": e["date"], "value": e["value"], "action": e.get("action", "HOLD")} for e in portfolio.equity_history]

    return {
        "benchmark": benchmark,
        "model": model,
        "asset": asset,
        "start_date": start_date,
        "end_date": end_date,
        "metrics": metrics,
        "equity_curve": equity_curve,
        "hodl_curve": hodl_curve,
        "decisions": decisions_log,
    }