MDIIII's picture
perf: raise rate limit to 200/min for paid-tier models
2f5bdbc
Raw
History Blame Contribute Delete
11.9 kB
import logging
import time
from datetime import datetime, timedelta, timezone

import pandas as pd
import requests
logger = logging.getLogger(__name__)

# Venue-specific symbol for every canonical "BASE/QUOTE" pair the fetchers
# support. Note: only the Binance symbols are USDT-quoted; the CryptoCompare,
# Coinbase and Kraken entries resolve to USD-quoted markets.
_VENUE_SYMBOLS = {
    "BTC/USDT": {
        "cryptocompare": ("BTC", "USD"),  # (fsym, tsym)
        "coinbase": "BTC-USD",
        "kraken": "XXBTZUSD",
        "binance": "BTCUSDT",
    },
    "ETH/USDT": {
        "cryptocompare": ("ETH", "USD"),  # (fsym, tsym)
        "coinbase": "ETH-USD",
        "kraken": "XETHZUSD",
        "binance": "ETHUSDT",
    },
}

# Per-venue lookup tables, keyed by canonical pair, as consumed by each fetcher.
ASSET_CRYPTOCOMPARE_MAP = {pair: syms["cryptocompare"] for pair, syms in _VENUE_SYMBOLS.items()}
ASSET_COINBASE_MAP = {pair: syms["coinbase"] for pair, syms in _VENUE_SYMBOLS.items()}
ASSET_KRAKEN_MAP = {pair: syms["kraken"] for pair, syms in _VENUE_SYMBOLS.items()}
ASSET_BINANCE_MAP = {pair: syms["binance"] for pair, syms in _VENUE_SYMBOLS.items()}
def fetch_ohlcv(asset: str, start_date: str, end_date: str, *, sources=None) -> pd.DataFrame:
    """Fetch daily OHLCV candles for ``asset``, trying each data source in order.

    Args:
        asset: Canonical pair such as ``"BTC/USDT"``.
        start_date: First day, ``YYYY-MM-DD``.
        end_date: Last day, ``YYYY-MM-DD`` (the built-in fetchers treat both
            bounds as inclusive).
        sources: Optional ordered list of ``(name, fetcher)`` pairs, each
            fetcher called as ``fetcher(asset, start_date, end_date)``.
            Defaults to CryptoCompare -> Coinbase -> Kraken -> Binance REST
            -> ccxt -> yfinance.

    Returns:
        The DataFrame from the first source that returns a non-empty result.

    Raises:
        ValueError: If every source raised or returned no rows. The message
            lists the reason for each source.
    """
    if sources is None:
        sources = [
            ("CryptoCompare", _fetch_cryptocompare),
            ("Coinbase", _fetch_coinbase),
            ("Kraken", _fetch_kraken),
            ("Binance-REST", _fetch_binance),
            ("ccxt", _fetch_ccxt),
            ("yfinance", _fetch_yfinance),
        ]
    errors = []
    for name, fetch in sources:
        try:
            df = fetch(asset, start_date, end_date)
            has_rows = df is not None and not df.empty
        except Exception as e:
            # Deliberately broad: any failure (network, HTTP status, parsing,
            # a missing optional dependency) just falls through to the next source.
            errors.append(f"{name}: {e}")
            logger.warning("%s failed for %s: %s", name, asset, e)
            continue
        if not has_rows:
            # Record empty results too, so the final error accounts for every source.
            errors.append(f"{name}: returned no data")
            logger.warning("%s returned no data for %s", name, asset)
            continue
        logger.info("Fetched %d candles for %s via %s", len(df), asset, name)
        return df
    raise ValueError(f"All data sources failed for {asset}: {'; '.join(errors)}")
def _fetch_cryptocompare(asset: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Fetch daily candles from CryptoCompare's ``data/v2/histoday`` endpoint.

    The request carries no API key. Starting at the midnight after
    ``end_date``, it pages backwards (up to 2000 candles per call, ending at
    ``toTs``) until the earliest returned candle is on or before ``start_date``.

    Args:
        asset: Canonical pair looked up in ``ASSET_CRYPTOCOMPARE_MAP``.
        start_date: First day to keep, ``YYYY-MM-DD`` (inclusive).
        end_date: Last day to keep, ``YYYY-MM-DD`` (inclusive).

    Returns:
        DataFrame with columns ``date``, ``open``, ``high``, ``low``,
        ``close``, ``volume`` (taken from ``volumefrom``), with one row per UTC
        day, sorted by date. Candles whose ``close`` is 0 are dropped.

    Raises:
        ValueError: Unknown asset, a non-"Success" API response, or no usable
            candles in range.
        requests.HTTPError: Non-2xx HTTP response.
    """
    mapping = ASSET_CRYPTOCOMPARE_MAP.get(asset)
    if not mapping:
        raise ValueError(f"No CryptoCompare mapping for {asset}")
    fsym, tsym = mapping
    # Parse as UTC midnight. A naive datetime's .timestamp() uses the host's
    # local timezone, which would shift every epoch bound below.
    start_dt = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    end_dt = datetime.strptime(end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    start_ts = int(start_dt.timestamp())
    # Never request more days than the range spans; 2000 is the per-call cap.
    limit = min(2000, (end_dt - start_dt).days + 1)
    all_rows = []
    to_ts = int(end_dt.timestamp()) + 86400  # midnight after end_date
    while to_ts > start_ts:
        resp = requests.get(
            "https://min-api.cryptocompare.com/data/v2/histoday",
            params={
                "fsym": fsym,
                "tsym": tsym,
                "limit": limit,
                "toTs": to_ts,
            },
            timeout=30,
            headers={"User-Agent": "CryptoAgentBench/1.0"},
        )
        resp.raise_for_status()
        data = resp.json()
        if data.get("Response") != "Success":
            raise ValueError(f"CryptoCompare error: {data.get('Message', data)}")
        candles = data["Data"]["Data"]
        if not candles:
            break
        for c in candles:
            date_str = datetime.fromtimestamp(c["time"], tz=timezone.utc).strftime("%Y-%m-%d")
            # Skip days outside the range and zero-close candles (presumably
            # placeholder rows for days without trading data).
            if not (start_date <= date_str <= end_date) or c["close"] == 0:
                continue
            all_rows.append({
                "date": date_str,
                "open": float(c["open"]),
                "high": float(c["high"]),
                "low": float(c["low"]),
                "close": float(c["close"]),
                "volume": float(c["volumefrom"]),
            })
        # Assumes candles arrive oldest-first, so candles[0] is the earliest.
        earliest = datetime.fromtimestamp(candles[0]["time"], tz=timezone.utc).strftime("%Y-%m-%d")
        if earliest <= start_date:
            break
        # The next page ends one second before the earliest candle seen so far.
        to_ts = int(candles[0]["time"]) - 1
    if not all_rows:
        raise ValueError(f"No CryptoCompare data for {fsym}/{tsym} in range {start_date}-{end_date}")
    df = pd.DataFrame(all_rows)
    df = df.drop_duplicates("date").sort_values("date").reset_index(drop=True)
    return df
def _fetch_coinbase(asset: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Fetch daily candles from the Coinbase Exchange public ``candles`` endpoint.

    Queries ``api.exchange.coinbase.com/products/{product_id}/candles`` (the
    Coinbase Exchange API) without authentication. It walks forward from
    ``start_date`` in 290-day windows because one response holds at most 300
    daily candles.

    Args:
        asset: Canonical pair looked up in ``ASSET_COINBASE_MAP``.
        start_date: First day to keep, ``YYYY-MM-DD`` (inclusive).
        end_date: Last day to keep, ``YYYY-MM-DD`` (inclusive).

    Returns:
        DataFrame with columns ``date``, ``open``, ``high``, ``low``,
        ``close``, ``volume``, with one row per UTC day, sorted by date.

    Raises:
        ValueError: Unknown asset, an error payload from Coinbase, or no
            candles inside the requested range.
        requests.HTTPError: Non-2xx HTTP response.
    """
    product_id = ASSET_COINBASE_MAP.get(asset)
    if not product_id:
        raise ValueError(f"No Coinbase mapping for {asset}")
    start_dt = datetime.strptime(start_date, "%Y-%m-%d")
    # Exclusive upper bound: the midnight after end_date.
    end_dt = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
    # NOTE(review): these datetimes are naive, so isoformat() sends no UTC
    # offset. This presumably relies on Coinbase reading them as UTC; confirm.
    chunk_days = 290  # stays under the 300-candles-per-request cap
    all_rows = []
    current = start_dt
    while current < end_dt:
        chunk_end = min(current + timedelta(days=chunk_days), end_dt)
        resp = requests.get(
            f"https://api.exchange.coinbase.com/products/{product_id}/candles",
            params={
                "granularity": 86400,
                "start": current.isoformat(),
                "end": chunk_end.isoformat(),
            },
            timeout=30,
            headers={"User-Agent": "CryptoAgentBench/1.0"},
        )
        resp.raise_for_status()
        candles = resp.json()
        if isinstance(candles, dict) and "message" in candles:
            raise ValueError(f"Coinbase error: {candles['message']}")
        for candle in candles:
            # Row layout: [timestamp, low, high, open, close, volume]
            ts, low, high, open_, close, vol = candle[:6]
            date_str = datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d")
            if not (start_date <= date_str <= end_date):
                continue
            all_rows.append({
                "date": date_str,
                "open": float(open_),
                "high": float(high),
                "low": float(low),
                "close": float(close),
                "volume": float(vol),
            })
        # Adjacent windows share their boundary day; drop_duplicates below handles it.
        current = chunk_end
        if current < end_dt:
            time.sleep(0.2)  # pause between requests; not needed after the last window
    if not all_rows:
        raise ValueError(f"No Coinbase data for {product_id} in range {start_date}-{end_date}")
    df = pd.DataFrame(all_rows)
    df = df.drop_duplicates("date").sort_values("date").reset_index(drop=True)
    return df
def _fetch_kraken(asset: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Fetch daily candles from Kraken's public ``/0/public/OHLC`` endpoint.

    Requests ``interval=1440`` starting at ``start_date`` and follows the
    response's ``last`` cursor for at most 10 pages.

    Args:
        asset: Canonical pair looked up in ``ASSET_KRAKEN_MAP``.
        start_date: First day to keep, ``YYYY-MM-DD`` (inclusive).
        end_date: Last day to keep, ``YYYY-MM-DD`` (inclusive).

    Returns:
        DataFrame with columns ``date``, ``open``, ``high``, ``low``,
        ``close``, ``volume``, with one row per UTC day, sorted by date.

    Raises:
        ValueError: Unknown asset, a non-empty Kraken ``error`` list, or no
            candles inside the requested range.
        requests.HTTPError: Non-2xx HTTP response.
    """
    pair = ASSET_KRAKEN_MAP.get(asset)
    if not pair:
        raise ValueError(f"No Kraken pair for {asset}")
    # Anchor bounds at UTC midnight. A naive .timestamp() uses the host's local
    # timezone; west of UTC, `since` landed after start_date's 00:00 UTC candle.
    start_dt = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    end_dt = datetime.strptime(end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    since = int(start_dt.timestamp())
    end_ts = int((end_dt + timedelta(days=1)).timestamp())  # exclusive: midnight after end_date
    all_rows = []
    current_since = since
    for _ in range(10):
        resp = requests.get(
            "https://api.kraken.com/0/public/OHLC",
            params={"pair": pair, "interval": 1440, "since": current_since},
            timeout=30,
        )
        resp.raise_for_status()
        data = resp.json()
        if data.get("error"):
            raise ValueError(f"Kraken error: {data['error']}")
        result = data["result"]
        # The result dict holds one key per pair plus a "last" cursor.
        pair_keys = [k for k in result if k != "last"]
        if not pair_keys:
            break
        candles = result[pair_keys[0]]
        last = result.get("last", 0)
        for c in candles:
            ts = int(c[0])
            if ts >= end_ts:
                break
            date_str = datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d")
            if start_date <= date_str <= end_date:
                all_rows.append({
                    "date": date_str,
                    "open": float(c[1]),
                    "high": float(c[2]),
                    "low": float(c[3]),
                    "close": float(c[4]),
                    "volume": float(c[6]),
                })
        # Stop when there is no cursor, the cursor passed the range, or a short page arrived.
        if last == 0 or last >= end_ts or len(candles) < 720:
            break
        current_since = last
        time.sleep(0.5)
    if not all_rows:
        raise ValueError(f"No Kraken data for {pair} in range {start_date}-{end_date}")
    df = pd.DataFrame(all_rows)
    df = df.drop_duplicates("date").sort_values("date").reset_index(drop=True)
    # Kraken pages are capped at 720 entries (hence the `< 720` check above), so
    # a long range may come back without its oldest days. Make that visible.
    first_date = df["date"].iloc[0]
    if first_date > start_date:
        logger.warning(
            "Kraken data for %s starts at %s, after requested start %s (possibly truncated)",
            pair, first_date, start_date,
        )
    return df
def _fetch_binance(asset: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Fetch daily klines from Binance's public REST ``/api/v3/klines`` endpoint.

    Pages forward from ``start_date``, 1000 klines per request, until a short
    or empty page arrives or the range is exhausted.

    Args:
        asset: Canonical pair looked up in ``ASSET_BINANCE_MAP``.
        start_date: First day to keep, ``YYYY-MM-DD`` (inclusive).
        end_date: Last day to keep, ``YYYY-MM-DD`` (inclusive).

    Returns:
        DataFrame with float columns ``open``, ``high``, ``low``, ``close``,
        ``volume`` plus ``date`` (``YYYY-MM-DD``), one row per UTC day,
        sorted by date. It may be empty if no kline falls inside the range.

    Raises:
        ValueError: Unknown asset, or Binance returned no klines at all.
        requests.HTTPError: Non-2xx HTTP response.
    """
    symbol = ASSET_BINANCE_MAP.get(asset)
    if not symbol:
        raise ValueError(f"No Binance symbol for {asset}")
    # Anchor both bounds at UTC midnight. A naive datetime's .timestamp() uses
    # the host's local timezone. West of UTC, that put startTime after the
    # start_date kline's 00:00 UTC open time, so that day was silently dropped.
    start_dt = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    end_dt = datetime.strptime(end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    start_ms = int(start_dt.timestamp() * 1000)
    end_ms = int((end_dt + timedelta(days=1)).timestamp() * 1000)  # midnight after end_date
    all_candles = []
    current_start = start_ms
    while current_start < end_ms:
        resp = requests.get(
            "https://api.binance.com/api/v3/klines",
            params={
                "symbol": symbol,
                "interval": "1d",
                "startTime": current_start,
                "endTime": end_ms,
                "limit": 1000,
            },
            timeout=30,
        )
        resp.raise_for_status()
        candles = resp.json()
        if not candles:
            break
        all_candles.extend(candles)
        # The next page starts one day after the last kline's open time.
        current_start = candles[-1][0] + 86400000
        if len(candles) < 1000:  # short page: nothing further in range
            break
    if not all_candles:
        raise ValueError(f"No data from Binance for {symbol}")
    df = pd.DataFrame(all_candles, columns=[
        "timestamp", "open", "high", "low", "close", "volume",
        "close_time", "quote_volume", "num_trades",
        "taker_buy_base", "taker_buy_quote", "ignore",
    ])
    df["date"] = pd.to_datetime(df["timestamp"], unit="ms").dt.strftime("%Y-%m-%d")
    ohlcv_cols = ["open", "high", "low", "close", "volume"]
    # Cast on the full frame before filtering. Assigning columns into a
    # boolean-filtered slice triggers pandas' SettingWithCopyWarning.
    df[ohlcv_cols] = df[ohlcv_cols].astype(float)
    df = df[(df["date"] >= start_date) & (df["date"] <= end_date)]
    df = df.drop_duplicates("date").sort_values("date").reset_index(drop=True)
    return df[["date", *ohlcv_cols]]
def _fetch_ccxt(asset: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Fetch daily candles from Binance through the ``ccxt`` library.

    ``asset`` is passed to ccxt unchanged (e.g. ``"BTC/USDT"``). Pages forward
    from ``start_date``, ``limit`` candles at a time.

    Args:
        asset: ccxt market symbol.
        start_date: First day to keep, ``YYYY-MM-DD`` (inclusive).
        end_date: Last day to keep, ``YYYY-MM-DD`` (inclusive).

    Returns:
        DataFrame with columns ``date``, ``open``, ``high``, ``low``,
        ``close``, ``volume``, with one row per UTC day, sorted by date. This is
        the same column set as the other fetchers.

    Raises:
        ValueError: ccxt returned no candles at all.
        ImportError: ``ccxt`` is not installed.
    """
    import ccxt

    exchange = ccxt.binance({"enableRateLimit": True})
    limit = 1000
    # UTC-anchored bounds; a naive .timestamp() would use the host's local timezone.
    start_dt = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    end_dt = datetime.strptime(end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    since = int(start_dt.timestamp() * 1000)
    # Exclusive bound at the midnight *after* end_date. The previous bound
    # (end_date's own midnight) never entered the loop when
    # start_date == end_date, so single-day requests always raised.
    end_ts = int((end_dt + timedelta(days=1)).timestamp() * 1000)
    all_candles = []
    current_since = since
    while current_since < end_ts:
        candles = exchange.fetch_ohlcv(asset, "1d", since=current_since, limit=limit)
        if not candles:
            break
        all_candles.extend(candles)
        # The next page starts one day after the last candle's open time.
        current_since = candles[-1][0] + 86400000
        if len(candles) < limit:
            break
    if not all_candles:
        raise ValueError(f"No data returned from ccxt for {asset}")
    df = pd.DataFrame(all_candles, columns=["timestamp", "open", "high", "low", "close", "volume"])
    df["date"] = pd.to_datetime(df["timestamp"], unit="ms").dt.date.astype(str)
    df = df[(df["date"] >= start_date) & (df["date"] <= end_date)]
    df = df.drop_duplicates("date").sort_values("date").reset_index(drop=True)
    return df[["date", "open", "high", "low", "close", "volume"]]
def _fetch_yfinance(asset: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Fetch daily candles through the ``yfinance`` package (last-resort source).

    The ticker comes from ``config.ASSET_YFINANCE_MAP`` when the asset is
    listed there. Otherwise it is the pair with "/" replaced by "-"
    (e.g. "BTC/USDT" -> "BTC-USDT").

    Returns:
        DataFrame with columns ``open``, ``high``, ``low``, ``close``,
        ``volume``, ``date`` (``YYYY-MM-DD``), sorted by date.

    Raises:
        ValueError: yfinance returned an empty frame.
    """
    import yfinance as yf
    from config import ASSET_YFINANCE_MAP

    ticker = ASSET_YFINANCE_MAP.get(asset, asset.replace("/", "-"))
    # yf.download's `end` is exclusive, so push it one day past end_date.
    exclusive_end = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
    raw = yf.download(
        ticker,
        start=start_date,
        end=exclusive_end.strftime("%Y-%m-%d"),
        progress=False,
        auto_adjust=True,
    )
    if raw.empty:
        raise ValueError(f"No data returned from yfinance for {ticker}")
    # .values.flatten() turns either a 1-D column or a 2-D single-ticker block
    # into a flat array, so the result is independent of yfinance's column layout.
    out = pd.DataFrame({
        field.lower(): raw[field].values.flatten()
        for field in ("Open", "High", "Low", "Close", "Volume")
    })
    out["date"] = raw.index.strftime("%Y-%m-%d")
    return out.sort_values("date").reset_index(drop=True)
def ohlcv_to_records(df: pd.DataFrame) -> list:
    """Convert an OHLCV DataFrame to a list of plain dicts for prompts.

    Each record has ``date`` as ``str`` and ``open``, ``high``, ``low``,
    ``close``, ``volume`` as Python ``float``. Any other columns are ignored.
    Records follow the DataFrame's row order, and an empty frame yields ``[]``.
    """
    # Zip the needed columns directly instead of DataFrame.iterrows(), which
    # builds a Series for every row. str()/float() give plain Python types.
    rows = zip(df["date"], df["open"], df["high"], df["low"], df["close"], df["volume"])
    return [
        {
            "date": str(date),
            "open": float(open_),
            "high": float(high),
            "low": float(low),
            "close": float(close),
            "volume": float(volume),
        }
        for date, open_, high, low, close, volume in rows
    ]