import logging
import time
from datetime import datetime, timedelta, timezone
from decimal import Decimal, InvalidOperation
from typing import Any, Dict, List, Tuple, Union

import ccxt
import pandas as pd

from shared_utilities import timeframe_to_minutes

logger = logging.getLogger(__name__)


class Exchange:
    """
    A class to interact with a cryptocurrency exchange using the CCXT library.

    On construction the instance connects to the exchange, checks the supplied
    credentials, and loads market metadata, available intervals, active symbols
    and account balances.
    """

    # Market metadata shared by every instance in the process. Production
    # entries are keyed by the exchange id; testnet entries use
    # "<exchange_id>:testnet" so the two environments never share metadata.
    _market_cache = {}

    def __init__(self, name: str, api_keys: Dict[str, str] | None, exchange_id: str, testnet: bool = False):
        """
        Initializes the Exchange object.

        Parameters:
            name (str): The name of this exchange instance.
            api_keys (Dict[str, str]): Dictionary containing API credentials. Supports:
                - key / secret
                - optional passphrase (stored as 'passphrase' or legacy 'password')
            exchange_id (str): The ID of the exchange as recognized by CCXT. Example('binance')
            testnet (bool): Whether to use testnet/sandbox mode. Defaults to False.

        Raises:
            ValueError: If the exchange is not supported by CCXT, or testnet was
                requested but sandbox mode could not be enabled.
        """
        self.name = name
        self.api_key = api_keys['key'] if api_keys else None
        self.api_key_secret = api_keys['secret'] if api_keys else None
        self.api_passphrase = (
            api_keys.get('passphrase') or api_keys.get('password')
        ) if api_keys else None
        self.configured = False
        self.exchange_id = exchange_id
        self.testnet = testnet
        self.edm_session_id = None  # EDM session for authenticated candle access
        self.client: ccxt.Exchange = self._connect_exchange()
        if self.client:
            self._check_authentication()
            self.exchange_info = self._set_exchange_info()
            self.intervals = self._set_avail_intervals()
            self.symbols = self._set_symbols()
            self.balances = self._set_balances()
            self.symbols_n_precision = {}

    def _connect_exchange(self) -> ccxt.Exchange:
        """
        Connects to the exchange using the CCXT library.

        Returns:
            ccxt.Exchange: An instance of the CCXT exchange class.

        Raises:
            ValueError: If the exchange id is unknown to CCXT, or testnet was
                requested but sandbox mode could not be enabled.
        """
        # A default is required here: without it getattr raises AttributeError
        # for an unknown id and the friendly error below is never reached.
        exchange_class = getattr(ccxt, self.exchange_id, None)
        if not exchange_class:
            logger.error(f"Exchange {self.exchange_id} is not supported by CCXT.")
            raise ValueError(f"Exchange {self.exchange_id} is not supported by CCXT.")

        mode_str = "testnet" if self.testnet else "production"
        logger.info(f"Connecting to exchange {self.exchange_id} ({mode_str} mode).")

        config = {
            'enableRateLimit': True,
            'verbose': False,
            'options': {'warnOnFetchOpenOrdersWithoutSymbol': False},
        }

        if self.api_key and self.api_key_secret:
            config['apiKey'] = self.api_key
            config['secret'] = self.api_key_secret
            if self.api_passphrase:
                # CCXT uses `password` for exchange passphrases (e.g., KuCoin).
                config['password'] = self.api_passphrase

        client = exchange_class(config)

        # Enable sandbox/testnet mode if requested
        if self.testnet:
            try:
                client.set_sandbox_mode(True)
                logger.info(f"Sandbox mode enabled for {self.exchange_id}")
            except Exception as e:
                # CRITICAL: Do NOT continue with production if testnet was requested
                # This prevents users from accidentally trading real money
                logger.error(f"TESTNET UNAVAILABLE: {self.exchange_id} does not support sandbox mode: {e}")
                raise ValueError(
                    f"Testnet/sandbox mode is not available for {self.exchange_id}. "
                    f"Please use paper trading mode instead, or trade on production with caution."
                ) from e

        return client

    def _check_authentication(self):
        """
        Verifies the API credentials and records the outcome in `self.configured`.

        `self.configured` is True only when credentials are present and an
        authenticated request succeeds; every other outcome leaves it False.
        """
        # Reset first so a failed re-check can never leave a stale True behind.
        self.configured = False
        if not (self.api_key and self.api_key_secret):
            return
        try:
            self.client.fetch_open_orders()  # Much faster than fetch_balance
            self.configured = True
            logger.info("Authentication successful.")
        except ccxt.AuthenticationError:
            logger.error("Authentication failed. Please check your API keys.")
        except Exception as e:
            logger.error(f"An error occurred: {e}")

    @staticmethod
    def datetime_to_unix_millis(dt: datetime) -> int:
        """
        Converts a datetime object to Unix milliseconds.

        Parameters:
            dt (datetime): The datetime object to convert.

        Returns:
            int: The Unix timestamp in milliseconds.

        Raises:
            ValueError: If `dt` carries no timezone information.
        """
        if dt.tzinfo is None:
            raise ValueError("datetime object must be timezone-aware or in UTC.")
        return int(dt.timestamp() * 1000)

    @staticmethod
    def _decimal_places(precision: Any, default: int = 8) -> int:
        """
        Translates a CCXT precision value into a number of decimal places.

        Whole numbers (8, 2.0) are taken as a decimal-place count; fractional
        values (0.001, 1e-05) are taken as a tick size and converted to the
        number of decimals needed to express it.

        Parameters:
            precision (Any): The precision value reported for a market.
            default (int): Value returned when `precision` is missing or unusable.

        Returns:
            int: A non-negative number of decimal places.
        """
        if precision is None or isinstance(precision, bool):
            return default
        try:
            as_decimal = Decimal(str(precision))
        except (InvalidOperation, ValueError):
            return default
        if not as_decimal.is_finite() or as_decimal < 0:
            return default
        if as_decimal == as_decimal.to_integral_value():
            return int(as_decimal)
        # Tick size: 0.0010 -> 1E-3 -> 3 decimal places.
        return max(0, -as_decimal.normalize().as_tuple().exponent)

    def _fetch_price(self, symbol: str) -> float:
        """
        Fetches the current price for a given symbol.

        Parameters:
            symbol (str): The trading symbol (e.g., 'BTC/USDT').

        Returns:
            float: The current price, or 0.0 if it could not be determined.
        """
        try:
            ticker = self.client.fetch_ticker(symbol)
        except ccxt.BaseError as e:
            logger.error(f"Error fetching price for {symbol}: {str(e)}")
            return 0.0

        last_price = ticker.get('last') if ticker else None
        if last_price is None:
            # Illiquid markets can report no last trade; float(None) would raise.
            logger.error(f"Error fetching price for {symbol}: ticker has no last price")
            return 0.0
        return float(last_price)

    def _fetch_min_qty(self, symbol: str) -> float:
        """
        Fetches the minimum quantity for a given symbol.

        Parameters:
            symbol (str): The trading symbol (e.g., 'BTC/USDT').

        Returns:
            float: The minimum quantity, or 0.0 if it is unknown.
        """
        try:
            market_data = self.exchange_info[symbol]
            return float(market_data['limits']['amount']['min'])
        except (KeyError, TypeError) as e:
            # TypeError covers limits that are present but reported as None.
            logger.error(f"Error fetching minimum quantity for {symbol}: {str(e)}")
            return 0.0

    def _fetch_min_notional_qty(self, symbol: str) -> float:
        """
        Fetches the minimum notional quantity for a given symbol.

        Parameters:
            symbol (str): The trading symbol (e.g., 'BTC/USDT').

        Returns:
            float: The minimum notional quantity, or 0.0 if it is unknown.
        """
        try:
            market_data = self.exchange_info[symbol]
            return float(market_data['limits']['cost']['min'])
        except (KeyError, TypeError) as e:
            # TypeError covers limits that are present but reported as None.
            logger.error(f"Error fetching minimum notional quantity for {symbol}: {str(e)}")
            return 0.0

    def _set_symbols(self) -> List[str]:
        """
        Sets the list of available symbols on the exchange.

        Returns:
            List[str]: A list of trading symbols whose market is flagged active.
        """
        try:
            markets = self.client.fetch_markets()
        except ccxt.BaseError as e:
            logger.error(f"Error fetching symbols: {str(e)}")
            return []
        return [market['symbol'] for market in markets if market.get('active')]

    def _set_balances(self) -> List[Dict[str, Union[str, float]]]:
        """
        Sets the balances of the account.

        Returns:
            List[Dict[str, Union[str, float]]]: A list of balances with asset, balance, and PnL.
                A single 'N/A' placeholder entry is returned when no credentials
                are configured or the request fails.
        """
        placeholder = [{'asset': 'N/A', 'balance': 0, 'pnl': 0}]
        if not (self.api_key and self.api_key_secret):
            return placeholder

        try:
            # KuCoin has separate accounts (main, trade, margin, futures)
            # We need to explicitly request the 'trade' account for spot trading
            params = {}
            if self.exchange_id.lower() == 'kucoin':
                params['type'] = 'trade'
                logger.info(f"Fetching KuCoin balance with params: {params}, testnet={self.testnet}")

            account_info = self.client.fetch_balance(params)
        except ccxt.BaseError as e:
            logger.error(f"Error fetching balances: {str(e)}")
            return placeholder

        balances = []
        for asset, balance in (account_info.get('total') or {}).items():
            if balance is None:
                # Totals can be None when the exchange omits them.
                continue
            asset_balance = float(balance)
            if asset_balance > 0:
                balances.append({'asset': asset, 'balance': asset_balance, 'pnl': 0})
        logger.info(f"Fetched balances for {self.exchange_id}: {balances}")
        return balances

    def _set_exchange_info(self) -> dict:
        """
        Sets the exchange information and caches it.

        Returns:
            dict: The exchange information, or an empty dict if loading failed.
        """
        # Testnet and production can list different markets and limits, so
        # they must not share a cache entry. Production keeps the plain id.
        cache_key = f"{self.exchange_id}:testnet" if self.testnet else self.exchange_id
        cached = Exchange._market_cache.get(cache_key)
        if cached is not None:
            return cached

        try:
            markets_info = self.client.load_markets()
        except ccxt.BaseError as e:
            logger.error(f"Error fetching market info: {str(e)}")
            return {}
        Exchange._market_cache[cache_key] = markets_info
        return markets_info

    def get_client(self) -> object:
        """
        Returns the CCXT client instance.

        Returns:
            object: The CCXT client.
        """
        return self.client

    def get_avail_intervals(self) -> Tuple[str, ...]:
        """
        Returns the available time intervals for OHLCV data.

        Returns:
            Tuple[str, ...]: A tuple of available intervals.
        """
        return self.intervals

    def get_exchange_info(self) -> dict:
        """
        Returns the exchange information.

        Returns:
            dict: The exchange information.
        """
        return self.exchange_info

    def get_symbols(self) -> List[str]:
        """
        Returns the list of available symbols.

        Returns:
            List[str]: A list of trading symbols.
        """
        return self.symbols

    def get_balances(self) -> List[Dict[str, Union[str, float]]]:
        """
        Returns the balances of the account.

        Returns:
            List[Dict[str, Union[str, float]]]: A list of balances with asset, balance, and PnL.
        """
        return self.balances

    def refresh_balances(self) -> List[Dict[str, Union[str, float]]]:
        """
        Refreshes and returns the balances from the exchange.
        Reinitializes the ccxt client to handle pickle corruption issues.

        Returns:
            List[Dict[str, Union[str, float]]]: Updated list of balances.
        """
        # Reinitialize the ccxt client to fix any pickle corruption
        # (ccxt clients don't survive pickle/unpickle properly)
        self.client = self._connect_exchange()
        self.balances = self._set_balances()
        return self.balances

    def get_symbol_precision_rule(self, symbol: str) -> int:
        """
        Returns the precision rule for a given symbol.

        The rule is looked up in the market metadata on first use and cached.

        Parameters:
            symbol (str): The trading symbol (e.g., 'BTC/USDT').

        Returns:
            int: The precision rule.

        Raises:
            KeyError: If the symbol is not present in the exchange information.
        """
        r_value = self.symbols_n_precision.get(symbol)
        if r_value is None:
            self._set_precision_rule(symbol)
            r_value = self.symbols_n_precision.get(symbol)
        return r_value

    def get_price(self, symbol: str) -> float:
        """
        Returns the current price for a given symbol.

        Parameters:
            symbol (str): The trading symbol (e.g., 'BTC/USDT').

        Returns:
            float: The current price.
        """
        return self._fetch_price(symbol)

    def get_min_qty(self, symbol: str) -> float:
        """
        Returns the minimum quantity for a given symbol.

        Parameters:
            symbol (str): The trading symbol (e.g., 'BTC/USDT').

        Returns:
            float: The minimum quantity.
        """
        return self._fetch_min_qty(symbol)

    def get_min_notional_qty(self, symbol: str) -> float:
        """
        Returns the minimum notional quantity for a given symbol.

        Parameters:
            symbol (str): The trading symbol (e.g., 'BTC/USDT').

        Returns:
            float: The minimum notional quantity.
        """
        return self._fetch_min_notional_qty(symbol)

    def get_trading_fees(self, symbol: str | None = None) -> Dict[str, Any]:
        """
        Returns the trading fees for a symbol from market info (public data).

        For authenticated users, this will try to fetch user-specific fees first,
        falling back to market defaults if that fails.

        Parameters:
            symbol (str, optional): The trading symbol (e.g., 'BTC/USDT').
                If None, returns general exchange fees.

        Returns:
            Dict[str, Any]: A dictionary containing:
                - 'maker': Maker fee rate (e.g., 0.001 for 0.1%)
                - 'taker': Taker fee rate (e.g., 0.001 for 0.1%)
                - 'source': Where the fees came from ('user', 'market', or 'default')
        """
        default_fees = {'maker': 0.001, 'taker': 0.001, 'source': 'default'}

        # Try to get user-specific fees if authenticated
        if self.configured:
            try:
                user_fees = self.client.fetch_trading_fees()
                fee_data = None
                if symbol and symbol in user_fees:
                    fee_data = user_fees[symbol]
                elif user_fees:
                    # Some exchanges return a single fee structure, not per-symbol
                    # Try to get a representative fee
                    first_key = next(iter(user_fees), None)
                    if first_key and isinstance(user_fees[first_key], dict):
                        fee_data = user_fees[first_key]
                if fee_data is not None:
                    return {
                        'maker': float(fee_data.get('maker', 0.001)),
                        'taker': float(fee_data.get('taker', 0.001)),
                        'source': 'user',
                    }
            except ccxt.NotSupported:
                logger.debug(f"fetch_trading_fees not supported by {self.exchange_id}")
            except ccxt.AuthenticationError:
                logger.warning(f"Authentication required for user-specific fees on {self.exchange_id}")
            except Exception as e:
                # Best effort: any other failure falls through to public data.
                logger.debug(f"Could not fetch user-specific fees: {e}")

        # Fall back to market info (public data)
        if symbol and symbol in self.exchange_info:
            market = self.exchange_info[symbol]
            maker = market.get('maker')
            taker = market.get('taker')
            if maker is not None and taker is not None:
                return {
                    'maker': float(maker),
                    'taker': float(taker),
                    'source': 'market',
                }

        # Return defaults if nothing else works
        return default_fees

    def get_margin_info(self, symbol: str) -> Dict[str, Any]:
        """
        Returns margin trading information for a symbol.

        Parameters:
            symbol (str): The trading symbol (e.g., 'BTC/USDT').

        Returns:
            Dict[str, Any]: A dictionary containing:
                - 'margin_enabled': Whether margin trading is available
                - 'max_leverage': Maximum leverage (if available)
                - 'margin_modes': Available margin modes (cross/isolated)
        """
        result = {
            'margin_enabled': False,
            'max_leverage': 1,
            'margin_modes': [],
        }

        market = self.exchange_info.get(symbol)
        if not market:
            return result

        result['margin_enabled'] = market.get('margin', False)

        # Check for leverage info in market limits. Either level may be
        # present but None, so normalise to empty dicts before reading.
        leverage_limits = (market.get('limits') or {}).get('leverage') or {}
        max_lev = leverage_limits.get('max')
        result['max_leverage'] = max_lev if max_lev is not None else 1

        # Check for margin mode info
        if market.get('margin'):
            result['margin_modes'].append('cross')
        if market.get('isolated'):
            result['margin_modes'].append('isolated')

        return result

    def get_order(self, symbol: str, order_id: str) -> Dict[str, Any] | None:
        """
        Returns an order by its ID for a given symbol.

        Parameters:
            symbol (str): The trading symbol (e.g., 'BTC/USDT').
            order_id (str): The ID of the order.

        Returns:
            Dict[str, Any]: The order details or None on error.
        """
        try:
            return self.client.fetch_order(order_id, symbol)
        except ccxt.BaseError as e:
            logger.error(f"Error fetching order {order_id} for {symbol}: {str(e)}")
            return None

    def place_order(self, symbol: str, side: str, type: str, timeInForce: str, quantity: float,
                    price: float = None, client_order_id: str = None) -> Tuple[str, object]:
        """
        Places an order on the exchange.

        Parameters:
            symbol (str): The trading symbol (e.g., 'BTC/USDT').
            side (str): The side of the order ('buy' or 'sell').
            type (str): The type of the order ('limit' or 'market').
            timeInForce (str): The time-in-force policy ('GTC', 'IOC', etc.).
            quantity (float): The quantity of the order.
            price (float, optional): The price of the order for limit orders.
            client_order_id (str, optional): Client-provided order ID for idempotency.

        Returns:
            Tuple[str, object]: A tuple containing the result ('Success' or 'Failure') and the order details or None.
        """
        result, msg = self._place_order(symbol=symbol, side=side, type=type, timeInForce=timeInForce,
                                        quantity=quantity, price=price, client_order_id=client_order_id)
        return result, msg

    def _set_avail_intervals(self) -> Tuple[str, ...]:
        """
        Sets the available intervals for OHLCV data.

        Returns:
            Tuple[str, ...]: A tuple of available intervals; empty when the
                client reports no timeframes.
        """
        # `timeframes` may be None on the client, so guard before .keys().
        return tuple((self.client.timeframes or {}).keys())

    def _set_precision_rule(self, symbol: str) -> None:
        """
        Sets the precision rule for a given symbol.

        Parameters:
            symbol (str): The trading symbol (e.g., 'BTC/USDT').

        Raises:
            KeyError: If the symbol is not present in the exchange information.
        """
        market_data = self.exchange_info[symbol]
        precision = market_data['precision']['amount']
        self.symbols_n_precision[symbol] = precision

    def _place_order(self, symbol: str, side: str, type: str, timeInForce: str, quantity: float,
                     price: float = None, client_order_id: str = None) -> Tuple[str, object]:
        """
        Places an order on the exchange.

        The quantity is rounded to the cached amount precision for the symbol
        (8 decimals when none is cached). The price is rounded to the market's
        own price precision when the market metadata provides one (8 decimals
        otherwise); it is never rounded with the amount precision.

        Parameters:
            symbol (str): The trading symbol (e.g., 'BTC/USDT').
            side (str): The side of the order ('buy' or 'sell').
            type (str): The type of the order ('limit' or 'market').
            timeInForce (str): The time-in-force policy ('GTC', 'IOC', etc.).
            quantity (float): The quantity of the order.
            price (float, optional): The price of the order for limit orders.
            client_order_id (str, optional): Client-provided order ID for idempotency.

        Returns:
            Tuple[str, object]: A tuple containing the result ('Success' or 'Failure') and the order details or None.
        """

        def format_arg(value: float, places: int) -> float:
            return float(f"{value:.{places}f}")

        # Precision may be a decimal-place count, a tick size or None;
        # _decimal_places normalises all of them so formatting cannot raise.
        amount_places = self._decimal_places(self.symbols_n_precision.get(symbol))
        quantity = format_arg(quantity, amount_places)

        if price is not None:
            # Amount and price precision are independent: rounding a price
            # with an amount precision of 0 would turn 0.0000123 into 0.0.
            market = (self.exchange_info or {}).get(symbol) or {}
            price_precision = (market.get('precision') or {}).get('price')
            price = format_arg(price, self._decimal_places(price_precision))

        order_params = {
            'symbol': symbol,
            'type': type,
            'side': side,
            'amount': quantity,
            'params': {},
        }

        # Only include timeInForce for non-market orders (Binance rejects it for market orders)
        if type != 'market' and timeInForce:
            order_params['params']['timeInForce'] = timeInForce

        if price is not None:
            order_params['price'] = price

        # Add client order ID for idempotency (exchange-specific param names)
        if client_order_id:
            # newClientOrderId for Binance, clientOrderId for many others
            order_params['params']['newClientOrderId'] = client_order_id
            order_params['params']['clientOrderId'] = client_order_id

        try:
            order = self.client.create_order(**order_params)
            return 'Success', order
        except ccxt.BaseError as e:
            logger.error(f"Error placing order for {symbol}: {str(e)}")
            return 'Failure', None

    def get_active_trades(self) -> List[Dict[str, Union[str, float]]]:
        """
        Returns a list of active trades/positions.

        Note: This method uses fetch_positions() which is only available on futures/derivatives
        exchanges. For spot exchanges, this will return an empty list since spot trading doesn't
        have the concept of "positions" - only balances.

        Returns:
            List[Dict[str, Union[str, float]]]: A list of active trades with symbol, side, quantity, and price.
        """
        if not self.api_key or not self.api_key_secret:
            return []

        # Check if the exchange supports fetching positions (futures/derivatives feature)
        if not self.client.has.get('fetchPositions'):
            logger.debug(f"Exchange {self.exchange_id} does not support fetchPositions (spot exchange)")
            return []

        try:
            positions = self.client.fetch_positions()
        except ccxt.PermissionDenied as e:
            # Error -2015: API key doesn't have permission for this endpoint
            logger.debug(f"API key lacks permission for fetchPositions on {self.exchange_id}: {e}")
            return []
        except ccxt.NotSupported as e:
            logger.debug(f"fetchPositions not supported on {self.exchange_id}: {e}")
            return []
        except ccxt.BaseError as e:
            logger.error(f"Error fetching active trades from {self.exchange_id}: {str(e)}")
            return []

        formatted_trades = []
        for position in positions:
            # Position size: ccxt's 'contracts', else the legacy 'quantity'.
            # Selection is by "is not None" so a flat position (contracts == 0)
            # stays zero. 'contractSize' is the size of ONE contract, not the
            # position size, so it is deliberately not used as a fallback.
            raw_quantity = next(
                (position[key] for key in ('contracts', 'quantity') if position.get(key) is not None),
                0,
            )
            quantity = float(raw_quantity)

            # Skip positions with zero size
            if quantity == 0:
                continue

            # Get entry price - ccxt uses 'entryPrice', fallback to legacy 'entry_price'
            entry_price = position.get('entryPrice') or position.get('entry_price', 0)
            if entry_price is None:
                entry_price = 0

            formatted_trades.append({
                'symbol': position['symbol'],
                'side': position.get('side') or ('buy' if quantity > 0 else 'sell'),
                'quantity': abs(quantity),
                'price': float(entry_price),
            })
        return formatted_trades

    def get_open_orders(self) -> List[Dict[str, Union[str, float]]]:
        """
        Returns a list of open orders.

        Returns:
            List[Dict[str, Union[str, float]]]: A list of open orders with id, symbol, side, type, quantity, price, status.
        """
        if not (self.api_key and self.api_key_secret):
            return []

        try:
            open_orders = self.client.fetch_open_orders()
        except ccxt.BaseError as e:
            logger.error(f"Error fetching open orders: {str(e)}")
            return []

        return [
            {
                'id': order.get('id'),  # Exchange order ID - critical for reconciliation
                'clientOrderId': order.get('clientOrderId'),  # Client order ID if available
                'symbol': order['symbol'],
                'side': order['side'],
                'type': order.get('type', 'limit'),
                'quantity': order['amount'],
                'price': order.get('price'),
                'status': order.get('status', 'open'),
                'filled': order.get('filled', 0),
                'remaining': order.get('remaining', order['amount']),
                'timestamp': order.get('timestamp'),
            }
            for order in open_orders
        ]