"""Fetch daily OHLCV candles for crypto assets from several public sources.

``fetch_ohlcv`` tries each source in a fixed order and returns the first
non-empty result. Every source returns a DataFrame with the columns
``date`` (``YYYY-MM-DD`` string, UTC calendar day), ``open``, ``high``,
``low``, ``close`` and ``volume``.
"""

import logging
import time
from datetime import datetime, timedelta, timezone

import pandas as pd
import requests

logger = logging.getLogger(__name__)

ASSET_CRYPTOCOMPARE_MAP = {
    "BTC/USDT": ("BTC", "USD"),
    "ETH/USDT": ("ETH", "USD"),
}

ASSET_COINBASE_MAP = {
    "BTC/USDT": "BTC-USD",
    "ETH/USDT": "ETH-USD",
}

ASSET_KRAKEN_MAP = {
    "BTC/USDT": "XXBTZUSD",
    "ETH/USDT": "XETHZUSD",
}

ASSET_BINANCE_MAP = {
    "BTC/USDT": "BTCUSDT",
    "ETH/USDT": "ETHUSDT",
}

_DATE_FMT = "%Y-%m-%d"
_DAY_SECONDS = 86400
_DAY_MS = 86400000
_OHLCV_COLUMNS = ["date", "open", "high", "low", "close", "volume"]
_HTTP_HEADERS = {"User-Agent": "CryptoAgentBench/1.0"}


def _utc_midnight(date_str: str) -> datetime:
    """Parse ``YYYY-MM-DD`` as an aware datetime at 00:00 UTC.

    Using an aware datetime makes ``.timestamp()`` independent of the host
    timezone; a naive datetime would be interpreted as local time.
    """
    return datetime.strptime(date_str, _DATE_FMT).replace(tzinfo=timezone.utc)


def _utc_date_str(ts) -> str:
    """Format an epoch-seconds timestamp as its UTC ``YYYY-MM-DD`` date."""
    return datetime.fromtimestamp(ts, tz=timezone.utc).strftime(_DATE_FMT)


def _rows_to_frame(rows: list) -> pd.DataFrame:
    """Build a frame from row dicts: one row per date, sorted ascending."""
    df = pd.DataFrame(rows)
    return df.drop_duplicates("date").sort_values("date").reset_index(drop=True)


def fetch_ohlcv(asset: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Fetch OHLCV data, trying multiple sources in order.

    Args:
        asset: Asset symbol such as ``"BTC/USDT"``.
        start_date: First day to include, ``YYYY-MM-DD``.
        end_date: Last day to include, ``YYYY-MM-DD``.

    Returns:
        The first non-empty DataFrame produced by any source.

    Raises:
        ValueError: If every source failed or returned no data; the message
            lists the per-source reasons.
    """
    sources = [
        ("CryptoCompare", _fetch_cryptocompare),
        ("Coinbase", _fetch_coinbase),
        ("Kraken", _fetch_kraken),
        ("Binance-REST", _fetch_binance),
        ("ccxt", _fetch_ccxt),
        ("yfinance", _fetch_yfinance),
    ]
    errors = []
    for name, fn in sources:
        try:
            df = fn(asset, start_date, end_date)
        except Exception as e:
            # Deliberately broad: any failure in one source (network, parsing,
            # missing optional dependency) must fall through to the next one.
            errors.append(f"{name}: {e}")
            logger.warning("%s failed for %s: %s", name, asset, e)
            continue
        if df is not None and not df.empty:
            logger.info("Fetched %d candles for %s via %s", len(df), asset, name)
            return df
        # Record empty results too, so the final error is never blank.
        errors.append(f"{name}: returned no data")
        logger.warning("%s returned no data for %s", name, asset)
    raise ValueError(f"All data sources failed for {asset}: {'; '.join(errors)}")


def _fetch_cryptocompare(asset: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Fetch daily candles from the CryptoCompare ``histoday`` endpoint.

    Pages backwards in time from ``end_date`` until ``start_date`` is covered.

    Raises:
        ValueError: If the asset has no mapping, the API reports an error, or
            no candle falls inside the requested range.
    """
    mapping = ASSET_CRYPTOCOMPARE_MAP.get(asset)
    if not mapping:
        raise ValueError(f"No CryptoCompare mapping for {asset}")
    fsym, tsym = mapping

    start_dt = _utc_midnight(start_date)
    end_dt = _utc_midnight(end_date)
    start_ts = int(start_dt.timestamp())
    days_total = (end_dt - start_dt).days + 1

    # CryptoCompare returns up to 2000 daily candles per call
    batch_size = 2000
    limit = min(batch_size, days_total)

    all_rows = []
    to_ts = int(end_dt.timestamp()) + _DAY_SECONDS
    while to_ts > start_ts:
        resp = requests.get(
            "https://min-api.cryptocompare.com/data/v2/histoday",
            params={
                "fsym": fsym,
                "tsym": tsym,
                "limit": limit,
                "toTs": to_ts,
            },
            timeout=30,
            headers=_HTTP_HEADERS,
        )
        resp.raise_for_status()
        data = resp.json()
        if data.get("Response") != "Success":
            raise ValueError(f"CryptoCompare error: {data.get('Message', data)}")

        candles = data["Data"]["Data"]
        if not candles:
            break

        for c in candles:
            date_str = _utc_date_str(c["time"])
            if not start_date <= date_str <= end_date:
                continue
            # Skip candles with a zero close (treated as "no data" for that day).
            if c["close"] == 0:
                continue
            all_rows.append({
                "date": date_str,
                "open": float(c["open"]),
                "high": float(c["high"]),
                "low": float(c["low"]),
                "close": float(c["close"]),
                "volume": float(c["volumefrom"]),
            })

        # The first candle of the batch is used as the oldest one; stop once
        # it reaches the requested start, otherwise page further back.
        oldest_ts = int(candles[0]["time"])
        if _utc_date_str(oldest_ts) <= start_date:
            break
        to_ts = oldest_ts - 1

    if not all_rows:
        raise ValueError(f"No CryptoCompare data for {fsym}/{tsym} in range {start_date}-{end_date}")

    return _rows_to_frame(all_rows)


def _fetch_coinbase(asset: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Fetch daily candles from the Coinbase Exchange public candles endpoint.

    The range is requested in chunks of 290 days.

    Raises:
        ValueError: If the asset has no mapping, the API returns an error
            message, or no candle falls inside the requested range.
    """
    product_id = ASSET_COINBASE_MAP.get(asset)
    if not product_id:
        raise ValueError(f"No Coinbase mapping for {asset}")

    # Kept naive on purpose: these are only serialized with isoformat() (so the
    # request parameters carry no UTC offset) and never converted to epoch
    # seconds, so the host timezone cannot affect them.
    start_dt = datetime.strptime(start_date, _DATE_FMT)
    end_dt = datetime.strptime(end_date, _DATE_FMT) + timedelta(days=1)

    # Coinbase returns max 300 candles per call for granularity=86400
    chunk_days = 290

    all_rows = []
    current = start_dt
    while current < end_dt:
        chunk_end = min(current + timedelta(days=chunk_days), end_dt)
        resp = requests.get(
            f"https://api.exchange.coinbase.com/products/{product_id}/candles",
            params={
                "granularity": 86400,
                "start": current.isoformat(),
                "end": chunk_end.isoformat(),
            },
            timeout=30,
            headers=_HTTP_HEADERS,
        )
        resp.raise_for_status()
        candles = resp.json()
        if isinstance(candles, dict) and "message" in candles:
            raise ValueError(f"Coinbase error: {candles['message']}")

        for c in candles:
            # Format: [timestamp, low, high, open, close, volume]
            ts, low, high, open_, close, vol = c[0], c[1], c[2], c[3], c[4], c[5]
            date_str = _utc_date_str(ts)
            if not start_date <= date_str <= end_date:
                continue
            all_rows.append({
                "date": date_str,
                "open": float(open_),
                "high": float(high),
                "low": float(low),
                "close": float(close),
                "volume": float(vol),
            })

        current = chunk_end
        time.sleep(0.2)  # brief pause between chunk requests

    if not all_rows:
        raise ValueError(f"No Coinbase data for {product_id} in range {start_date}-{end_date}")

    return _rows_to_frame(all_rows)


def _fetch_kraken(asset: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Fetch daily candles from the Kraken public OHLC endpoint.

    Makes at most 10 paged requests, following the ``last`` cursor returned
    by the API.

    Raises:
        ValueError: If the asset has no mapping, the API reports an error, or
            no candle falls inside the requested range.
    """
    pair = ASSET_KRAKEN_MAP.get(asset)
    if not pair:
        raise ValueError(f"No Kraken pair for {asset}")

    # Ask from one second before UTC midnight so the start day's candle is
    # included whether the API treats ``since`` as inclusive or exclusive;
    # anything earlier is dropped by the date filter below.
    since = int(_utc_midnight(start_date).timestamp()) - 1
    end_ts = int((_utc_midnight(end_date) + timedelta(days=1)).timestamp())

    all_rows = []
    current_since = since
    for _ in range(10):
        resp = requests.get(
            "https://api.kraken.com/0/public/OHLC",
            params={"pair": pair, "interval": 1440, "since": current_since},
            timeout=30,
        )
        resp.raise_for_status()
        data = resp.json()
        if data.get("error"):
            raise ValueError(f"Kraken error: {data['error']}")

        # Result dict has pair key + "last" key
        result = data["result"]
        pair_keys = [k for k in result if k != "last"]
        if not pair_keys:
            break
        candles = result[pair_keys[0]]
        last = result.get("last", 0)

        for c in candles:
            ts = int(c[0])
            if ts >= end_ts:
                break
            date_str = _utc_date_str(ts)
            if start_date <= date_str <= end_date:
                all_rows.append({
                    "date": date_str,
                    "open": float(c[1]),
                    "high": float(c[2]),
                    "low": float(c[3]),
                    "close": float(c[4]),
                    "volume": float(c[6]),
                })

        # Stop when there is no cursor, the range is covered, the cursor did
        # not advance, or the page holds fewer than 720 candles.
        if (
            last == 0
            or last >= end_ts
            or last <= current_since
            or len(candles) < 720
        ):
            break
        current_since = last
        time.sleep(0.5)  # brief pause between paged requests

    if not all_rows:
        raise ValueError(f"No Kraken data for {pair} in range {start_date}-{end_date}")

    return _rows_to_frame(all_rows)


def _fetch_binance(asset: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Fetch daily candles from the Binance REST ``klines`` endpoint.

    Pages forward in batches of up to 1000 candles.

    Raises:
        ValueError: If the asset has no mapping or the API returns no candles.
    """
    symbol = ASSET_BINANCE_MAP.get(asset)
    if not symbol:
        raise ValueError(f"No Binance symbol for {asset}")

    start_ms = int(_utc_midnight(start_date).timestamp() * 1000)
    end_ms = int((_utc_midnight(end_date) + timedelta(days=1)).timestamp() * 1000)

    all_candles = []
    current_start = start_ms
    while current_start < end_ms:
        resp = requests.get(
            "https://api.binance.com/api/v3/klines",
            params={
                "symbol": symbol,
                "interval": "1d",
                "startTime": current_start,
                "endTime": end_ms,
                "limit": 1000,
            },
            timeout=30,
        )
        resp.raise_for_status()
        candles = resp.json()
        if not candles:
            break
        all_candles.extend(candles)
        # Next page starts one day after the last candle received.
        current_start = candles[-1][0] + _DAY_MS
        if len(candles) < 1000:
            break

    if not all_candles:
        raise ValueError(f"No data from Binance for {symbol}")

    df = pd.DataFrame(all_candles, columns=[
        "timestamp", "open", "high", "low", "close", "volume",
        "close_time", "quote_volume", "num_trades",
        "taker_buy_base", "taker_buy_quote", "ignore",
    ])
    df["date"] = pd.to_datetime(df["timestamp"], unit="ms").dt.strftime(_DATE_FMT)
    # Copy the filtered slice so the dtype conversion below does not write
    # into a view of the unfiltered frame.
    df = df[(df["date"] >= start_date) & (df["date"] <= end_date)].copy()
    df = df.astype({col: float for col in ["open", "high", "low", "close", "volume"]})
    df = df.drop_duplicates("date").sort_values("date").reset_index(drop=True)
    return df[_OHLCV_COLUMNS]


def _fetch_ccxt(asset: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Fetch daily candles from Binance through the optional ``ccxt`` package.

    Raises:
        ImportError: If ``ccxt`` is not installed.
        ValueError: If no candles are returned.
    """
    import ccxt  # optional dependency, imported lazily

    exchange = ccxt.binance({"enableRateLimit": True})
    limit = 1000
    current_since = int(_utc_midnight(start_date).timestamp() * 1000)
    # Exclusive end: the start of the day after end_date, so that a
    # single-day range (start_date == end_date) still issues a request.
    end_ts = int((_utc_midnight(end_date) + timedelta(days=1)).timestamp() * 1000)

    all_candles = []
    while current_since < end_ts:
        candles = exchange.fetch_ohlcv(asset, "1d", since=current_since, limit=limit)
        if not candles:
            break
        all_candles.extend(candles)
        current_since = candles[-1][0] + _DAY_MS
        if len(candles) < limit:
            break

    if not all_candles:
        raise ValueError(f"No data returned from ccxt for {asset}")

    df = pd.DataFrame(
        all_candles,
        columns=["timestamp", "open", "high", "low", "close", "volume"],
    )
    df["date"] = pd.to_datetime(df["timestamp"], unit="ms").dt.strftime(_DATE_FMT)
    df = df[(df["date"] >= start_date) & (df["date"] <= end_date)]
    df = df.drop_duplicates("date").sort_values("date").reset_index(drop=True)
    # Return the same columns as every other source.
    return df[_OHLCV_COLUMNS]


def _fetch_yfinance(asset: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Fetch daily candles through the optional ``yfinance`` package.

    The ticker comes from ``config.ASSET_YFINANCE_MAP``; unmapped assets fall
    back to the asset symbol with ``/`` replaced by ``-``.

    Raises:
        ImportError: If ``yfinance`` or ``config`` cannot be imported.
        ValueError: If the download is empty.
    """
    import yfinance as yf  # optional dependency, imported lazily
    from config import ASSET_YFINANCE_MAP

    ticker = ASSET_YFINANCE_MAP.get(asset, asset.replace("/", "-"))
    # One day past end_date is requested — presumably because yfinance treats
    # ``end`` as exclusive (TODO confirm). Pure date arithmetic, so a naive
    # datetime is fine here.
    end_dt = (datetime.strptime(end_date, _DATE_FMT) + timedelta(days=1)).strftime(_DATE_FMT)
    data = yf.download(ticker, start=start_date, end=end_dt, progress=False, auto_adjust=True)

    if data.empty:
        raise ValueError(f"No data returned from yfinance for {ticker}")

    # flatten() collapses each selection to 1-D in case it comes back as a
    # single-column 2-D frame.
    df = pd.DataFrame({
        "date": data.index.strftime(_DATE_FMT).to_numpy(),
        "open": data["Open"].values.flatten(),
        "high": data["High"].values.flatten(),
        "low": data["Low"].values.flatten(),
        "close": data["Close"].values.flatten(),
        "volume": data["Volume"].values.flatten(),
    })
    return df.sort_values("date").reset_index(drop=True)


def ohlcv_to_records(df: pd.DataFrame) -> list:
    """Convert an OHLCV DataFrame to a list of plain dicts for prompts.

    Each record has ``date`` as ``str`` and ``open``, ``high``, ``low``,
    ``close``, ``volume`` as ``float``. An empty frame yields ``[]``.
    """
    numeric_fields = ("open", "high", "low", "close", "volume")
    records = []
    for _, row in df.iterrows():
        record = {"date": str(row["date"])}
        for field in numeric_fields:
            record[field] = float(row[field])
        records.append(record)
    return records