# exchange-data-manager/src/exchange_data_manager/exchanges/ccxt_connector.py
#
# 379 lines
# 13 KiB
# Python

"""
Generic CCXT connector - works with ANY ccxt-supported exchange.
This provides a unified interface to 100+ cryptocurrency exchanges
using ccxt's standardized API.
Streaming is implemented via polling, which works universally across
all exchanges without requiring exchange-specific WebSocket code.
"""
import asyncio
import logging
from typing import List, Optional, Callable, Dict, TYPE_CHECKING
import ccxt.async_support as ccxt
from .base import BaseExchangeConnector
from ..candles.models import Candle
if TYPE_CHECKING:
    # Imported only while type checking; the annotation that uses it in this
    # module is written as the quoted name "ExchangeConfig".
    from ..config import ExchangeConfig

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class PollingSubscription:
    """Manages a polling-based subscription that mimics WebSocket streaming.

    A background asyncio task repeatedly asks the connector for the two most
    recent candles and forwards new or updated ones to ``callback``.
    """

    def __init__(
        self,
        subscription_id: str,
        connector: "CCXTConnector",
        symbol: str,
        timeframe: str,
        callback: Callable[["Candle"], None],
        poll_interval: float = 5.0,
    ):
        """
        Store the subscription settings; polling does not begin until start().

        Args:
            subscription_id: Identifier used in log messages.
            connector: Object whose ``fetch_candles`` coroutine is polled.
            symbol: Trading pair passed through to ``fetch_candles``.
            timeframe: Candle timeframe passed through to ``fetch_candles``.
            callback: Called with each new or updated candle. If the call
                returns a coroutine, that coroutine is awaited.
            poll_interval: Seconds to sleep between polls.
        """
        self.subscription_id = subscription_id
        self.connector = connector
        self.symbol = symbol
        self.timeframe = timeframe
        self.callback = callback
        self.poll_interval = poll_interval
        self._task: Optional[asyncio.Task] = None
        self._running = False
        # Time of the newest candle seen on the previous poll (None before
        # the first successful poll).
        self._last_candle_time: Optional[int] = None

    async def start(self):
        """Start the polling loop.

        Calling start() while a poll task is still active is a no-op, so a
        repeated call cannot orphan the task created by the earlier call.
        """
        if self._task is not None and not self._task.done():
            return
        self._running = True
        self._task = asyncio.create_task(self._poll_loop())
        logger.info(
            f"Started polling subscription {self.subscription_id}: "
            f"{self.symbol} {self.timeframe} every {self.poll_interval}s"
        )

    async def stop(self):
        """Stop the polling loop and wait for the poll task to finish."""
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                # Cancellation is the expected outcome of stop().
                pass
            self._task = None
        logger.info(f"Stopped polling subscription {self.subscription_id}")

    async def _deliver(self, candle):
        """Pass one candle to the callback, logging (not raising) failures."""
        try:
            outcome = self.callback(candle)
            # Support ``async def`` callbacks: without this the returned
            # coroutine would never run.
            if asyncio.iscoroutine(outcome):
                await outcome
        except Exception as e:
            logger.error(f"Callback error: {e}")

    async def _poll_loop(self):
        """Continuously poll for new candles until stopped or cancelled."""
        while self._running:
            try:
                # Fetch the most recent candles
                candles = await self.connector.fetch_candles(
                    symbol=self.symbol,
                    timeframe=self.timeframe,
                    limit=2,  # Get last 2 (current + previous)
                )
                if candles:
                    latest_time = candles[-1].time
                    for candle in candles:
                        # Only forward candles at or after the newest one
                        # seen on the previous poll (new or updated candles).
                        if (
                            self._last_candle_time is not None
                            and candle.time < self._last_candle_time
                        ):
                            continue
                        # Mark as closed if it's not the latest
                        if candle.time < latest_time:
                            candle = Candle(
                                time=candle.time,
                                open=candle.open,
                                high=candle.high,
                                low=candle.low,
                                close=candle.close,
                                volume=candle.volume,
                                closed=True,
                            )
                        await self._deliver(candle)
                    self._last_candle_time = latest_time
            except asyncio.CancelledError:
                break
            except Exception as e:
                # A failed poll is logged and retried after the usual sleep.
                logger.error(f"Polling error for {self.subscription_id}: {e}")
            await asyncio.sleep(self.poll_interval)
class CCXTConnector(BaseExchangeConnector):
    """
    Generic exchange connector using ccxt's unified API.

    Works with any exchange supported by ccxt (100+ exchanges).

    Usage:
        # Public data (no credentials needed for most exchanges)
        connector = CCXTConnector("binance")
        candles = await connector.fetch_candles("BTC/USDT", "5m")

        # Authenticated (for exchanges requiring API keys)
        connector = CCXTConnector("kucoin", api_key="...", api_secret="...")
    """

    def __init__(
        self,
        exchange_id: str,
        config: Optional["ExchangeConfig"] = None,
        api_key: Optional[str] = None,
        api_secret: Optional[str] = None,
        password: Optional[str] = None,  # Some exchanges require this (e.g., KuCoin)
        testnet: bool = False,
    ):
        """
        Initialize connector for any ccxt-supported exchange.

        Args:
            exchange_id: Exchange name (e.g., "binance", "kucoin", "kraken")
                Must match a ccxt exchange id.
            config: Optional exchange configuration
            api_key: Optional API key
            api_secret: Optional API secret
            password: Optional passphrase (required by some exchanges like KuCoin)
            testnet: Whether to use testnet/sandbox mode

        Raises:
            ValueError: If the lower-cased ``exchange_id`` is not listed in
                ``ccxt.exchanges``.
        """
        super().__init__(exchange_id, config, api_key, api_secret, testnet)
        self.exchange_id = exchange_id.lower()
        self.password = password

        # Verify exchange is supported by ccxt
        if self.exchange_id not in ccxt.exchanges:
            available = ", ".join(sorted(ccxt.exchanges)[:10]) + "..."
            raise ValueError(
                f"Exchange '{exchange_id}' not supported by ccxt. "
                f"Available exchanges include: {available}"
            )

        # Build ccxt client config
        ccxt_config = {
            "enableRateLimit": True,
        }

        # Apply custom rate limit if provided
        if config and config.rate_limit_ms:
            ccxt_config["rateLimit"] = config.rate_limit_ms

        # Add credentials if provided
        if api_key:
            ccxt_config["apiKey"] = api_key
        if api_secret:
            ccxt_config["secret"] = api_secret
        if password:
            ccxt_config["password"] = password

        # Create the exchange client dynamically
        exchange_class = getattr(ccxt, self.exchange_id)
        self.client = exchange_class(ccxt_config)

        # Enable sandbox mode if requested
        if testnet:
            if hasattr(self.client, 'set_sandbox_mode'):
                self.client.set_sandbox_mode(True)
            else:
                logger.warning(f"{exchange_id} does not support sandbox mode")

        # Cache for exchange metadata
        self._timeframes: Optional[List[str]] = None
        self._symbols: Optional[List[str]] = None

        # Polling-based subscriptions (mimics WebSocket streaming)
        self._subscriptions: Dict[str, PollingSubscription] = {}
        self._subscription_counter = 0

        # Default poll interval in seconds (can be overridden per subscription)
        self.default_poll_interval = 5.0

        logger.info(f"CCXTConnector initialized for {exchange_id}")

    async def fetch_candles(
        self,
        symbol: str,
        timeframe: str,
        start: Optional[int] = None,
        end: Optional[int] = None,
        limit: Optional[int] = None,
    ) -> List[Candle]:
        """
        Fetch historical candles from the exchange.

        Requests are issued in pages of at most 1000 rows until a short or
        empty page arrives, ``limit`` candles have been collected, a candle
        past ``end`` is seen, or a page fails to advance past ``since``.

        Args:
            symbol: Trading pair (e.g., "BTC/USDT")
            timeframe: Candle timeframe (e.g., "5m", "1h")
            start: Start timestamp in seconds (inclusive)
            end: End timestamp in seconds (inclusive)
            limit: Maximum number of candles to fetch

        Returns:
            List of Candle objects sorted by time ascending. When more than
            ``limit`` candles were collected, the most recent ``limit`` are
            returned.

        Raises:
            ccxt.BaseError: Logged and re-raised when the exchange call fails.
        """
        candles: List[Candle] = []

        # Convert seconds to milliseconds for ccxt
        since = start * 1000 if start is not None else None
        until = end * 1000 if end is not None else None

        # Most exchanges have a max limit per request (often 500-1000)
        # ccxt handles this internally with pagination for some exchanges
        fetch_limit = min(limit or 1000, 1000)

        try:
            # Load markets if not already loaded (needed for symbol validation)
            if not self.client.markets:
                await self.client.load_markets()

            while True:
                ohlcv = await self.client.fetch_ohlcv(
                    symbol=symbol,
                    timeframe=timeframe,
                    since=since,
                    limit=fetch_limit,
                )
                if not ohlcv:
                    break

                passed_end = False
                for row in ohlcv:
                    candle = Candle.from_ccxt(row)
                    # Stop at the first candle past the end time. Breaking
                    # (rather than returning here) keeps the limit trimming
                    # below in effect on this exit path too.
                    if until is not None and candle.time * 1000 > until:
                        passed_end = True
                        break
                    candles.append(candle)
                if passed_end:
                    break

                # Check if we have enough or reached the end
                if len(ohlcv) < fetch_limit:
                    break
                if limit and len(candles) >= limit:
                    break

                # Move to next batch
                next_since = ohlcv[-1][0] + 1  # +1 ms to avoid duplicate
                # Guard against a page that does not move past the requested
                # start; without it the loop would re-request forever.
                if since is not None and next_since <= since:
                    break
                since = next_since
        except ccxt.BaseError as e:
            logger.error(f"Error fetching candles from {self.exchange_id}: {e}")
            raise

        result = sorted(candles, key=lambda c: c.time)

        # Apply limit if specified
        if limit and len(result) > limit:
            result = result[-limit:]
        return result

    async def subscribe(
        self,
        symbol: str,
        timeframe: str,
        callback: Callable[[Candle], None],
        poll_interval: Optional[float] = None,
    ) -> str:
        """
        Subscribe to candle updates via polling.

        This provides a streaming-like interface by polling the REST API
        at regular intervals. Works with any ccxt-supported exchange.

        Args:
            symbol: Trading pair (e.g., "BTC/USDT")
            timeframe: Candle timeframe (e.g., "1m", "5m")
            callback: Function called with each candle update
            poll_interval: Seconds between polls (default: 5.0)

        Returns:
            Subscription ID for unsubscribing
        """
        self._subscription_counter += 1
        subscription_id = f"{self.exchange_id}_{symbol}_{timeframe}_{self._subscription_counter}"

        # A falsy poll_interval (None or 0) falls back to the default.
        interval = poll_interval or self.default_poll_interval

        subscription = PollingSubscription(
            subscription_id=subscription_id,
            connector=self,
            symbol=symbol,
            timeframe=timeframe,
            callback=callback,
            poll_interval=interval,
        )
        self._subscriptions[subscription_id] = subscription
        await subscription.start()
        return subscription_id

    async def unsubscribe(self, subscription_id: str):
        """
        Unsubscribe from candle updates.

        An unknown ID only produces a warning log.

        Args:
            subscription_id: ID returned from subscribe()
        """
        if subscription_id in self._subscriptions:
            subscription = self._subscriptions.pop(subscription_id)
            await subscription.stop()
        else:
            logger.warning(f"Subscription not found: {subscription_id}")

    async def get_symbols(self) -> List[str]:
        """Get list of available trading symbols.

        The list is cached after the first successful load. If loading the
        markets fails, the error is logged and an empty list is returned
        without being cached.
        """
        if self._symbols is None:
            try:
                await self.client.load_markets()
                self._symbols = list(self.client.symbols)
            except ccxt.BaseError as e:
                logger.error(f"Error loading {self.exchange_id} markets: {e}")
                return []
        return self._symbols

    def get_timeframes(self) -> List[str]:
        """
        Get list of supported timeframes.

        Returns timeframes supported by this exchange, or a fixed list of
        common timeframes when the client exposes none. The result is cached.
        """
        if self._timeframes is None:
            # ccxt stores timeframes in the client
            if hasattr(self.client, 'timeframes') and self.client.timeframes:
                self._timeframes = list(self.client.timeframes.keys())
            else:
                # Fallback to common timeframes
                self._timeframes = ["1m", "5m", "15m", "1h", "4h", "1d"]
        return self._timeframes

    async def close(self):
        """Close all connections and stop all subscriptions.

        The ccxt client is closed even if stopping a subscription raises;
        in that case the exception still propagates to the caller.
        """
        try:
            # Stop all polling subscriptions (iterate over a snapshot because
            # unsubscribe() removes entries from the dict).
            for subscription_id in list(self._subscriptions):
                await self.unsubscribe(subscription_id)
        finally:
            # Close ccxt client
            await self.client.close()
        logger.info(f"{self.exchange_id} connector closed")

    @staticmethod
    def list_exchanges() -> List[str]:
        """List all exchanges supported by ccxt."""
        return list(ccxt.exchanges)

    @staticmethod
    def is_exchange_supported(exchange_id: str) -> bool:
        """Check if an exchange is supported (case-insensitive)."""
        return exchange_id.lower() in ccxt.exchanges