Spaces:
Running
Running
import logging
import time
from datetime import datetime, timedelta, timezone

import pandas as pd
import requests
logger = logging.getLogger(__name__)

# One row per supported internal pair:
#   (internal pair, CryptoCompare (fsym, tsym), Coinbase product, Kraken pair, Binance symbol)
# The CryptoCompare, Coinbase and Kraken symbols quote in USD, not USDT, so
# those sources return USD prices for the "*/USDT" pairs; Binance quotes in USDT.
_EXCHANGE_SYMBOLS = (
    ("BTC/USDT", ("BTC", "USD"), "BTC-USD", "XXBTZUSD", "BTCUSDT"),
    ("ETH/USDT", ("ETH", "USD"), "ETH-USD", "XETHZUSD", "ETHUSDT"),
)

# Per-exchange lookup tables derived from the single table above.
ASSET_CRYPTOCOMPARE_MAP = {row[0]: row[1] for row in _EXCHANGE_SYMBOLS}
ASSET_COINBASE_MAP = {row[0]: row[2] for row in _EXCHANGE_SYMBOLS}
ASSET_KRAKEN_MAP = {row[0]: row[3] for row in _EXCHANGE_SYMBOLS}
ASSET_BINANCE_MAP = {row[0]: row[4] for row in _EXCHANGE_SYMBOLS}
def fetch_ohlcv(
    asset: str,
    start_date: str,
    end_date: str,
    *,
    sources=None,
) -> pd.DataFrame:
    """Fetch daily OHLCV candles for ``asset``, falling back across data sources.

    Sources are tried in order and the first one that returns a non-empty
    DataFrame wins. A source that raises, or returns ``None`` / an empty
    frame, is logged and recorded, and the next source is tried.

    Args:
        asset: Internal pair name, e.g. ``"BTC/USDT"``.
        start_date: First day to include, ``YYYY-MM-DD``.
        end_date: Last day to include, ``YYYY-MM-DD``.
        sources: Optional sequence of ``(name, fetch_fn)`` pairs replacing the
            default chain (CryptoCompare, Coinbase, Kraken, Binance REST,
            ccxt, yfinance). Each ``fetch_fn`` is called as
            ``fetch_fn(asset, start_date, end_date)``.

    Returns:
        The DataFrame produced by the first source that yielded rows.

    Raises:
        ValueError: If no source produced data; the message lists why each
            source was skipped.
    """
    if sources is None:
        sources = (
            ("CryptoCompare", _fetch_cryptocompare),
            ("Coinbase", _fetch_coinbase),
            ("Kraken", _fetch_kraken),
            ("Binance-REST", _fetch_binance),
            ("ccxt", _fetch_ccxt),
            ("yfinance", _fetch_yfinance),
        )
    errors = []
    for name, fn in sources:
        try:
            df = fn(asset, start_date, end_date)
            has_rows = df is not None and not df.empty
        except Exception as e:  # any failure just means "try the next source"
            errors.append(f"{name}: {e}")
            logger.warning("%s failed for %s: %s", name, asset, e)
            continue
        if not has_rows:
            # Record empty results too, so the final error explains every source.
            errors.append(f"{name}: returned no data")
            logger.warning("%s returned no data for %s", name, asset)
            continue
        logger.info("Fetched %d candles for %s via %s", len(df), asset, name)
        return df
    raise ValueError(f"All data sources failed for {asset}: {'; '.join(errors)}")
def _fetch_cryptocompare(asset: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Fetch daily candles from CryptoCompare's free ``histoday`` API (no auth).

    Pages backwards from ``end_date`` (up to 2000 candles per request) until
    the oldest returned candle reaches ``start_date``. Candles whose close is
    0 are skipped.

    Returns:
        DataFrame with columns ``date, open, high, low, close, volume``, one
        row per UTC day in ``[start_date, end_date]``, sorted by date.

    Raises:
        ValueError: Unmapped asset, a non-"Success" API response, or no
            candles in the requested range.
        requests.HTTPError: On a non-2xx HTTP status.
    """
    mapping = ASSET_CRYPTOCOMPARE_MAP.get(asset)
    if not mapping:
        raise ValueError(f"No CryptoCompare mapping for {asset}")
    fsym, tsym = mapping

    # Parse the dates as UTC midnights; naive datetimes would make .timestamp()
    # depend on the host's local timezone, while candle dates below are UTC.
    start_dt = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    end_dt = datetime.strptime(end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    start_ts = int(start_dt.timestamp())
    days_total = (end_dt - start_dt).days + 1

    all_rows = []
    batch_size = 2000  # CryptoCompare returns up to 2000 daily candles per call
    # Start one day past end_date; out-of-range candles are filtered below.
    to_ts = int(end_dt.timestamp()) + 86400
    while to_ts > start_ts:
        limit = min(batch_size, days_total)
        resp = requests.get(
            "https://min-api.cryptocompare.com/data/v2/histoday",
            params={
                "fsym": fsym,
                "tsym": tsym,
                "limit": limit,
                "toTs": to_ts,
            },
            timeout=30,
            headers={"User-Agent": "CryptoAgentBench/1.0"},
        )
        resp.raise_for_status()
        data = resp.json()
        if data.get("Response") != "Success":
            raise ValueError(f"CryptoCompare error: {data.get('Message', data)}")
        candles = data["Data"]["Data"]
        if not candles:
            break
        for c in candles:
            date_str = datetime.fromtimestamp(c["time"], tz=timezone.utc).strftime("%Y-%m-%d")
            if date_str < start_date or date_str > end_date:
                continue
            if c["close"] == 0:
                continue
            all_rows.append({
                "date": date_str,
                "open": float(c["open"]),
                "high": float(c["high"]),
                "low": float(c["low"]),
                "close": float(c["close"]),
                "volume": float(c["volumefrom"]),
            })
        # candles[0] is treated as the oldest candle of this page.
        earliest = datetime.fromtimestamp(candles[0]["time"], tz=timezone.utc).strftime("%Y-%m-%d")
        if earliest <= start_date:
            break
        # Continue strictly before the oldest candle seen, so to_ts always decreases.
        to_ts = int(candles[0]["time"]) - 1
    if not all_rows:
        raise ValueError(f"No CryptoCompare data for {fsym}/{tsym} in range {start_date}-{end_date}")
    df = pd.DataFrame(all_rows)
    return df.drop_duplicates("date").sort_values("date").reset_index(drop=True)
def _fetch_coinbase(asset: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Fetch daily candles from Coinbase's public Exchange API (no auth).

    Requests ``granularity=86400`` candles in windows of 290 days (Coinbase
    returns at most 300 candles per call) and keeps those dated within
    ``[start_date, end_date]``.

    Returns:
        DataFrame with columns ``date, open, high, low, close, volume``,
        sorted by date with duplicate dates removed.

    Raises:
        ValueError: Unmapped asset, an error payload, an unexpected response
            shape, or no candles in the requested range.
        requests.HTTPError: On a non-2xx HTTP status.
    """
    product_id = ASSET_COINBASE_MAP.get(asset)
    if not product_id:
        raise ValueError(f"No Coinbase mapping for {asset}")
    start_dt = datetime.strptime(start_date, "%Y-%m-%d")
    # Exclusive upper bound: midnight of the day after end_date.
    end_dt = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
    all_rows = []
    chunk_days = 290
    current = start_dt
    while current < end_dt:
        chunk_end = min(current + timedelta(days=chunk_days), end_dt)
        # NOTE(review): naive ISO timestamps are sent; presumably Coinbase
        # interprets them as UTC — confirm.
        resp = requests.get(
            f"https://api.exchange.coinbase.com/products/{product_id}/candles",
            params={
                "granularity": 86400,
                "start": current.isoformat(),
                "end": chunk_end.isoformat(),
            },
            timeout=30,
            headers={"User-Agent": "CryptoAgentBench/1.0"},
        )
        resp.raise_for_status()
        candles = resp.json()
        if isinstance(candles, dict) and "message" in candles:
            raise ValueError(f"Coinbase error: {candles['message']}")
        if not isinstance(candles, list):
            raise ValueError(f"Unexpected Coinbase response: {candles!r}")
        for c in candles:
            # Format: [timestamp, low, high, open, close, volume]
            ts, low, high, open_, close, vol = c[:6]
            date_str = datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d")
            if date_str < start_date or date_str > end_date:
                continue
            all_rows.append({
                "date": date_str,
                "open": float(open_),
                "high": float(high),
                "low": float(low),
                "close": float(close),
                "volume": float(vol),
            })
        current = chunk_end
        if current < end_dt:
            time.sleep(0.2)  # pause between paged requests only
    if not all_rows:
        raise ValueError(f"No Coinbase data for {product_id} in range {start_date}-{end_date}")
    df = pd.DataFrame(all_rows)
    # Adjacent windows share a boundary, so duplicate dates are possible.
    return df.drop_duplicates("date").sort_values("date").reset_index(drop=True)
def _fetch_kraken(asset: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Fetch daily (1440-minute) candles from Kraken's public OHLC endpoint.

    Follows the ``last`` cursor for at most 10 requests. It stops once the
    cursor reaches ``end_date``, a page holds fewer than 720 candles, or the
    cursor stops advancing.

    NOTE(review): Kraken's OHLC endpoint is documented to serve only the most
    recent 720 intervals regardless of ``since``, so older ranges may come
    back empty here — confirm against the Kraken API docs.

    Returns:
        DataFrame with columns ``date, open, high, low, close, volume``,
        sorted by date with duplicate dates removed.

    Raises:
        ValueError: Unmapped asset, a Kraken error payload, or no candles in
            the requested range.
        requests.HTTPError: On a non-2xx HTTP status.
    """
    pair = ASSET_KRAKEN_MAP.get(asset)
    if not pair:
        raise ValueError(f"No Kraken pair for {asset}")
    # UTC midnights; naive .timestamp() would depend on the host's timezone.
    start_dt = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    end_dt = datetime.strptime(end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    since = int(start_dt.timestamp())
    end_ts = int((end_dt + timedelta(days=1)).timestamp())  # exclusive bound
    all_rows = []
    current_since = since
    for _ in range(10):  # hard cap on the number of pages requested
        resp = requests.get(
            "https://api.kraken.com/0/public/OHLC",
            params={"pair": pair, "interval": 1440, "since": current_since},
            timeout=30,
        )
        resp.raise_for_status()
        data = resp.json()
        if data.get("error"):
            raise ValueError(f"Kraken error: {data['error']}")
        result = data["result"]
        # Result dict has pair key + "last" key
        pair_keys = [k for k in result if k != "last"]
        if not pair_keys:
            break
        candles = result[pair_keys[0]]
        last = int(result.get("last", 0))
        for c in candles:
            # Indices used: 0 = time, 1-4 = open/high/low/close, 6 = volume.
            ts = int(c[0])
            if ts >= end_ts:
                break
            date_str = datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d")
            if start_date <= date_str <= end_date:
                all_rows.append({
                    "date": date_str,
                    "open": float(c[1]),
                    "high": float(c[2]),
                    "low": float(c[3]),
                    "close": float(c[4]),
                    "volume": float(c[6]),
                })
        # Stop when there is no cursor, it is past the range, the page was short,
        # or the cursor did not advance (which would just re-fetch the same page).
        if last == 0 or last >= end_ts or len(candles) < 720 or last <= current_since:
            break
        current_since = last
        time.sleep(0.5)
    if not all_rows:
        raise ValueError(f"No Kraken data for {pair} in range {start_date}-{end_date}")
    df = pd.DataFrame(all_rows)
    return df.drop_duplicates("date").sort_values("date").reset_index(drop=True)
def _fetch_binance(asset: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Fetch daily klines from Binance's public REST API (``/api/v3/klines``).

    Pages forward from ``start_date`` in batches of up to 1000 klines until
    a short page is returned or the range end is passed.

    Returns:
        DataFrame with columns ``date, open, high, low, close, volume``
        (floats for the numeric columns), sorted by date.

    Raises:
        ValueError: Unmapped asset, or no klines in the requested range.
        requests.HTTPError: On a non-2xx HTTP status.
    """
    symbol = ASSET_BINANCE_MAP.get(asset)
    if not symbol:
        raise ValueError(f"No Binance symbol for {asset}")
    # UTC midnights; naive .timestamp() would depend on the host's timezone.
    start_dt = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    end_dt = datetime.strptime(end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    start_ms = int(start_dt.timestamp() * 1000)
    end_ms = int((end_dt + timedelta(days=1)).timestamp() * 1000)  # exclusive bound
    all_candles = []
    current_start = start_ms
    while current_start < end_ms:
        resp = requests.get(
            "https://api.binance.com/api/v3/klines",
            params={
                "symbol": symbol,
                "interval": "1d",
                "startTime": current_start,
                "endTime": end_ms,
                "limit": 1000,
            },
            timeout=30,
        )
        resp.raise_for_status()
        candles = resp.json()
        if not candles:
            break
        all_candles.extend(candles)
        current_start = candles[-1][0] + 86400000  # next day's open time
        if len(candles) < 1000:
            break
    if not all_candles:
        raise ValueError(f"No data from Binance for {symbol}")
    df = pd.DataFrame(all_candles, columns=[
        "timestamp", "open", "high", "low", "close", "volume",
        "close_time", "quote_volume", "num_trades",
        "taker_buy_base", "taker_buy_quote", "ignore",
    ])
    df["date"] = pd.to_datetime(df["timestamp"], unit="ms").dt.strftime("%Y-%m-%d")
    in_range = (df["date"] >= start_date) & (df["date"] <= end_date)
    # Select and convert in one step; .astype returns a new frame, so there is
    # no chained assignment on a filtered slice.
    df = df.loc[in_range, ["date", "open", "high", "low", "close", "volume"]].astype(
        {"open": float, "high": float, "low": float, "close": float, "volume": float}
    )
    if df.empty:
        raise ValueError(f"No Binance data for {symbol} in range {start_date}-{end_date}")
    return df.drop_duplicates("date").sort_values("date").reset_index(drop=True)
def _fetch_ccxt(asset: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Fetch daily candles through ccxt's Binance client (ccxt imported lazily).

    ``asset`` is passed straight to ``exchange.fetch_ohlcv`` as the symbol.
    Pages forward in batches of up to 1000 candles.

    Returns:
        DataFrame with columns ``date, open, high, low, close, volume``,
        restricted to ``[start_date, end_date]`` and sorted by date.

    Raises:
        ValueError: If ccxt returned no candles at all.
    """
    import ccxt

    exchange = ccxt.binance({"enableRateLimit": True})
    # UTC midnights; naive .timestamp() would depend on the host's timezone.
    start_dt = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    end_dt = datetime.strptime(end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    since = int(start_dt.timestamp() * 1000)
    # Exclusive bound one day past end_date, so a single-day range
    # (start_date == end_date) still enters the loop and fetches its candle.
    end_ts = int((end_dt + timedelta(days=1)).timestamp() * 1000)
    limit = 1000
    all_candles = []
    current_since = since
    while current_since < end_ts:
        candles = exchange.fetch_ohlcv(asset, "1d", since=current_since, limit=limit)
        if not candles:
            break
        all_candles.extend(candles)
        current_since = candles[-1][0] + 86400000  # next day's open time
        if len(candles) < limit:
            break
    if not all_candles:
        raise ValueError(f"No data returned from ccxt for {asset}")
    df = pd.DataFrame(all_candles, columns=["timestamp", "open", "high", "low", "close", "volume"])
    df["date"] = pd.to_datetime(df["timestamp"], unit="ms").dt.date.astype(str)
    df = df.loc[
        (df["date"] >= start_date) & (df["date"] <= end_date),
        ["date", "open", "high", "low", "close", "volume"],
    ]
    return df.drop_duplicates("date").sort_values("date").reset_index(drop=True)
def _fetch_yfinance(asset: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Fetch daily candles from Yahoo Finance via ``yfinance`` (last-resort source).

    The ticker comes from ``config.ASSET_YFINANCE_MAP``, falling back to the
    asset name with "/" replaced by "-". ``end`` is passed to ``yf.download``
    as the day after ``end_date`` (presumably exclusive there — confirm).

    Returns:
        DataFrame with columns ``date, open, high, low, close, volume``,
        sorted by date with duplicate dates and rows lacking a close removed.

    Raises:
        ValueError: If yfinance returned no usable rows.
    """
    import yfinance as yf
    from config import ASSET_YFINANCE_MAP

    ticker = ASSET_YFINANCE_MAP.get(asset, asset.replace("/", "-"))
    end_dt = (datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)).strftime("%Y-%m-%d")
    data = yf.download(ticker, start=start_date, end=end_dt, progress=False, auto_adjust=True)
    if data.empty:
        raise ValueError(f"No data returned from yfinance for {ticker}")
    # data[...] may be a Series or a one-column DataFrame (presumably when
    # yfinance returns multi-level columns); ravel() gives a flat array either way.
    df = pd.DataFrame({
        "date": list(data.index.strftime("%Y-%m-%d")),
        "open": data["Open"].to_numpy().ravel(),
        "high": data["High"].to_numpy().ravel(),
        "low": data["Low"].to_numpy().ravel(),
        "close": data["Close"].to_numpy().ravel(),
        "volume": data["Volume"].to_numpy().ravel(),
    })
    # Rows without a close price are unusable downstream.
    df = df.dropna(subset=["close"])
    if df.empty:
        raise ValueError(f"No data returned from yfinance for {ticker}")
    return df.drop_duplicates("date").sort_values("date").reset_index(drop=True)
def ohlcv_to_records(df: pd.DataFrame) -> list:
    """Convert an OHLCV DataFrame to a list of plain dicts for prompts.

    Only the ``date, open, high, low, close, volume`` columns are read; other
    columns are ignored. ``date`` is converted with ``str`` and the numeric
    columns with ``float``, so the result holds only builtin Python types.

    Args:
        df: Frame containing at least the six OHLCV columns.

    Returns:
        One dict per row, in row order; ``[]`` for an empty frame.
    """
    # Zip the columns directly instead of iterrows(): no per-row Series is
    # built, and each value keeps its own column's dtype.
    return [
        {
            "date": str(date),
            "open": float(open_),
            "high": float(high),
            "low": float(low),
            "close": float(close),
            "volume": float(volume),
        }
        for date, open_, high, low, close, volume in zip(
            df["date"], df["open"], df["high"], df["low"], df["close"], df["volume"]
        )
    ]