diff --git a/CLAUDE.md b/CLAUDE.md index 41464bb..c49cc7e 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -58,7 +58,7 @@ pytest tests/test_file.py::test_name ## Architecture -Flask web application with SocketIO for real-time communication, using eventlet for async operations. Features a Blockly-based visual strategy builder frontend. +Flask web application with SocketIO for real-time communication, using eventlet for async operations. Features a Blockly-based visual strategy builder frontend with a **unified broker abstraction** supporting backtest, paper, and live trading modes. ``` ┌─────────────────────────────────────────────────────────────────┐ @@ -70,6 +70,9 @@ Flask web application with SocketIO for real-time communication, using eventlet ├─────────┬─────────┬─────────┬─────────┬─────────┬──────────────┤ │ Users │Strategies│ Trades │Indicators│Backtester│ Exchanges │ ├─────────┴─────────┴─────────┴─────────┴─────────┴──────────────┤ +│ Broker Abstraction │ +│ (BaseBroker → Paper / Backtest / Live brokers) │ +├─────────────────────────────────────────────────────────────────┤ │ DataCache │ │ (In-memory caching + SQLite) │ └─────────────────────────────────────────────────────────────────┘ @@ -82,7 +85,10 @@ Flask web application with SocketIO for real-time communication, using eventlet | `app.py` | Flask web server, SocketIO handlers, HTTP routes | | `BrighterTrades.py` | Main application facade, coordinates all subsystems | | `Strategies.py` | Strategy CRUD operations and execution management | -| `StrategyInstance.py` | Individual strategy execution context | +| `StrategyInstance.py` | Base strategy execution context | +| `paper_strategy_instance.py` | Paper trading strategy instance | +| `backtest_strategy_instance.py` | Backtest strategy instance | +| `live_strategy_instance.py` | Live trading strategy instance with circuit breaker and position limits | | `PythonGenerator.py` | Generates Python code from Blockly JSON | | `backtesting.py` | Strategy backtesting engine (uses backtrader) | | `indicators.py` | Technical indicator calculations | @@ -94,6 +100,18 @@ Flask web application with SocketIO for real-time communication, using eventlet | `Database.py` | SQLite database operations | | `Users.py` | User authentication and session management | | `Signals.py` | Trading signal definitions and state tracking | +| `health.py` | Application health checks | +| `logging_config.py` | Centralized logging configuration | + +### Broker Module (`src/brokers/`) + +| Module | Purpose | +|--------|---------| +| `base_broker.py` | Abstract broker interface (BaseBroker) | +| `paper_broker.py` | Simulated trading with virtual balance | +| `backtest_broker.py` | Historical data backtesting | +| `live_broker.py` | Real exchange trading via ccxt | +| `factory.py` | Creates appropriate broker based on mode | ### Key Paths diff --git a/pytest.ini b/pytest.ini index 9855d94..de67734 100644 --- a/pytest.ini +++ b/pytest.ini @@ -4,3 +4,7 @@ python_files = test_*.py python_classes = Test* python_functions = test_* addopts = -v --tb=short + +markers = + live_testnet: marks tests as requiring live testnet API keys (deselect with '-m "not live_testnet"') + live_integration: marks tests as live integration tests (deselect with '-m "not live_integration"') diff --git a/src/BrighterTrades.py b/src/BrighterTrades.py index 0a5bdfe..1eeafd8 100644 --- a/src/BrighterTrades.py +++ b/src/BrighterTrades.py @@ -564,6 +564,10 @@ class BrighterTrades: mode: str, initial_balance: float = 10000.0, commission: float = 0.001, + exchange_name: str = None, + testnet: bool = True, + max_position_pct: float = 0.5, + circuit_breaker_pct: float = -0.10, ) -> dict: """ Start a strategy in the specified mode (paper or live). @@ -573,17 +577,22 @@ class BrighterTrades: :param mode: Trading mode ('paper' or 'live'). :param initial_balance: Starting balance for paper trading. :param commission: Commission rate. + :param exchange_name: Exchange name for live trading (required for live mode). + :param testnet: Use testnet for live trading (default True for safety). + :param max_position_pct: Maximum position size as % of balance for live trading. + :param circuit_breaker_pct: Drawdown % to halt trading for live trading. :return: Dictionary with success status and details. """ from brokers import TradingMode import uuid + import config # Validate mode if mode not in [TradingMode.PAPER, TradingMode.LIVE]: return {"success": False, "message": f"Invalid mode '{mode}'. Use 'paper' or 'live'."} - # Live mode currently falls back to paper for execution. - effective_mode = TradingMode.PAPER if mode == TradingMode.LIVE else mode + # For live mode, we now use LiveStrategyInstance + effective_mode = mode # Get the strategy data strategy_data = self.strategies.data_cache.get_rows_from_datacache( @@ -653,11 +662,130 @@ class BrighterTrades: except (json.JSONDecodeError, TypeError) as e: return {"success": False, "message": f"Invalid strategy components: {e}"} - # Create unique instance ID - strategy_instance_id = str(uuid.uuid4()) - # Create the strategy instance try: + # For live mode, we need to get the exchange instance FIRST + # (before creating instance ID, to use resolved exchange name) + exchange = None + actual_testnet = testnet + resolved_exchange_name = exchange_name + + if mode == TradingMode.LIVE: + # Get the user's username for exchange lookup + try: + user_name = self.users.get_username(user_id=user_id) + except Exception: + return {"success": False, "message": "Could not resolve username for exchange access."} + + # Determine which exchange to use + if not resolved_exchange_name: + # Try to get the user's active exchange + active_exchanges = self.users.get_exchanges(user_name, category='active_exchanges') + if active_exchanges: + resolved_exchange_name = active_exchanges[0] + else: + return { + "success": False, + "message": "No exchange specified and no active exchange found. Please configure an exchange." + } + + # Determine actual testnet mode (config can override to force testnet) + if config.TESTNET_MODE: + actual_testnet = True + + # Hard production gate using effective mode after config overrides. + if not actual_testnet and not config.ALLOW_LIVE_PRODUCTION: + logger.warning( + f"Production trading blocked: BRIGHTER_ALLOW_LIVE_PROD not set. " + f"User {user_id} attempted production trading." + ) + return { + "success": False, + "message": "Production trading is disabled. Set BRIGHTER_ALLOW_LIVE_PROD=true to enable." + } + + # Get the exchange instance (may not exist yet) + try: + exchange = self.exchanges.get_exchange(ename=resolved_exchange_name, uname=user_name) + except ValueError: + exchange = None # Exchange doesn't exist yet, will be created below + + # CRITICAL: Verify exchange testnet mode matches requested mode + if exchange: + # Use bool() to normalize the comparison (handles mock objects) + exchange_is_testnet = bool(getattr(exchange, 'testnet', False)) + if exchange_is_testnet != actual_testnet: + # Exchange mode mismatch - need to create new exchange with correct mode + logger.warning( + f"Exchange '{resolved_exchange_name}' is in " + f"{'testnet' if exchange_is_testnet else 'production'} mode, " + f"but requested {'testnet' if actual_testnet else 'production'}. " + f"Creating new exchange connection." + ) + # Get API keys and reconnect with correct mode + api_keys = self.users.get_api_keys(user_name, resolved_exchange_name) + self.exchanges.connect_exchange( + exchange_name=resolved_exchange_name, + user_name=user_name, + api_keys=api_keys, + testnet=actual_testnet + ) + exchange = self.exchanges.get_exchange(ename=resolved_exchange_name, uname=user_name) + + # If exchange doesn't exist or isn't configured, try to load API keys from database + if not exchange or not exchange.configured: + logger.info(f"Exchange '{resolved_exchange_name}' not configured, loading API keys from database...") + api_keys = self.users.get_api_keys(user_name, resolved_exchange_name) + if api_keys: + logger.info(f"Found API keys for {resolved_exchange_name}, reconnecting with testnet={actual_testnet}...") + success = self.exchanges.connect_exchange( + exchange_name=resolved_exchange_name, + user_name=user_name, + api_keys=api_keys, + testnet=actual_testnet + ) + if success: + exchange = self.exchanges.get_exchange(ename=resolved_exchange_name, uname=user_name) + logger.info(f"Reconnected exchange: configured={exchange.configured}, testnet={exchange.testnet}") + else: + logger.error(f"Failed to reconnect exchange '{resolved_exchange_name}'") + else: + logger.warning(f"No API keys found in database for {user_name}/{resolved_exchange_name}") + + # Check again after attempting to load keys + if not exchange or not exchange.configured: + return { + "success": False, + "message": f"Exchange '{resolved_exchange_name}' is not configured with valid API keys. " + f"Please configure your API keys in the exchange settings." + } + + # Final verification: exchange mode MUST match requested mode + exchange_is_testnet = bool(getattr(exchange, 'testnet', False)) + if exchange_is_testnet != actual_testnet: + return { + "success": False, + "message": f"Exchange mode mismatch: exchange is {'testnet' if exchange_is_testnet else 'production'}, " + f"but requested {'testnet' if actual_testnet else 'production'}." + } + + # Safety warning for production mode + if not actual_testnet: + logger.warning( + f"Starting LIVE PRODUCTION strategy '{strategy_name}' for user {user_id} " + f"on exchange '{resolved_exchange_name}'. Real money will be used!" + ) + + # Create deterministic instance ID for live mode AFTER exchange resolution + # (enables restart-safe state recovery with correct exchange name) + if mode == TradingMode.LIVE: + # Use resolved exchange name (not 'default') + testnet_suffix = 'testnet' if actual_testnet else 'prod' + strategy_instance_id = f"live:{user_id}:{strategy_id}:{resolved_exchange_name}:{testnet_suffix}" + else: + # Paper mode: random UUID since paper state is ephemeral + strategy_instance_id = str(uuid.uuid4()) + instance = self.strategies.create_strategy_instance( mode=mode, strategy_instance_id=strategy_instance_id, @@ -668,6 +796,10 @@ class BrighterTrades: initial_balance=initial_balance, commission=commission, price_provider=lambda symbol: self.exchanges.get_price(symbol), + exchange=exchange, + testnet=actual_testnet, + max_position_pct=max_position_pct, + circuit_breaker_pct=circuit_breaker_pct, ) # Store the active instance @@ -675,7 +807,7 @@ class BrighterTrades: logger.info(f"Started strategy '{strategy_name}' for user {user_id} in {mode} mode") - return { + result = { "success": True, "message": f"Strategy '{strategy_name}' started in {mode} mode.", "strategy_id": strategy_id, @@ -686,6 +818,19 @@ class BrighterTrades: "initial_balance": initial_balance, } + # Add live-specific info + if mode == TradingMode.LIVE: + result["exchange"] = resolved_exchange_name + result["testnet"] = actual_testnet + result["max_position_pct"] = max_position_pct + result["circuit_breaker_pct"] = circuit_breaker_pct + if actual_testnet: + result["warning"] = "Running in TESTNET mode. No real money at risk." + else: + result["warning"] = "PRODUCTION MODE: Real money is at risk!" + + return result + except Exception as e: logger.error(f"Failed to create strategy instance: {e}", exc_info=True) return {"success": False, "message": f"Failed to start strategy: {str(e)}"} @@ -799,6 +944,12 @@ class BrighterTrades: if hasattr(instance, 'trade_history'): status['trade_count'] = len(instance.trade_history) + # Live-specific status + if hasattr(instance, 'is_testnet'): + status['testnet'] = instance.is_testnet + if hasattr(instance, 'circuit_breaker_status'): + status['circuit_breaker'] = instance.circuit_breaker_status + running_strategies.append(status) return { @@ -853,6 +1004,10 @@ class BrighterTrades: 'message': '' } + # If no API keys provided, try to load from database + if not api_keys: + api_keys = self.users.get_api_keys(user_name, exchange_name) + try: if self.data.get_serialized_datacache(cache_name='exchange_data', filter_vals=([('user', user_name), ('name', exchange_name)])).empty: @@ -1201,11 +1356,23 @@ class BrighterTrades: initial_balance = float(msg_data.get('initial_balance', 10000.0)) commission = float(msg_data.get('commission', 0.001)) + # Live trading specific parameters + exchange_name = msg_data.get('exchange_name') or msg_data.get('exchange') + testnet = msg_data.get('testnet', True) + if isinstance(testnet, str): + testnet = testnet.lower() == 'true' + max_position_pct = float(msg_data.get('max_position_pct', 0.5)) + circuit_breaker_pct = float(msg_data.get('circuit_breaker_pct', -0.10)) + # Validate numeric ranges if initial_balance <= 0: return standard_reply("strategy_run_error", {"message": "Initial balance must be positive."}) if commission < 0 or commission > 1: return standard_reply("strategy_run_error", {"message": "Commission must be between 0 and 1."}) + if not (0 < max_position_pct <= 1): + return standard_reply("strategy_run_error", {"message": "max_position_pct must be between 0 and 1."}) + if circuit_breaker_pct >= 0: + return standard_reply("strategy_run_error", {"message": "circuit_breaker_pct must be negative (e.g., -0.10 for -10%)."}) result = self.start_strategy( user_id=user_id, @@ -1213,12 +1380,13 @@ class BrighterTrades: mode=mode, initial_balance=initial_balance, commission=commission, + exchange_name=exchange_name, + testnet=testnet, + max_position_pct=max_position_pct, + circuit_breaker_pct=circuit_breaker_pct, ) if result.get('success'): - # Add explicit warning if live mode was requested but fell back to paper - if mode == 'live' and result.get('actual_mode') == 'paper': - result['warning'] = "Live trading is not yet implemented. Running in paper trading mode for safety." return standard_reply("strategy_started", result) else: return standard_reply("strategy_run_error", result) diff --git a/src/DataCache_v3.py b/src/DataCache_v3.py index 82f8d9b..fb78a50 100644 --- a/src/DataCache_v3.py +++ b/src/DataCache_v3.py @@ -313,8 +313,8 @@ class TableBasedCache: if key is not None: df_with_metadata['tbl_key'] = key - if getattr(self, 'cache', None) is None: - # If the cache is empty, initialize it with the new DataFrame + if getattr(self, 'cache', None) is None or self.cache.empty: + # If the cache is empty or None, initialize it with the new DataFrame self.cache = df_with_metadata else: # Append the new rows @@ -965,7 +965,9 @@ class DatabaseInteractions(SnapshotDataCache): for _, row in rows.iterrows(): cache.add_entry(key=row['tbl_key'], data=row) else: - cache.add_table(rows, overwrite='tbl_key') + # Only use tbl_key as overwrite if it exists in the rows + overwrite_col = 'tbl_key' if 'tbl_key' in rows.columns else None + cache.add_table(rows, overwrite=overwrite_col) return rows @@ -990,7 +992,9 @@ class DatabaseInteractions(SnapshotDataCache): cache.add_entry(key=key_value, data=rows) else: # For table-based cache, add the entire DataFrame to the cache - cache.add_table(df=rows, overwrite='tbl_key') + # Only use tbl_key as overwrite if it exists in the rows + overwrite_col = 'tbl_key' if 'tbl_key' in rows.columns else None + cache.add_table(df=rows, overwrite=overwrite_col) # Return the fetched rows return rows diff --git a/src/Exchange.py b/src/Exchange.py index bbd2471..33faf9c 100644 --- a/src/Exchange.py +++ b/src/Exchange.py @@ -16,7 +16,8 @@ class Exchange: _market_cache = {} - def __init__(self, name: str, api_keys: Dict[str, str] | None, exchange_id: str): + def __init__(self, name: str, api_keys: Dict[str, str] | None, exchange_id: str, + testnet: bool = False): """ Initializes the Exchange object. @@ -24,12 +25,14 @@ class Exchange: name (str): The name of this exchange instance. api_keys (Dict[str, str]): Dictionary containing 'key' and 'secret' for API authentication. exchange_id (str): The ID of the exchange as recognized by CCXT. Example('binance') + testnet (bool): Whether to use testnet/sandbox mode. Defaults to False. """ self.name = name self.api_key = api_keys['key'] if api_keys else None self.api_key_secret = api_keys['secret'] if api_keys else None self.configured = False self.exchange_id = exchange_id + self.testnet = testnet self.client: ccxt.Exchange = self._connect_exchange() if self.client: self._check_authentication() @@ -51,21 +54,30 @@ class Exchange: logger.error(f"Exchange {self.exchange_id} is not supported by CCXT.") raise ValueError(f"Exchange {self.exchange_id} is not supported by CCXT.") - logger.info(f"Connecting to exchange {self.exchange_id}.") + mode_str = "testnet" if self.testnet else "production" + logger.info(f"Connecting to exchange {self.exchange_id} ({mode_str} mode).") + + config = { + 'enableRateLimit': True, + 'verbose': False, + 'options': {'warnOnFetchOpenOrdersWithoutSymbol': False} + } + if self.api_key and self.api_key_secret: - return exchange_class({ - 'apiKey': self.api_key, - 'secret': self.api_key_secret, - 'enableRateLimit': True, - 'verbose': False, - 'options': {'warnOnFetchOpenOrdersWithoutSymbol': False} - }) - else: - return exchange_class({ - 'enableRateLimit': True, - 'verbose': False, - 'options': {'warnOnFetchOpenOrdersWithoutSymbol': False} - }) + config['apiKey'] = self.api_key + config['secret'] = self.api_key_secret + + client = exchange_class(config) + + # Enable sandbox/testnet mode if requested + if self.testnet: + try: + client.set_sandbox_mode(True) + logger.info(f"Sandbox mode enabled for {self.exchange_id}") + except Exception as e: + logger.warning(f"Could not enable sandbox mode for {self.exchange_id}: {e}") + + return client def _check_authentication(self): if not (self.api_key and self.api_key_secret): @@ -402,7 +414,8 @@ class Exchange: return None def place_order(self, symbol: str, side: str, type: str, timeInForce: str, - quantity: float, price: float = None) -> Tuple[str, object]: + quantity: float, price: float = None, + client_order_id: str = None) -> Tuple[str, object]: """ Places an order on the exchange. @@ -413,12 +426,14 @@ class Exchange: timeInForce (str): The time-in-force policy ('GTC', 'IOC', etc.). quantity (float): The quantity of the order. price (float, optional): The price of the order for limit orders. + client_order_id (str, optional): Client-provided order ID for idempotency. Returns: Tuple[str, object]: A tuple containing the result ('Success' or 'Failure') and the order details or None. """ result, msg = self._place_order(symbol=symbol, side=side, type=type, - timeInForce=timeInForce, quantity=quantity, price=price) + timeInForce=timeInForce, quantity=quantity, price=price, + client_order_id=client_order_id) return result, msg def _set_avail_intervals(self) -> Tuple[str, ...]: @@ -441,8 +456,8 @@ class Exchange: precision = market_data['precision']['amount'] self.symbols_n_precision[symbol] = precision - def _place_order(self, symbol: str, side: str, type: str, timeInForce: str, quantity: float, price: float = None) -> \ - Tuple[str, object]: + def _place_order(self, symbol: str, side: str, type: str, timeInForce: str, quantity: float, + price: float = None, client_order_id: str = None) -> Tuple[str, object]: """ Places an order on the exchange. @@ -453,6 +468,7 @@ class Exchange: timeInForce (str): The time-in-force policy ('GTC', 'IOC', etc.). quantity (float): The quantity of the order. price (float, optional): The price of the order for limit orders. + client_order_id (str, optional): Client-provided order ID for idempotency. Returns: Tuple[str, object]: A tuple containing the result ('Success' or 'Failure') and the order details or None. @@ -471,14 +487,22 @@ class Exchange: 'type': type, 'side': side, 'amount': quantity, - 'params': { - 'timeInForce': timeInForce - } + 'params': {} } + # Only include timeInForce for non-market orders (Binance rejects it for market orders) + if type != 'market' and timeInForce: + order_params['params']['timeInForce'] = timeInForce + if price is not None: order_params['price'] = price + # Add client order ID for idempotency (exchange-specific param names) + if client_order_id: + # newClientOrderId for Binance, clientOrderId for many others + order_params['params']['newClientOrderId'] = client_order_id + order_params['params']['clientOrderId'] = client_order_id + try: order = self.client.create_order(**order_params) return 'Success', order @@ -517,7 +541,7 @@ class Exchange: Returns a list of open orders. Returns: - List[Dict[str, Union[str, float]]]: A list of open orders with symbol, side, quantity, and price. + List[Dict[str, Union[str, float]]]: A list of open orders with id, symbol, side, type, quantity, price, status. """ if self.api_key and self.api_key_secret: try: @@ -525,10 +549,17 @@ class Exchange: formatted_orders = [] for order in open_orders: open_order = { + 'id': order.get('id'), # Exchange order ID - critical for reconciliation + 'clientOrderId': order.get('clientOrderId'), # Client order ID if available 'symbol': order['symbol'], 'side': order['side'], + 'type': order.get('type', 'limit'), 'quantity': order['amount'], - 'price': order['price'] + 'price': order.get('price'), + 'status': order.get('status', 'open'), + 'filled': order.get('filled', 0), + 'remaining': order.get('remaining', order['amount']), + 'timestamp': order.get('timestamp'), } formatted_orders.append(open_order) return formatted_orders diff --git a/src/ExchangeInterface.py b/src/ExchangeInterface.py index 69eb908..4b49546 100644 --- a/src/ExchangeInterface.py +++ b/src/ExchangeInterface.py @@ -101,17 +101,30 @@ class ExchangeInterface: return public_list - def connect_exchange(self, exchange_name: str, user_name: str, api_keys: Dict[str, str] = None) -> bool: + def connect_exchange(self, exchange_name: str, user_name: str, api_keys: Dict[str, str] = None, + testnet: bool = False) -> bool: """ Initialize and store a reference to the specified exchange. :param exchange_name: The name of the exchange. :param user_name: The name of the user connecting the exchange. :param api_keys: Optional API keys for the exchange. + :param testnet: Whether to use testnet/sandbox mode. :return: True if successful, False otherwise. """ try: - exchange = Exchange(name=exchange_name, api_keys=api_keys, exchange_id=exchange_name.lower()) + # Remove any existing exchange entry to prevent duplicates + # (get_exchange returns first match, so duplicates cause issues) + try: + self.cache_manager.remove_row_from_datacache( + cache_name='exchange_data', + filter_vals=[('user', user_name), ('name', exchange_name)] + ) + except Exception: + pass # No existing entry to remove, that's fine + + exchange = Exchange(name=exchange_name, api_keys=api_keys, exchange_id=exchange_name.lower(), + testnet=testnet) self.add_exchange(user_name, exchange) return True except Exception as e: diff --git a/src/PythonGenerator.py b/src/PythonGenerator.py index 3ca552c..e8c4722 100644 --- a/src/PythonGenerator.py +++ b/src/PythonGenerator.py @@ -765,8 +765,9 @@ class PythonGenerator: if not option: return 'None' - # Precompile the regex pattern for market symbols (e.g., 'BTC/USD') - market_symbol_pattern = re.compile(r'^[A-Z]{3}/[A-Z]{3}$') + # Precompile the regex pattern for market symbols (e.g., 'BTC/USD', 'BTC/USDT', 'ETH/BTC') + # Matches 2-5 uppercase letters, a slash, and 2-5 more uppercase letters + market_symbol_pattern = re.compile(r'^[A-Z]{2,5}/[A-Z]{2,5}$') def is_market_symbol(value: str) -> bool: """ diff --git a/src/Strategies.py b/src/Strategies.py index c5bafb9..5920490 100644 --- a/src/Strategies.py +++ b/src/Strategies.py @@ -151,6 +151,10 @@ class Strategies: commission: float = 0.001, slippage: float = 0.0, price_provider: Any = None, + exchange: Any = None, + testnet: bool = True, + max_position_pct: float = 0.5, + circuit_breaker_pct: float = -0.10, ) -> StrategyInstance: """ Factory method to create the appropriate strategy instance based on mode. @@ -165,6 +169,10 @@ class Strategies: :param commission: Commission rate. :param slippage: Slippage rate. :param price_provider: Callable for getting current prices. + :param exchange: Exchange instance for live trading. + :param testnet: Use testnet for live trading (default True for safety). + :param max_position_pct: Maximum position size as % of balance for live trading. + :param circuit_breaker_pct: Drawdown % to halt trading for live trading. :return: Strategy instance appropriate for the mode. """ mode = mode.lower() @@ -200,10 +208,27 @@ class Strategies: ) elif mode == TradingMode.LIVE: - # Live trading not yet implemented - fall back to paper for safety - logger.warning("Live trading mode not yet implemented. Using paper trading instead.") - from paper_strategy_instance import PaperStrategyInstance - return PaperStrategyInstance( + if exchange is None: + raise ValueError( + "Live trading requires an exchange instance. " + "Please configure exchange credentials first." + ) + + from live_strategy_instance import LiveStrategyInstance + + # Safety warning for production mode + if not testnet: + logger.warning( + f"Creating LiveStrategyInstance for PRODUCTION trading. " + f"Strategy: {strategy_name}, User: {user_id}" + ) + else: + logger.info( + f"Creating LiveStrategyInstance in TESTNET mode. " + f"Strategy: {strategy_name}, User: {user_id}" + ) + + return LiveStrategyInstance( strategy_instance_id=strategy_instance_id, strategy_id=strategy_id, strategy_name=strategy_name, @@ -212,10 +237,13 @@ class Strategies: data_cache=self.data_cache, indicators=self.indicators_manager, trades=self.trades, + exchange=exchange, + testnet=testnet, initial_balance=initial_balance, commission=commission, slippage=slippage, - price_provider=price_provider, + max_position_pct=max_position_pct, + circuit_breaker_pct=circuit_breaker_pct, ) else: @@ -528,6 +556,10 @@ class Strategies: commission: float = 0.001, slippage: float = 0.0, price_provider: Any = None, + exchange: Any = None, + testnet: bool = True, + max_position_pct: float = 0.5, + circuit_breaker_pct: float = -0.10, ) -> dict[str, Any]: """ Executes a strategy based on the provided strategy data. @@ -538,6 +570,10 @@ class Strategies: :param commission: Commission rate. :param slippage: Slippage rate for market orders. :param price_provider: Callable for getting current prices (paper/live). + :param exchange: Exchange instance for live trading. + :param testnet: Use testnet for live trading (default True for safety). + :param max_position_pct: Maximum position size as % of balance for live trading. + :param circuit_breaker_pct: Drawdown % to halt trading for live trading. :return: A dictionary indicating success or failure with relevant messages. """ try: @@ -571,6 +607,10 @@ class Strategies: commission=commission, slippage=slippage, price_provider=price_provider, + exchange=exchange, + testnet=testnet, + max_position_pct=max_position_pct, + circuit_breaker_pct=circuit_breaker_pct, ) # Store in active_instances diff --git a/src/app.py b/src/app.py index b5e6dc1..b4402f0 100644 --- a/src/app.py +++ b/src/app.py @@ -57,6 +57,106 @@ def add_cors_headers(response): return response +# ============================================================================= +# Strategy Execution Loop (Background Task) +# ============================================================================= +# This runs in the background and periodically executes active live/paper strategies + +STRATEGY_LOOP_INTERVAL = 10 # seconds between strategy ticks +_strategy_loop_running = False + +def strategy_execution_loop(): + """ + Background loop that executes active strategies periodically. + + For live trading, strategies need to be triggered server-side, + not just when the client sends candle data. + """ + global _strategy_loop_running + _strategy_loop_running = True + logger = logging.getLogger('strategy_loop') + logger.info("Strategy execution loop started") + + while _strategy_loop_running: + try: + # Check if there are any active strategy instances + active_count = brighter_trades.strategies.get_active_count() + + # Log every iteration for debugging + if not hasattr(strategy_execution_loop, '_iter_count'): + strategy_execution_loop._iter_count = 0 + strategy_execution_loop._iter_count += 1 + logger.info(f"Execution loop iteration {strategy_execution_loop._iter_count}, active_count={active_count}") + + if active_count > 0: + logger.info(f"Executing {active_count} active strategies...") + + # Iterate directly over active instances + instance_keys = list(brighter_trades.strategies.active_instances.keys()) + + for instance_key in instance_keys: + try: + user_id, strategy_id, mode = instance_key + instance = brighter_trades.strategies.active_instances.get(instance_key) + + if instance is None: + continue + + # For live strategies, get current price as candle data + # Default to BTC/USDT if no symbol specified + symbol = getattr(instance, 'symbol', 'BTC/USDT') + + try: + price = brighter_trades.exchanges.get_price(symbol) + if price: + import time + candle_data = { + 'symbol': symbol, + 'close': price, + 'open': price, + 'high': price, + 'low': price, + 'volume': 0, + 'time': int(time.time()) + } + + # Execute strategy tick + events = instance.tick(candle_data) + + if events: + logger.info(f"Strategy {strategy_id} generated {len(events)} events: {events}") + # Emit events to the user's room + user_name = brighter_trades.users.get_username(user_id=user_id) + if user_name: + socketio.emit('strategy_events', { + 'strategy_id': strategy_id, + 'mode': mode, + 'events': events + }, room=user_name) + + except Exception as e: + logger.warning(f"Could not get price for {symbol}: {e}") + + except Exception as e: + logger.error(f"Error executing strategy {instance_key}: {e}", exc_info=True) + + except Exception as e: + logger.error(f"Strategy execution loop error: {e}") + + # Sleep before next iteration + eventlet.sleep(STRATEGY_LOOP_INTERVAL) + + logger.info("Strategy execution loop stopped") + + +def start_strategy_loop(): + """Start the strategy execution loop in a background greenlet.""" + eventlet.spawn(strategy_execution_loop) + + +# Start the loop when the app starts (will be called from main block) + + def _coerce_user_id(user_id): if user_id is None or user_id == '': return None @@ -391,4 +491,8 @@ def indicator_init(): if __name__ == '__main__': + # Start the strategy execution loop in the background + start_strategy_loop() + logging.info("Strategy execution loop started in background") + socketio.run(app, host='127.0.0.1', port=5002, debug=False, use_reloader=False) diff --git a/src/brokers/factory.py b/src/brokers/factory.py index 481ebe6..4d392b8 100644 --- a/src/brokers/factory.py +++ b/src/brokers/factory.py @@ -10,6 +10,7 @@ from typing import Any, Optional, Callable from .base_broker import BaseBroker from .backtest_broker import BacktestBroker from .paper_broker import PaperBroker +from .live_broker import LiveBroker logger = logging.getLogger(__name__) @@ -28,9 +29,9 @@ def create_broker( slippage: float = 0.0, price_provider: Optional[Callable[[str], float]] = None, data_cache: Any = None, - exchange_interface: Any = None, - user_name: str = None, - exchange_name: str = None, + exchange: Any = None, + testnet: bool = True, + rate_limit: float = 2.0, **kwargs ) -> BaseBroker: """ @@ -42,9 +43,9 @@ def create_broker( :param slippage: Slippage rate. :param price_provider: Callable for getting current prices (paper/live). :param data_cache: DataCache instance for persistence. - :param exchange_interface: ExchangeInterface for live trading. - :param user_name: User name for live trading. - :param exchange_name: Exchange name for live trading. + :param exchange: Exchange instance for live trading. + :param testnet: Use testnet for live trading (default True for safety). + :param rate_limit: API calls per second limit for live trading. :param kwargs: Additional arguments passed to broker constructor. :return: Broker instance. """ @@ -71,17 +72,30 @@ def create_broker( ) elif mode == TradingMode.LIVE: - # Live broker not yet implemented - fall back to paper trading with warning - logger.warning( - "Live trading broker not yet implemented. " - "Falling back to paper trading for safety." - ) - return PaperBroker( + # Verify exchange is provided for live trading + if exchange is None: + raise ValueError( + "Live trading requires an exchange instance. " + "Please provide the 'exchange' parameter." + ) + + # Safety warning for production mode + if not testnet: + logger.warning( + "Creating LiveBroker for PRODUCTION trading. " + "Real money will be used!" + ) + else: + logger.info("Creating LiveBroker in TESTNET mode") + + return LiveBroker( + exchange=exchange, + testnet=testnet, initial_balance=initial_balance, commission=commission, - slippage=slippage if slippage > 0 else 0.0005, - price_provider=price_provider, + slippage=slippage, data_cache=data_cache, + rate_limit=rate_limit, **kwargs ) @@ -93,4 +107,4 @@ def create_broker( def get_available_modes() -> list: """Get list of available trading modes.""" - return [TradingMode.BACKTEST, TradingMode.PAPER] + return [TradingMode.BACKTEST, TradingMode.PAPER, TradingMode.LIVE] diff --git a/src/brokers/live_broker.py b/src/brokers/live_broker.py index 426e7f0..50a4a13 100644 --- a/src/brokers/live_broker.py +++ b/src/brokers/live_broker.py @@ -3,13 +3,19 @@ Live Trading Broker Implementation for BrighterTrading. Executes real trades via exchange APIs (CCXT). -NOTE: This is a stub implementation. Full live trading -requires careful testing with testnet before production use. +WARNING: This broker executes real trades with real money when not in testnet mode. +Ensure thorough testing on testnet before production use. """ import logging +import time +import json +from functools import wraps from typing import Any, Dict, List, Optional +from datetime import datetime, timezone, timedelta import uuid +import hashlib +import ccxt from .base_broker import ( BaseBroker, OrderResult, OrderSide, OrderType, OrderStatus, Position @@ -18,47 +24,189 @@ from .base_broker import ( logger = logging.getLogger(__name__) +class RateLimiter: + """ + Simple rate limiter to prevent API throttling. + + Usage: + limiter = RateLimiter(calls_per_second=2.0) + limiter.wait() # Call before each API request + """ + + def __init__(self, calls_per_second: float = 2.0): + """ + Initialize the rate limiter. + + :param calls_per_second: Maximum API calls per second. + """ + self.min_interval = 1.0 / calls_per_second + self.last_call = 0.0 + + def wait(self): + """Wait if necessary to respect rate limits.""" + elapsed = time.time() - self.last_call + if elapsed < self.min_interval: + time.sleep(self.min_interval - elapsed) + self.last_call = time.time() + + +def retry_on_network_error(max_retries: int = 3, delay: float = 1.0): + """ + Decorator to retry a function on network errors. + + :param max_retries: Maximum number of retry attempts. + :param delay: Base delay between retries (exponential backoff). + """ + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + last_exception = None + for attempt in range(max_retries): + try: + return func(*args, **kwargs) + except ccxt.NetworkError as e: + last_exception = e + if attempt == max_retries - 1: + logger.error(f"Network error after {max_retries} attempts: {e}") + raise + wait_time = delay * (2 ** attempt) + logger.warning(f"Network error, retrying in {wait_time}s: {e}") + time.sleep(wait_time) + raise last_exception + return wrapper + return decorator + + +class LiveOrder: + """Represents a live trading order.""" + + def __init__( + self, + order_id: str, + exchange_order_id: Optional[str], + symbol: str, + side: OrderSide, + order_type: OrderType, + size: float, + price: Optional[float] = None, + stop_loss: Optional[float] = None, + take_profit: Optional[float] = None, + time_in_force: str = 'GTC', + client_order_id: Optional[str] = None + ): + self.order_id = order_id + self.exchange_order_id = exchange_order_id + self.client_order_id = client_order_id + self.symbol = symbol + self.side = side + self.order_type = order_type + self.size = size + self.price = price + self.stop_loss = stop_loss + self.take_profit = take_profit + self.time_in_force = time_in_force + self.status = OrderStatus.PENDING + self.filled_qty = 0.0 + self.filled_price = 0.0 + self.commission = 0.0 + self.created_at = datetime.now(timezone.utc) + self.filled_at: Optional[datetime] = None + self.last_update: Optional[datetime] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for persistence.""" + return { + 'order_id': self.order_id, + 'exchange_order_id': self.exchange_order_id, + 'client_order_id': self.client_order_id, + 'symbol': self.symbol, + 'side': self.side.value, + 'order_type': self.order_type.value, + 'size': self.size, + 'price': self.price, + 'stop_loss': self.stop_loss, + 'take_profit': self.take_profit, + 'time_in_force': self.time_in_force, + 'status': self.status.value, + 'filled_qty': self.filled_qty, + 'filled_price': self.filled_price, + 'commission': self.commission, + 'created_at': self.created_at.isoformat(), + 'filled_at': self.filled_at.isoformat() if self.filled_at else None, + 'last_update': self.last_update.isoformat() if self.last_update else None + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'LiveOrder': + """Create LiveOrder from dictionary.""" + order = cls( + order_id=data['order_id'], + exchange_order_id=data.get('exchange_order_id'), + symbol=data['symbol'], + side=OrderSide(data['side']), + order_type=OrderType(data['order_type']), + size=data['size'], + price=data.get('price'), + stop_loss=data.get('stop_loss'), + take_profit=data.get('take_profit'), + time_in_force=data.get('time_in_force', 'GTC'), + client_order_id=data.get('client_order_id') + ) + order.status = OrderStatus(data['status']) + order.filled_qty = data.get('filled_qty', 0.0) + order.filled_price = data.get('filled_price', 0.0) + order.commission = data.get('commission', 0.0) + if data.get('created_at'): + order.created_at = datetime.fromisoformat(data['created_at']) + if data.get('filled_at'): + order.filled_at = datetime.fromisoformat(data['filled_at']) + if data.get('last_update'): + order.last_update = datetime.fromisoformat(data['last_update']) + return order + + class LiveBroker(BaseBroker): """ Live trading broker that executes real trades via CCXT. - WARNING: This broker executes real trades with real money. + WARNING: This broker executes real trades with real money when not in testnet mode. Ensure thorough testing on testnet before production use. - Features (to be implemented): + Features: - Real order execution via exchange APIs - Balance and position sync from exchange - Order lifecycle management (create, cancel, fill detection) - Restart-safe open order reconciliation + - Rate limiting and retry logic for network errors """ def __init__( self, - exchange_interface: Any = None, - user_name: str = None, - exchange_name: str = 'binance', + exchange: Any = None, testnet: bool = True, - initial_balance: float = 0.0, # Will be synced from exchange + initial_balance: float = 0.0, commission: float = 0.001, - slippage: float = 0.0 + slippage: float = 0.0, + data_cache: Any = None, + rate_limit: float = 2.0 ): """ Initialize the LiveBroker. - :param exchange_interface: ExchangeInterface instance for API access. - :param user_name: User name for exchange credentials. - :param exchange_name: Exchange to use (e.g., 'binance', 'alpaca'). + :param exchange: Exchange instance (from Exchange.py) for API access. :param testnet: Use testnet (default True for safety). - :param initial_balance: Starting balance (synced from exchange). + :param initial_balance: Starting balance (will be synced from exchange). :param commission: Commission rate. :param slippage: Slippage rate. + :param data_cache: DataCache instance for state persistence. + :param rate_limit: API calls per second limit. """ super().__init__(initial_balance, commission, slippage) - self._exchange_interface = exchange_interface - self._user_name = user_name - self._exchange_name = exchange_name + self._exchange = exchange self._testnet = testnet + self._data_cache = data_cache + self._rate_limiter = RateLimiter(calls_per_second=rate_limit) # Warn if not using testnet if not testnet: @@ -66,11 +214,33 @@ class LiveBroker(BaseBroker): "LiveBroker initialized for PRODUCTION trading. " "Real money will be used!" ) + else: + logger.info("LiveBroker initialized in TESTNET mode.") - # Placeholder for exchange connection - self._exchange = None + # Connection state self._connected = False + # Balance tracking - keyed by asset symbol + self._balances: Dict[str, float] = {} + self._locked_balances: Dict[str, float] = {} + + # Orders and positions + self._orders: Dict[str, LiveOrder] = {} + self._positions: Dict[str, Position] = {} + + # Price cache with expiration + self._current_prices: Dict[str, float] = {} + self._price_timestamps: Dict[str, datetime] = {} + self._price_cache_ttl_seconds: float = 5.0 # Cache expires after 5 seconds + # Auto-generated client IDs are reused briefly to make retries idempotent. + self._auto_client_id_window_seconds: float = 5.0 + self._auto_client_ids: Dict[str, tuple[str, datetime]] = {} + + # Last sync timestamps + self._last_balance_sync: Optional[datetime] = None + self._last_position_sync: Optional[datetime] = None + self._last_order_sync: Optional[datetime] = None + def _ensure_connected(self): """Ensure exchange connection is established.""" if not self._connected: @@ -78,45 +248,142 @@ class LiveBroker(BaseBroker): "LiveBroker not connected to exchange. " "Call connect() first." ) + if not self._exchange: + raise RuntimeError( + "LiveBroker has no exchange configured." + ) + @retry_on_network_error() def connect(self) -> bool: """ - Connect to the exchange. + Connect to the exchange and sync initial state. :return: True if connection successful. """ - raise NotImplementedError( - "LiveBroker.connect() not yet implemented. " - "Live trading requires exchange API integration." - ) + if not self._exchange: + logger.error("Cannot connect: no exchange configured") + return False + + try: + logger.info("LiveBroker connecting to exchange...") + + # Verify exchange is configured with API keys + if not self._exchange.configured: + logger.error("Exchange is not configured with valid API keys") + return False + + # Sync balance from exchange + self.sync_balance() + + # Sync open orders + self.sync_open_orders() + + # Sync positions + self.sync_positions() + + self._connected = True + logger.info("LiveBroker connected successfully") + return True + + except ccxt.AuthenticationError as e: + logger.error(f"Authentication failed: {e}") + return False + except ccxt.BaseError as e: + logger.error(f"Exchange error during connect: {e}") + return False def disconnect(self): """Disconnect from the exchange.""" self._connected = False - self._exchange = None + logger.info("LiveBroker disconnected") - def sync_balance(self) -> float: + @retry_on_network_error() + def sync_balance(self) -> Dict[str, float]: """ Sync balance from exchange. - :return: Current balance from exchange. + :return: Dict of asset balances. """ - raise NotImplementedError( - "LiveBroker.sync_balance() not yet implemented. " - "Balance sync requires exchange API integration." - ) + if not self._exchange: + return {} + self._rate_limiter.wait() + + try: + balance_data = self._exchange.client.fetch_balance() + + # Update total balances + self._balances.clear() + self._locked_balances.clear() + + for asset, data in balance_data.items(): + if isinstance(data, dict): + total = float(data.get('total', 0) or 0) + free = float(data.get('free', 0) or 0) + used = float(data.get('used', 0) or 0) + + if total > 0: + self._balances[asset] = total + self._locked_balances[asset] = used + + self._last_balance_sync = datetime.now(timezone.utc) + logger.debug(f"Balance synced: {len(self._balances)} assets") + return self._balances.copy() + + except ccxt.BaseError as e: + logger.error(f"Error syncing balance: {e}") + return {} + + @retry_on_network_error() def sync_positions(self) -> List[Position]: """ Sync positions from exchange. :return: List of current positions. """ - raise NotImplementedError( - "LiveBroker.sync_positions() not yet implemented. " - "Position sync requires exchange API integration." - ) + if not self._exchange: + return [] + self._rate_limiter.wait() + + try: + # Get active trades/positions from exchange + trades = self._exchange.get_active_trades() + + self._positions.clear() + for trade in trades: + symbol = trade['symbol'] + size = float(trade['quantity']) + entry_price = float(trade['price']) + + # Get current price for P&L calculation + current_price = self.get_current_price(symbol) + if current_price <= 0: + current_price = entry_price + + side = trade.get('side', 'buy') + if side == 'sell': + size = -size + + unrealized_pnl = (current_price - entry_price) * size + + self._positions[symbol] = Position( + symbol=symbol, + size=size, + entry_price=entry_price, + current_price=current_price, + unrealized_pnl=unrealized_pnl + ) + + self._last_position_sync = datetime.now(timezone.utc) + logger.debug(f"Positions synced: {len(self._positions)} positions") + return list(self._positions.values()) + + except ccxt.BaseError as e: + logger.error(f"Error syncing positions: {e}") + return [] + + @retry_on_network_error() def sync_open_orders(self) -> List[Dict[str, Any]]: """ Sync open orders from exchange. @@ -125,10 +392,151 @@ class LiveBroker(BaseBroker): :return: List of open orders from exchange. """ - raise NotImplementedError( - "LiveBroker.sync_open_orders() not yet implemented. " - "Order sync requires exchange API integration." + if not self._exchange: + return [] + + self._rate_limiter.wait() + + try: + exchange_orders = self._exchange.get_open_orders() + + # Build set of existing exchange order IDs to avoid duplicates + existing_exchange_ids = { + order.exchange_order_id + for order in self._orders.values() + if order.exchange_order_id + } + + synced_orders = [] + for ex_order in exchange_orders: + exchange_order_id = ex_order.get('id') + + # Skip orders without ID (shouldn't happen with fixed Exchange.get_open_orders) + if not exchange_order_id: + logger.warning(f"Skipping order without ID: {ex_order}") + continue + + exchange_order_id = str(exchange_order_id) + + # Skip if we already track this exchange order + if exchange_order_id in existing_exchange_ids: + synced_orders.append(ex_order) + continue + + # Create local order tracking + symbol = ex_order['symbol'] + side = OrderSide.BUY if ex_order['side'].lower() == 'buy' else OrderSide.SELL + order_type = OrderType.LIMIT if ex_order.get('type', 'limit').lower() == 'limit' else OrderType.MARKET + size = float(ex_order['quantity']) + price = float(ex_order.get('price', 0) or 0) + + # Use local UUID for internal tracking, but key by exchange ID + local_order_id = str(uuid.uuid4())[:8] + + order = LiveOrder( + order_id=local_order_id, + exchange_order_id=exchange_order_id, + symbol=symbol, + side=side, + order_type=order_type, + size=size, + price=price if price > 0 else None, + client_order_id=ex_order.get('clientOrderId') + ) + order.status = OrderStatus.OPEN + order.filled_qty = float(ex_order.get('filled', 0)) + + self._orders[local_order_id] = order + existing_exchange_ids.add(exchange_order_id) + logger.info(f"Synced open order from exchange: {exchange_order_id} -> {local_order_id}") + + synced_orders.append(ex_order) + + self._last_order_sync = datetime.now(timezone.utc) + logger.debug(f"Orders synced: {len(synced_orders)} open orders") + return synced_orders + + except ccxt.BaseError as e: + logger.error(f"Error syncing open orders: {e}") + return [] + + def _generate_client_order_id( + self, + symbol: str, + side: OrderSide, + order_type: OrderType, + size: float, + price: Optional[float], + nonce: str = None + ) -> str: + """ + Generate a deterministic client order ID for idempotency. + + This ensures that retried orders can be deduplicated by the exchange + if they support clientOrderId. + + IMPORTANT: The nonce MUST be provided by the caller and remain constant + across retries for true idempotency. If not provided, a warning is logged + and a timestamp-based fallback is used (which defeats idempotency). + + :param nonce: Unique nonce for this order placement attempt. MUST be constant across retries. + :return: Client order ID string. + """ + if nonce is None: + # WARNING: Without a stable nonce, retries will generate different IDs! + logger.warning("No nonce provided for client order ID - idempotency not guaranteed") + nonce = str(int(time.time() * 1000000)) + + # Create a hash of order parameters for idempotency + # The hash ensures same params + same nonce = same clientOrderId + order_data = f"{symbol}:{side.value}:{order_type.value}:{size}:{price}:{nonce}" + order_hash = hashlib.sha256(order_data.encode()).hexdigest()[:16] + return f"BT{order_hash}" + + def _get_or_create_auto_client_order_id( + self, + symbol: str, + side: OrderSide, + order_type: OrderType, + size: float, + price: Optional[float] + ) -> str: + """ + Create/reuse an auto client order ID for a short retry window. + + This keeps retries idempotent when callers do not provide a client_order_id. + """ + now = datetime.now(timezone.utc) + size_token = f"{float(size):.12g}" + price_token = "market" if price is None else f"{float(price):.12g}" + intent_key = f"{symbol}:{side.value}:{order_type.value}:{size_token}:{price_token}" + + # Drop stale intent keys. + stale_keys = [ + key for key, (_, ts) in self._auto_client_ids.items() + if (now - ts).total_seconds() > self._auto_client_id_window_seconds + ] + for key in stale_keys: + self._auto_client_ids.pop(key, None) + + cached = self._auto_client_ids.get(intent_key) + if cached is not None: + cached_id, cached_ts = cached + if (now - cached_ts).total_seconds() <= self._auto_client_id_window_seconds: + return cached_id + + # Fresh ID for this intent. A UUID salt prevents long-lived collisions. + nonce = f"{intent_key}:{uuid.uuid4().hex[:12]}" + client_id = self._generate_client_order_id( + symbol=symbol, + side=side, + order_type=order_type, + size=size, + price=price, + nonce=nonce ) + self._auto_client_ids[intent_key] = (client_id, now) + return client_id def place_order( self, @@ -139,101 +547,717 @@ class LiveBroker(BaseBroker): price: Optional[float] = None, stop_loss: Optional[float] = None, take_profit: Optional[float] = None, - time_in_force: str = 'GTC' + time_in_force: str = 'GTC', + client_order_id: Optional[str] = None ) -> OrderResult: - """Place an order on the exchange.""" + """ + Place an order on the exchange. + + NOTE: This method is NOT wrapped with @retry_on_network_error because + retrying order placement can cause duplicate orders. Instead, we use + clientOrderId for idempotency when supported by the exchange. + """ self._ensure_connected() - raise NotImplementedError( - "LiveBroker.place_order() not yet implemented. " - "Order placement requires exchange API integration." - ) + # Generate local order ID + order_id = str(uuid.uuid4())[:8] + # Generate/reuse client order ID for idempotency. + if client_order_id is None: + client_order_id = self._get_or_create_auto_client_order_id( + symbol, side, order_type, size, price + ) + + # Check if we already have an order with this client ID (duplicate detection) + for existing_order in self._orders.values(): + if hasattr(existing_order, 'client_order_id') and existing_order.client_order_id == client_order_id: + logger.warning(f"Duplicate order detected: {client_order_id}") + return OrderResult( + success=True, + order_id=existing_order.order_id, + status=existing_order.status, + filled_qty=existing_order.filled_qty, + filled_price=existing_order.filled_price, + commission=existing_order.commission, + message="Order already exists (duplicate detection)" + ) + + # Validate order + if size <= 0: + return OrderResult( + success=False, + message="Order size must be positive" + ) + + if order_type == OrderType.LIMIT and (price is None or price <= 0): + return OrderResult( + success=False, + message="Limit orders require a valid price" + ) + + self._rate_limiter.wait() + + try: + # Map order type to exchange format + ex_order_type = 'market' if order_type == OrderType.MARKET else 'limit' + ex_side = side.value # 'buy' or 'sell' + + logger.info(f"Placing {ex_order_type} order: {ex_side} {size} {symbol} @ {price or 'market'} (clientId: {client_order_id})") + + # Place order via exchange with client order ID for idempotency + # Note: Not all exchanges support clientOrderId, so we pass it as optional param + result, exchange_order = self._exchange.place_order( + symbol=symbol, + side=ex_side, + type=ex_order_type, + timeInForce=time_in_force, + quantity=size, + price=price, + client_order_id=client_order_id + ) + + if result != 'Success' or exchange_order is None: + logger.error(f"Order placement failed: {result}") + return OrderResult( + success=False, + message=f"Order placement failed: {result}" + ) + + # Create local order record + order = LiveOrder( + order_id=order_id, + exchange_order_id=str(exchange_order.get('id', '')), + symbol=symbol, + side=side, + order_type=order_type, + size=size, + price=price, + stop_loss=stop_loss, + take_profit=take_profit, + time_in_force=time_in_force, + client_order_id=client_order_id + ) + + # Parse order status from exchange response + ex_status = exchange_order.get('status', 'open').lower() + if ex_status == 'closed' or ex_status == 'filled': + order.status = OrderStatus.FILLED + order.filled_qty = float(exchange_order.get('filled', size)) + order.filled_price = float(exchange_order.get('average', price or 0)) + order.filled_at = datetime.now(timezone.utc) + + # Calculate commission + fee = exchange_order.get('fee', {}) + if fee: + order.commission = float(fee.get('cost', 0) or 0) + else: + order.commission = order.filled_qty * order.filled_price * self.commission + else: + order.status = OrderStatus.OPEN + + self._orders[order_id] = order + + logger.info(f"Order placed: {order_id} (exchange: {order.exchange_order_id}) - {order.status.value}") + + return OrderResult( + success=True, + order_id=order_id, + status=order.status, + filled_qty=order.filled_qty, + filled_price=order.filled_price, + commission=order.commission, + message=f"Order {order_id} placed successfully" + ) + + except ccxt.InsufficientFunds as e: + logger.error(f"Insufficient funds: {e}") + return OrderResult( + success=False, + message=f"Insufficient funds: {e}" + ) + except ccxt.InvalidOrder as e: + logger.error(f"Invalid order: {e}") + return OrderResult( + success=False, + message=f"Invalid order: {e}" + ) + except ccxt.BaseError as e: + logger.error(f"Exchange error placing order: {e}") + return OrderResult( + success=False, + message=f"Exchange error: {e}" + ) + + @retry_on_network_error() def cancel_order(self, order_id: str) -> bool: """Cancel an order on the exchange.""" self._ensure_connected() - raise NotImplementedError( - "LiveBroker.cancel_order() not yet implemented. " - "Order cancellation requires exchange API integration." - ) + if order_id not in self._orders: + logger.warning(f"Order {order_id} not found") + return False + order = self._orders[order_id] + + if order.status not in [OrderStatus.OPEN, OrderStatus.PENDING]: + logger.warning(f"Cannot cancel order {order_id}: status is {order.status.value}") + return False + + self._rate_limiter.wait() + + try: + # Cancel on exchange + self._exchange.client.cancel_order( + order.exchange_order_id, + order.symbol + ) + + order.status = OrderStatus.CANCELLED + order.last_update = datetime.now(timezone.utc) + + logger.info(f"Order {order_id} cancelled") + return True + + except ccxt.OrderNotFound: + logger.warning(f"Order {order_id} not found on exchange (may already be filled/cancelled)") + order.status = OrderStatus.CANCELLED + return True + except ccxt.BaseError as e: + logger.error(f"Error cancelling order {order_id}: {e}") + return False + + @retry_on_network_error() def get_order(self, order_id: str) -> Optional[Dict[str, Any]]: """Get order details from exchange.""" self._ensure_connected() - raise NotImplementedError( - "LiveBroker.get_order() not yet implemented. " - "Order retrieval requires exchange API integration." - ) + if order_id in self._orders: + order = self._orders[order_id] + + # If order is still open, refresh from exchange + if order.status == OrderStatus.OPEN and order.exchange_order_id: + self._rate_limiter.wait() + try: + ex_order = self._exchange.get_order(order.symbol, order.exchange_order_id) + if ex_order: + self._update_order_from_exchange(order, ex_order) + except ccxt.BaseError as e: + logger.warning(f"Could not refresh order {order_id}: {e}") + + return order.to_dict() + + return None + + def _update_order_from_exchange(self, order: LiveOrder, ex_order: Dict[str, Any]): + """Update local order with exchange data.""" + ex_status = ex_order.get('status', 'open').lower() + + if ex_status == 'closed' or ex_status == 'filled': + order.status = OrderStatus.FILLED + order.filled_qty = float(ex_order.get('filled', order.size)) + order.filled_price = float(ex_order.get('average', order.price or 0)) + if not order.filled_at: + order.filled_at = datetime.now(timezone.utc) + + # Update commission + fee = ex_order.get('fee', {}) + if fee: + order.commission = float(fee.get('cost', 0) or 0) + elif ex_status == 'canceled' or ex_status == 'cancelled': + order.status = OrderStatus.CANCELLED + elif ex_status == 'partially_filled': + order.status = OrderStatus.PARTIALLY_FILLED + order.filled_qty = float(ex_order.get('filled', 0)) + order.filled_price = float(ex_order.get('average', 0)) + + order.last_update = datetime.now(timezone.utc) def get_open_orders(self, symbol: Optional[str] = None) -> List[Dict[str, Any]]: - """Get all open orders from exchange.""" + """Get all open orders from local cache.""" self._ensure_connected() - raise NotImplementedError( - "LiveBroker.get_open_orders() not yet implemented. " - "Open order retrieval requires exchange API integration." - ) + open_orders = [] + for order in self._orders.values(): + if order.status in [OrderStatus.OPEN, OrderStatus.PENDING]: + if symbol is None or order.symbol == symbol: + open_orders.append(order.to_dict()) + + return open_orders def get_balance(self, asset: Optional[str] = None) -> float: - """Get balance from exchange.""" + """Get balance from cached balances.""" self._ensure_connected() - raise NotImplementedError( - "LiveBroker.get_balance() not yet implemented. " - "Balance retrieval requires exchange API integration." - ) + if asset: + return self._balances.get(asset, 0.0) + + # Return total balance (sum of all assets in quote currency) + # For simplicity, return USDT balance if available + for quote in ['USDT', 'USD', 'BUSD', 'USDC']: + if quote in self._balances: + return self._balances[quote] + + # Return first balance if no quote currency found + if self._balances: + return list(self._balances.values())[0] + + return 0.0 def get_available_balance(self, asset: Optional[str] = None) -> float: - """Get available balance from exchange.""" + """Get available balance (total minus locked in orders).""" self._ensure_connected() - raise NotImplementedError( - "LiveBroker.get_available_balance() not yet implemented. " - "Available balance retrieval requires exchange API integration." - ) + if asset: + total = self._balances.get(asset, 0.0) + locked = self._locked_balances.get(asset, 0.0) + return total - locked + + # Return available USDT balance if available + for quote in ['USDT', 'USD', 'BUSD', 'USDC']: + if quote in self._balances: + total = self._balances[quote] + locked = self._locked_balances.get(quote, 0.0) + return total - locked + + return 0.0 + + def get_total_equity(self, quote_asset: str = 'USDT', max_assets: int = 10) -> float: + """ + Get total equity by converting holdings to quote currency value. + + For performance, only fetches prices for the top N assets by balance. + Stablecoins are counted 1:1, fiat currencies are skipped. + + :param quote_asset: Quote currency to use for valuation (default USDT). + :param max_assets: Maximum number of non-stablecoin assets to price (default 10). + :return: Total equity in quote currency. + """ + self._ensure_connected() + + if not self._balances: + return 0.0 + + total_equity = 0.0 + + # Quote currencies that don't need conversion (stablecoins) + stablecoins = {'USDT', 'USD', 'BUSD', 'USDC', 'DAI', 'TUSD'} + + # Fiat currencies and other assets that don't trade on crypto exchanges + skip_assets = { + 'TRY', 'ZAR', 'UAH', 'BRL', 'PLN', 'ARS', 'JPY', 'MXN', 'COP', 'IDR', + 'CZK', 'EUR', 'GBP', 'AUD', 'CAD', 'CHF', 'CNY', 'HKD', 'INR', 'KRW', + 'NGN', 'PHP', 'RUB', 'SGD', 'THB', 'TWD', 'VND', 'NZD', 'SEK', 'NOK', + 'DKK', 'ILS', 'MYR', 'PKR', 'KES', 'EGP', 'CLP', 'PEN', 'AED', 'SAR', + '456', '这是测试币', + } + + # Separate stablecoins from assets needing price lookup + assets_to_price = [] + for asset, balance in self._balances.items(): + if balance <= 0: + continue + if asset in stablecoins: + total_equity += balance + elif asset not in skip_assets: + assets_to_price.append((asset, balance)) + + # Sort by balance descending and take top N + assets_to_price.sort(key=lambda x: x[1], reverse=True) + assets_to_price = assets_to_price[:max_assets] + + logger.info(f"get_total_equity: pricing top {len(assets_to_price)} of {len(self._balances)} assets") + + for asset, balance in assets_to_price: + try: + symbol = f"{asset}/{quote_asset}" + price = self.get_current_price(symbol) + if price > 0: + total_equity += balance * price + logger.debug(f" {asset}: {balance} * {price} = {balance * price}") + except Exception as e: + logger.warning(f"Error getting price for {asset}: {e}") + + logger.info(f"get_total_equity: total = {total_equity}") + return total_equity def get_position(self, symbol: str) -> Optional[Position]: - """Get position from exchange.""" + """Get position from cache.""" self._ensure_connected() - - raise NotImplementedError( - "LiveBroker.get_position() not yet implemented. " - "Position retrieval requires exchange API integration." - ) + return self._positions.get(symbol) def get_all_positions(self) -> List[Position]: - """Get all positions from exchange.""" + """Get all positions from cache.""" self._ensure_connected() + return list(self._positions.values()) - raise NotImplementedError( - "LiveBroker.get_all_positions() not yet implemented. " - "Position retrieval requires exchange API integration." - ) - + @retry_on_network_error() def get_current_price(self, symbol: str) -> float: - """Get current price from exchange.""" - self._ensure_connected() + """Get current price from exchange with cache expiration.""" + if not self._exchange: + return self._current_prices.get(symbol, 0.0) - raise NotImplementedError( - "LiveBroker.get_current_price() not yet implemented. " - "Price retrieval requires exchange API integration." - ) + # Check cache first - return cached price only if not expired + if symbol in self._current_prices and symbol in self._price_timestamps: + cache_age = (datetime.now(timezone.utc) - self._price_timestamps[symbol]).total_seconds() + if cache_age < self._price_cache_ttl_seconds: + return self._current_prices[symbol] + # Cache expired, will fetch fresh price below + + self._rate_limiter.wait() + + try: + price = self._exchange.get_price(symbol) + if price > 0: + self._current_prices[symbol] = price + self._price_timestamps[symbol] = datetime.now(timezone.utc) + return price + except ccxt.BaseError as e: + logger.warning(f"Error getting price for {symbol}: {e}") + # Return stale price as fallback if fetch fails + return self._current_prices.get(symbol, 0.0) def update(self) -> List[Dict[str, Any]]: """ Process pending orders and sync with exchange. - For live trading, this should: - 1. Check for order fills - 2. Update positions - 3. Handle partial fills - 4. Manage stop loss / take profit orders + Checks for order fills, updates positions, and emits events. + + :return: List of events (fills, updates, etc.). """ self._ensure_connected() - raise NotImplementedError( - "LiveBroker.update() not yet implemented. " - "Order update requires exchange API integration." - ) + events = [] + + # Update prices for tracked symbols + for symbol in list(self._positions.keys()) + [o.symbol for o in self._orders.values() if o.status == OrderStatus.OPEN]: + try: + price = self.get_current_price(symbol) + if price > 0: + self._current_prices[symbol] = price + except Exception: + pass + + # Check for fills on open orders + for order_id, order in list(self._orders.items()): + if order.status not in [OrderStatus.OPEN, OrderStatus.PENDING]: + continue + + if not order.exchange_order_id: + continue + + try: + self._rate_limiter.wait() + ex_order = self._exchange.get_order(order.symbol, order.exchange_order_id) + + if ex_order: + old_status = order.status + self._update_order_from_exchange(order, ex_order) + + # Emit fill event if order was filled + if order.status == OrderStatus.FILLED and old_status != OrderStatus.FILLED: + events.append({ + 'type': 'fill', + 'order_id': order_id, + 'exchange_order_id': order.exchange_order_id, + 'symbol': order.symbol, + 'side': order.side.value, + 'size': order.filled_qty, + 'filled_qty': order.filled_qty, + 'price': order.filled_price, + 'filled_price': order.filled_price, + 'commission': order.commission + }) + logger.info(f"Order filled: {order_id} - {order.side.value} {order.filled_qty} {order.symbol} @ {order.filled_price}") + + # Update position after fill + self._update_position_from_fill(order) + + except ccxt.BaseError as e: + logger.warning(f"Error checking order {order_id}: {e}") + + # Update position P&L + for symbol, position in self._positions.items(): + current_price = self._current_prices.get(symbol, position.current_price) + if current_price > 0: + position.current_price = current_price + position.unrealized_pnl = (current_price - position.entry_price) * position.size + + return events + + def _update_position_from_fill(self, order: LiveOrder): + """Update position based on a filled order.""" + symbol = order.symbol + filled_size = order.filled_qty + fill_price = order.filled_price + + if order.side == OrderSide.BUY: + if symbol in self._positions: + # Average in to existing position + pos = self._positions[symbol] + new_size = pos.size + filled_size + new_entry = (pos.entry_price * pos.size + fill_price * filled_size) / new_size + pos.size = new_size + pos.entry_price = new_entry + else: + # New position + self._positions[symbol] = Position( + symbol=symbol, + size=filled_size, + entry_price=fill_price, + current_price=fill_price, + unrealized_pnl=0.0 + ) + else: + # Sell order - reduce or close position + if symbol in self._positions: + pos = self._positions[symbol] + realized_pnl = (fill_price - pos.entry_price) * filled_size - order.commission + pos.realized_pnl += realized_pnl + pos.size -= filled_size + + if pos.size <= 0: + del self._positions[symbol] + + # ==================== State Persistence Methods ==================== + + def _ensure_persistence_cache(self) -> bool: + """Ensure the persistence table/cache exists.""" + if not self._data_cache: + return False + + try: + # Create backing DB table + if hasattr(self._data_cache, 'db') and hasattr(self._data_cache.db, 'execute_sql'): + self._data_cache.db.execute_sql( + 'CREATE TABLE IF NOT EXISTS "live_broker_states" (' + 'id INTEGER PRIMARY KEY AUTOINCREMENT, ' + 'tbl_key TEXT UNIQUE, ' + 'strategy_instance_id TEXT UNIQUE, ' + 'broker_state TEXT, ' + 'updated_at TEXT)', + [] + ) + + self._data_cache.create_cache( + name='live_broker_states', + cache_type='table', + size_limit=5000, + eviction_policy='deny', + default_expiration=timedelta(days=7), + columns=['id', 'tbl_key', 'strategy_instance_id', 'broker_state', 'updated_at'] + ) + return True + except Exception as e: + logger.error(f"LiveBroker: Error ensuring persistence cache: {e}", exc_info=True) + return False + + def to_state_dict(self) -> Dict[str, Any]: + """Serialize broker state to dictionary.""" + orders_data = {oid: o.to_dict() for oid, o in self._orders.items()} + positions_data = {sym: p.to_dict() for sym, p in self._positions.items()} + + return { + 'testnet': self._testnet, + 'balances': self._balances.copy(), + 'locked_balances': self._locked_balances.copy(), + 'orders': orders_data, + 'positions': positions_data, + 'current_prices': self._current_prices.copy(), + 'last_balance_sync': self._last_balance_sync.isoformat() if self._last_balance_sync else None, + 'last_position_sync': self._last_position_sync.isoformat() if self._last_position_sync else None, + 'last_order_sync': self._last_order_sync.isoformat() if self._last_order_sync else None, + } + + def from_state_dict(self, state: Dict[str, Any]): + """Restore broker state from dictionary.""" + if not state: + return + + # Restore balances + self._balances = state.get('balances', {}) + self._locked_balances = state.get('locked_balances', {}) + + # Restore orders + self._orders.clear() + for order_id, order_data in state.get('orders', {}).items(): + self._orders[order_id] = LiveOrder.from_dict(order_data) + + # Restore positions + self._positions.clear() + for symbol, pos_data in state.get('positions', {}).items(): + self._positions[symbol] = Position.from_dict(pos_data) + + # Restore price cache + self._current_prices = state.get('current_prices', {}) + + # Restore timestamps + if state.get('last_balance_sync'): + self._last_balance_sync = datetime.fromisoformat(state['last_balance_sync']) + if state.get('last_position_sync'): + self._last_position_sync = datetime.fromisoformat(state['last_position_sync']) + if state.get('last_order_sync'): + self._last_order_sync = datetime.fromisoformat(state['last_order_sync']) + + logger.info(f"LiveBroker: State restored - {len(self._orders)} orders, {len(self._positions)} positions") + + def save_state(self, strategy_instance_id: str) -> bool: + """Save broker state to data cache.""" + if not self._data_cache: + logger.warning("LiveBroker: No data cache available for persistence") + return False + + try: + if not self._ensure_persistence_cache(): + return False + + state_dict = self.to_state_dict() + state_json = json.dumps(state_dict) + + existing = self._data_cache.get_rows_from_datacache( + cache_name='live_broker_states', + filter_vals=[('strategy_instance_id', strategy_instance_id)] + ) + + columns = ('tbl_key', 'strategy_instance_id', 'broker_state', 'updated_at') + values = (strategy_instance_id, strategy_instance_id, state_json, datetime.now(timezone.utc).isoformat()) + + if existing.empty: + self._data_cache.insert_row_into_datacache( + cache_name='live_broker_states', + columns=columns, + values=values + ) + else: + self._data_cache.modify_datacache_item( + cache_name='live_broker_states', + filter_vals=[('strategy_instance_id', strategy_instance_id)], + field_names=columns, + new_values=values, + overwrite='strategy_instance_id' + ) + + logger.debug(f"LiveBroker: State saved for {strategy_instance_id}") + return True + + except Exception as e: + logger.error(f"LiveBroker: Error saving state: {e}", exc_info=True) + return False + + def load_state(self, strategy_instance_id: str) -> bool: + """Load broker state from data cache.""" + if not self._data_cache: + logger.warning("LiveBroker: No data cache available for persistence") + return False + + try: + if not self._ensure_persistence_cache(): + return False + + existing = self._data_cache.get_rows_from_datacache( + cache_name='live_broker_states', + filter_vals=[('strategy_instance_id', strategy_instance_id)] + ) + + if existing.empty: + logger.debug(f"LiveBroker: No saved state for {strategy_instance_id}") + return False + + state_json = existing.iloc[0].get('broker_state', '{}') + state_dict = json.loads(state_json) + self.from_state_dict(state_dict) + + logger.info(f"LiveBroker: State loaded for {strategy_instance_id}") + return True + + except Exception as e: + logger.error(f"LiveBroker: Error loading state: {e}", exc_info=True) + return False + + def reconcile_with_exchange(self) -> Dict[str, Any]: + """ + Compare persisted state with exchange reality and log discrepancies. + + Call this after load_state to ensure consistency. + + :return: Dict with reconciliation results. + """ + if not self._connected: + logger.warning("Cannot reconcile: not connected") + return {'success': False, 'error': 'Not connected'} + + results = { + 'success': True, + 'balance_changes': [], + 'order_changes': [], + 'position_changes': [] + } + + try: + # Reconcile balances + old_balances = self._balances.copy() + self.sync_balance() + + for asset, new_bal in self._balances.items(): + old_bal = old_balances.get(asset, 0) + if abs(new_bal - old_bal) > 0.00001: + results['balance_changes'].append({ + 'asset': asset, + 'old': old_bal, + 'new': new_bal, + 'diff': new_bal - old_bal + }) + logger.info(f"Balance reconciled: {asset} {old_bal} -> {new_bal}") + + # Reconcile open orders + old_open_orders = {oid: o for oid, o in self._orders.items() + if o.status in [OrderStatus.OPEN, OrderStatus.PENDING]} + + exchange_orders = self.sync_open_orders() + exchange_order_ids = {str(o.get('id')) for o in exchange_orders} + + for order_id, order in old_open_orders.items(): + if order.exchange_order_id not in exchange_order_ids: + # Order no longer on exchange - might be filled or cancelled + try: + ex_order = self._exchange.get_order(order.symbol, order.exchange_order_id) + if ex_order: + self._update_order_from_exchange(order, ex_order) + results['order_changes'].append({ + 'order_id': order_id, + 'old_status': 'open', + 'new_status': order.status.value + }) + except Exception: + order.status = OrderStatus.CANCELLED + results['order_changes'].append({ + 'order_id': order_id, + 'old_status': 'open', + 'new_status': 'cancelled (assumed)' + }) + + # Reconcile positions + old_positions = {sym: p.size for sym, p in self._positions.items()} + self.sync_positions() + + for symbol, pos in self._positions.items(): + old_size = old_positions.get(symbol, 0) + if abs(pos.size - old_size) > 0.00001: + results['position_changes'].append({ + 'symbol': symbol, + 'old_size': old_size, + 'new_size': pos.size + }) + logger.info(f"Position reconciled: {symbol} {old_size} -> {pos.size}") + + if results['balance_changes'] or results['order_changes'] or results['position_changes']: + logger.info(f"Reconciliation complete: {len(results['balance_changes'])} balance changes, " + f"{len(results['order_changes'])} order changes, " + f"{len(results['position_changes'])} position changes") + else: + logger.info("Reconciliation complete: no discrepancies found") + + return results + + except Exception as e: + logger.error(f"Error during reconciliation: {e}", exc_info=True) + return {'success': False, 'error': str(e)} diff --git a/src/live_strategy_instance.py b/src/live_strategy_instance.py new file mode 100644 index 0000000..b4cd39d --- /dev/null +++ b/src/live_strategy_instance.py @@ -0,0 +1,524 @@ +""" +Live Trading Strategy Instance for BrighterTrading. + +Extends StrategyInstance with live trading capabilities using +the LiveBroker for real order execution. + +WARNING: This module executes real trades with real money when not in testnet mode. +""" + +import logging +from typing import Any, Optional +import datetime as dt + +from StrategyInstance import StrategyInstance +from brokers import LiveBroker, OrderSide, OrderType + +logger = logging.getLogger(__name__) + + +class LiveStrategyInstance(StrategyInstance): + """ + Strategy instance for live trading mode. + + Uses LiveBroker for real order execution via exchange APIs. + Includes safety features: + - Circuit breaker to halt trading on excessive drawdown + - Position limits to prevent over-exposure + - Restart-safe order reconciliation + """ + + def __init__( + self, + strategy_instance_id: str, + strategy_id: str, + strategy_name: str, + user_id: int, + generated_code: str, + data_cache: Any, + indicators: Any | None, + trades: Any | None, + exchange: Any, + testnet: bool = True, + initial_balance: float = 0.0, + commission: float = 0.001, + slippage: float = 0.0, + max_position_pct: float = 0.5, + circuit_breaker_pct: float = -0.10, + rate_limit: float = 2.0, + ): + """ + Initialize the LiveStrategyInstance. + + :param strategy_instance_id: Unique identifier for this instance. + :param strategy_id: Strategy identifier. + :param strategy_name: Strategy name. + :param user_id: User identifier. + :param generated_code: Python code generated from Blockly. + :param data_cache: DataCache instance. + :param indicators: Indicators manager. + :param trades: Trades manager (not used directly, kept for compatibility). + :param exchange: Exchange instance for API access. + :param testnet: Use testnet (default True for safety). + :param initial_balance: Starting balance (will be synced from exchange). + :param commission: Commission rate. + :param slippage: Slippage rate (not typically used for live). + :param max_position_pct: Maximum position size as percentage of balance (0.5 = 50%). + :param circuit_breaker_pct: Drawdown percentage to trigger circuit breaker (-0.10 = -10%). + :param rate_limit: API calls per second limit. + """ + # Safety checks + if not testnet: + logger.warning( + "LiveStrategyInstance initialized for PRODUCTION trading! " + "Real money will be used. Ensure thorough testing first." + ) + else: + logger.info("LiveStrategyInstance initialized in TESTNET mode.") + + # Initialize the live broker + self.live_broker = LiveBroker( + exchange=exchange, + testnet=testnet, + initial_balance=initial_balance, + commission=commission, + slippage=slippage, + data_cache=data_cache, + rate_limit=rate_limit + ) + + # Set broker before calling parent __init__ + self.broker = self.live_broker + + # Safety parameters + self._testnet = testnet + self._exchange = exchange + self._max_position_pct = max_position_pct + self._circuit_breaker_pct = circuit_breaker_pct + self._circuit_breaker_tripped = False + self._circuit_breaker_reason = "" + + # Initialize parent (will call _initialize_or_load_context) + super().__init__( + strategy_instance_id, strategy_id, strategy_name, user_id, + generated_code, data_cache, indicators, trades + ) + + # Connect to exchange and sync state + if not self.live_broker.connect(): + logger.error("Failed to connect to exchange!") + raise RuntimeError("Failed to connect to exchange") + + # Get starting balance from exchange + self.starting_balance = self.live_broker.get_balance() + self.current_balance = self.starting_balance + self.available_balance = self.live_broker.get_available_balance() + self.available_strategy_balance = self.starting_balance + + # Get total equity for circuit breaker (includes spot holdings value) + self.starting_equity = self.live_broker.get_total_equity() + self.current_equity = self.starting_equity + + # Update exec_context with balance attributes + self.exec_context['starting_balance'] = self.starting_balance + self.exec_context['current_balance'] = self.current_balance + self.exec_context['available_balance'] = self.available_balance + self.exec_context['available_strategy_balance'] = self.available_strategy_balance + self.exec_context['starting_equity'] = self.starting_equity + self.exec_context['current_equity'] = self.current_equity + + # Load persisted broker state and reconcile with exchange + if self.live_broker.load_state(self.strategy_instance_id): + reconcile_result = self.live_broker.reconcile_with_exchange() + if reconcile_result.get('success'): + logger.info("LiveStrategyInstance state reconciled with exchange") + self._update_balances() + else: + logger.info(f"LiveStrategyInstance created with balance: {self.starting_balance}") + + def _check_circuit_breaker(self) -> bool: + """ + Check if circuit breaker should be tripped. + + Halts trading if drawdown exceeds the configured threshold. + Uses total equity (including spot holdings) for accurate drawdown calculation. + + :return: True if circuit breaker is tripped, False otherwise. + """ + if self._circuit_breaker_tripped: + return True + + if self.starting_equity <= 0: + return False + + self._update_balances() + drawdown_pct = (self.current_equity - self.starting_equity) / self.starting_equity + + if drawdown_pct < self._circuit_breaker_pct: + self._circuit_breaker_tripped = True + self._circuit_breaker_reason = ( + f"Drawdown {drawdown_pct:.2%} exceeded threshold {self._circuit_breaker_pct:.2%}" + ) + logger.warning( + f"CIRCUIT BREAKER TRIPPED for strategy {self.strategy_id}: " + f"{self._circuit_breaker_reason}" + ) + self.notify_user(f"Trading halted: {self._circuit_breaker_reason}") + return True + + return False + + def _check_position_limit(self, size: float, price: float, symbol: str) -> bool: + """ + Check if order would exceed position limits. + + :param size: Order size. + :param price: Order price. + :param symbol: Trading symbol. + :return: True if order is within limits, False otherwise. + """ + if self.starting_balance <= 0: + return True + + order_value = size * price + max_order_value = self.starting_balance * self._max_position_pct + + if order_value > max_order_value: + logger.warning( + f"Order rejected: value {order_value:.2f} exceeds position limit " + f"{max_order_value:.2f} ({self._max_position_pct:.0%} of {self.starting_balance:.2f})" + ) + self.notify_user( + f"Order rejected: size exceeds {self._max_position_pct:.0%} position limit" + ) + return False + + # Also check against available balance + available = self.live_broker.get_available_balance() + if order_value > available: + logger.warning( + f"Order rejected: value {order_value:.2f} exceeds available balance {available:.2f}" + ) + self.notify_user("Order rejected: insufficient available balance") + return False + + return True + + def trade_order( + self, + trade_type: str, + size: float, + order_type: str, + source: dict = None, + tif: str = 'GTC', + stop_loss: dict = None, + trailing_stop: dict = None, + take_profit: dict = None, + limit: dict = None, + trailing_limit: dict = None, + target_market: dict = None, + name_order: dict = None + ): + """ + Place an order via the live broker with safety checks. + + :param trade_type: 'buy' or 'sell'. + :param size: Order size. + :param order_type: 'MARKET' or 'LIMIT'. + :param source: Dict with 'symbol' key. + :param tif: Time in force ('GTC', 'IOC', 'FOK'). + :param stop_loss: Stop loss configuration. + :param take_profit: Take profit configuration. + :param limit: Limit price configuration. + :return: OrderResult or None if rejected. + """ + # Check circuit breaker first + if self._check_circuit_breaker(): + logger.warning("Order rejected: circuit breaker is tripped") + return None + + # Extract symbol + symbol = 'BTC/USDT' + if source: + symbol = source.get('symbol') or source.get('market', 'BTC/USDT') + + # Map trade_type to OrderSide + if trade_type.lower() == 'buy': + side = OrderSide.BUY + elif trade_type.lower() == 'sell': + side = OrderSide.SELL + else: + logger.error(f"Invalid trade_type '{trade_type}'. Order not executed.") + return None + + # Map order_type to OrderType and get price + order_type_upper = order_type.upper() + if order_type_upper == 'MARKET': + bt_order_type = OrderType.MARKET + price = self.get_current_price(symbol=symbol) + elif order_type_upper == 'LIMIT': + bt_order_type = OrderType.LIMIT + price = limit.get('value') if limit else self.get_current_price(symbol=symbol) + else: + bt_order_type = OrderType.MARKET + price = self.get_current_price(symbol=symbol) + + if price is None or price <= 0: + logger.error(f"Cannot get price for {symbol}") + return None + + # Check position limit (for buy orders) + if side == OrderSide.BUY: + if not self._check_position_limit(size, price, symbol): + return None + + # Extract stop loss and take profit prices + stop_loss_price = stop_loss.get('value') if stop_loss else None + take_profit_price = take_profit.get('value') if take_profit else None + + # Place the order + logger.info(f"Placing LIVE order: {trade_type.upper()} {size} {symbol} @ {order_type_upper}") + + result = self.live_broker.place_order( + symbol=symbol, + side=side, + order_type=bt_order_type, + size=size, + price=price if bt_order_type == OrderType.LIMIT else None, + stop_loss=stop_loss_price, + take_profit=take_profit_price, + time_in_force=tif + ) + + if result.success: + message = ( + f"LIVE {trade_type.upper()} order placed: {size} {symbol} @ {order_type_upper}" + ) + self.notify_user(message) + logger.info(message) + + # Track order in history + self.orders.append({ + 'order_id': result.order_id, + 'symbol': symbol, + 'side': trade_type, + 'size': size, + 'type': order_type, + 'status': result.status.value, + 'filled_qty': result.filled_qty, + 'filled_price': result.filled_price, + 'timestamp': dt.datetime.now().isoformat() + }) + + # Update balances after order + self._update_balances() + else: + logger.warning(f"Order failed: {result.message}") + self.notify_user(f"Order failed: {result.message}") + + return result + + def tick(self, candle_data: dict = None) -> list: + """ + Process one iteration of the live trading strategy. + + :param candle_data: Optional candle data dict. + :return: List of events. + """ + events = [] + + # Check circuit breaker first + if self._check_circuit_breaker(): + return [{ + 'type': 'circuit_breaker', + 'strategy_id': self.strategy_id, + 'reason': self._circuit_breaker_reason + }] + + # Skip if paused or exiting + if self.paused: + return [{'type': 'skipped', 'reason': 'paused'}] + + if self.exit: + return [{'type': 'skipped', 'reason': 'exiting'}] + + try: + # Update prices and check for fills + if candle_data: + symbol = candle_data.get('symbol', 'BTC/USDT') + price = float(candle_data.get('close', 0)) + + if price > 0: + # Update exec context with current data + self.exec_context['current_candle'] = candle_data + self.exec_context['current_price'] = price + self.exec_context['current_symbol'] = symbol + + # Process broker updates (check for fills, update positions) + broker_events = self.live_broker.update() + for event in broker_events: + if event['type'] == 'fill': + self.trade_history.append(event) + events.append({ + 'type': 'order_filled', + 'order_id': event.get('order_id'), + 'exchange_order_id': event.get('exchange_order_id'), + 'symbol': event.get('symbol'), + 'side': event.get('side'), + 'filled_qty': event.get('filled_qty'), + 'filled_price': event.get('filled_price'), + 'commission': event.get('commission'), + }) + logger.info(f"Order filled: {event}") + + # Update balance attributes + self._update_balances() + + # Execute strategy logic + result = self.execute() + + if result.get('success'): + events.append({ + 'type': 'tick_complete', + 'strategy_id': self.strategy_id, + 'balance': self.current_balance, + 'available_balance': self.available_balance, + 'positions': len(self.live_broker.get_all_positions()), + 'open_orders': len(self.live_broker.get_open_orders()), + 'trades': len(self.trade_history), + 'testnet': self._testnet, + }) + else: + events.append({ + 'type': 'error', + 'strategy_id': self.strategy_id, + 'message': result.get('message', 'Unknown error'), + }) + + except Exception as e: + logger.error(f"Error in live strategy tick: {e}", exc_info=True) + events.append({ + 'type': 'error', + 'strategy_id': self.strategy_id, + 'message': str(e), + }) + + return events + + def _update_balances(self): + """Update balance attributes from live broker.""" + try: + self.current_balance = self.live_broker.get_balance() + self.available_balance = self.live_broker.get_available_balance() + self.current_equity = self.live_broker.get_total_equity() + + self.exec_context['current_balance'] = self.current_balance + self.exec_context['available_balance'] = self.available_balance + self.exec_context['current_equity'] = self.current_equity + except Exception as e: + logger.warning(f"Error updating balances: {e}") + + def get_current_price(self, timeframe: str = '1h', exchange: str = 'binance', + symbol: str = 'BTC/USDT') -> float: + """Get current price from live broker.""" + try: + return self.live_broker.get_current_price(symbol) + except Exception as e: + logger.warning(f"Error getting price for {symbol}: {e}") + return 0.0 + + def get_available_balance(self) -> float: + """Get available balance from live broker.""" + self.available_balance = self.live_broker.get_available_balance() + self.exec_context['available_balance'] = self.available_balance + return self.available_balance + + def get_current_balance(self) -> float: + """Get current total balance from live broker.""" + self.current_balance = self.live_broker.get_balance() + self.exec_context['current_balance'] = self.current_balance + return self.current_balance + + def get_starting_balance(self) -> float: + """Get starting balance.""" + return self.starting_balance + + def get_active_trades(self) -> int: + """Get number of active positions.""" + return len(self.live_broker.get_all_positions()) + + def get_filled_orders(self) -> int: + """Get number of filled orders.""" + from brokers.base_broker import OrderStatus + return len([o for o in self.live_broker._orders.values() + if o.status == OrderStatus.FILLED]) + + def get_position(self, symbol: str): + """Get position for a symbol.""" + return self.live_broker.get_position(symbol) + + def close_position(self, symbol: str): + """Close a position.""" + if self._circuit_breaker_tripped: + logger.warning("Cannot close position: circuit breaker is tripped") + return None + return self.live_broker.close_position(symbol) + + def close_all_positions(self): + """Close all positions (allowed even with circuit breaker).""" + return self.live_broker.close_all_positions() + + def get_trade_history(self): + """Get all executed trades.""" + return self.trade_history.copy() + + def reset_circuit_breaker(self): + """ + Reset the circuit breaker (should be called manually after investigation). + + WARNING: Use with caution - ensure the cause of the drawdown is understood. + """ + if self._circuit_breaker_tripped: + logger.warning( + f"Circuit breaker reset for strategy {self.strategy_id}. " + f"Previous reason: {self._circuit_breaker_reason}" + ) + self._circuit_breaker_tripped = False + self._circuit_breaker_reason = "" + self.notify_user("Circuit breaker reset - trading can resume") + + def save_context(self): + """Save strategy context including live broker state.""" + self._update_balances() + # Save live broker state + self.live_broker.save_state(self.strategy_instance_id) + super().save_context() + + def disconnect(self): + """Disconnect from the exchange.""" + self.live_broker.disconnect() + logger.info(f"LiveStrategyInstance {self.strategy_id} disconnected") + + def notify_user(self, message: str): + """Send notification to user.""" + mode = "TESTNET" if self._testnet else "LIVE" + logger.info(f"[{mode}] {message}") + # Could emit via SocketIO if available + + @property + def is_testnet(self) -> bool: + """Return whether running in testnet mode.""" + return self._testnet + + @property + def circuit_breaker_status(self) -> dict: + """Get circuit breaker status.""" + return { + 'tripped': self._circuit_breaker_tripped, + 'reason': self._circuit_breaker_reason, + 'threshold': self._circuit_breaker_pct, + 'current_drawdown': ( + (self.current_equity - self.starting_equity) / self.starting_equity + if self.starting_equity > 0 else 0 + ) + } diff --git a/src/static/Strategies.js b/src/static/Strategies.js index 9ea6d5e..02b1695 100644 --- a/src/static/Strategies.js +++ b/src/static/Strategies.js @@ -114,6 +114,7 @@ class StratUIManager { try { const strategyItem = document.createElement('div'); strategyItem.className = 'strategy-item'; + strategyItem.setAttribute('data-strategy-id', strat.tbl_key); // Check if strategy is running const isRunning = UI.strats && UI.strats.isStrategyRunning(strat.tbl_key); @@ -182,9 +183,21 @@ class StratUIManager { // Show running status if applicable if (isRunning) { + let modeDisplay = runningInfo.mode; + let modeBadge = ''; + + // Add testnet/production badge for live mode + if (runningInfo.mode === 'live') { + if (runningInfo.testnet) { + modeBadge = 'TESTNET'; + } else { + modeBadge = 'PRODUCTION'; + } + } + let statusHtml = `