diff --git a/src/brokers/__init__.py b/src/brokers/__init__.py index ce1da25..d7d4272 100644 --- a/src/brokers/__init__.py +++ b/src/brokers/__init__.py @@ -10,10 +10,11 @@ This package provides a unified interface for executing trades across different from .base_broker import BaseBroker, OrderSide, OrderType, OrderStatus, OrderResult, Position from .backtest_broker import BacktestBroker from .paper_broker import PaperBroker +from .live_broker import LiveBroker from .factory import create_broker, TradingMode, get_available_modes __all__ = [ 'BaseBroker', 'OrderSide', 'OrderType', 'OrderStatus', 'OrderResult', 'Position', - 'BacktestBroker', 'PaperBroker', + 'BacktestBroker', 'PaperBroker', 'LiveBroker', 'create_broker', 'TradingMode', 'get_available_modes' ] diff --git a/src/brokers/live_broker.py b/src/brokers/live_broker.py new file mode 100644 index 0000000..426e7f0 --- /dev/null +++ b/src/brokers/live_broker.py @@ -0,0 +1,239 @@ +""" +Live Trading Broker Implementation for BrighterTrading. + +Executes real trades via exchange APIs (CCXT). + +NOTE: This is a stub implementation. Full live trading +requires careful testing with testnet before production use. +""" + +import logging +from typing import Any, Dict, List, Optional +import uuid + +from .base_broker import ( + BaseBroker, OrderResult, OrderSide, OrderType, OrderStatus, Position +) + +logger = logging.getLogger(__name__) + + +class LiveBroker(BaseBroker): + """ + Live trading broker that executes real trades via CCXT. + + WARNING: This broker executes real trades with real money. + Ensure thorough testing on testnet before production use. + + Features (to be implemented): + - Real order execution via exchange APIs + - Balance and position sync from exchange + - Order lifecycle management (create, cancel, fill detection) + - Restart-safe open order reconciliation + """ + + def __init__( + self, + exchange_interface: Any = None, + user_name: str = None, + exchange_name: str = 'binance', + testnet: bool = True, + initial_balance: float = 0.0, # Will be synced from exchange + commission: float = 0.001, + slippage: float = 0.0 + ): + """ + Initialize the LiveBroker. + + :param exchange_interface: ExchangeInterface instance for API access. + :param user_name: User name for exchange credentials. + :param exchange_name: Exchange to use (e.g., 'binance', 'alpaca'). + :param testnet: Use testnet (default True for safety). + :param initial_balance: Starting balance (synced from exchange). + :param commission: Commission rate. + :param slippage: Slippage rate. + """ + super().__init__(initial_balance, commission, slippage) + + self._exchange_interface = exchange_interface + self._user_name = user_name + self._exchange_name = exchange_name + self._testnet = testnet + + # Warn if not using testnet + if not testnet: + logger.warning( + "LiveBroker initialized for PRODUCTION trading. " + "Real money will be used!" + ) + + # Placeholder for exchange connection + self._exchange = None + self._connected = False + + def _ensure_connected(self): + """Ensure exchange connection is established.""" + if not self._connected: + raise RuntimeError( + "LiveBroker not connected to exchange. " + "Call connect() first." + ) + + def connect(self) -> bool: + """ + Connect to the exchange. + + :return: True if connection successful. + """ + raise NotImplementedError( + "LiveBroker.connect() not yet implemented. " + "Live trading requires exchange API integration." + ) + + def disconnect(self): + """Disconnect from the exchange.""" + self._connected = False + self._exchange = None + + def sync_balance(self) -> float: + """ + Sync balance from exchange. + + :return: Current balance from exchange. + """ + raise NotImplementedError( + "LiveBroker.sync_balance() not yet implemented. " + "Balance sync requires exchange API integration." + ) + + def sync_positions(self) -> List[Position]: + """ + Sync positions from exchange. + + :return: List of current positions. + """ + raise NotImplementedError( + "LiveBroker.sync_positions() not yet implemented. " + "Position sync requires exchange API integration." + ) + + def sync_open_orders(self) -> List[Dict[str, Any]]: + """ + Sync open orders from exchange. + + Used for restart-safe order reconciliation. + + :return: List of open orders from exchange. + """ + raise NotImplementedError( + "LiveBroker.sync_open_orders() not yet implemented. " + "Order sync requires exchange API integration." + ) + + def place_order( + self, + symbol: str, + side: OrderSide, + order_type: OrderType, + size: float, + price: Optional[float] = None, + stop_loss: Optional[float] = None, + take_profit: Optional[float] = None, + time_in_force: str = 'GTC' + ) -> OrderResult: + """Place an order on the exchange.""" + self._ensure_connected() + + raise NotImplementedError( + "LiveBroker.place_order() not yet implemented. " + "Order placement requires exchange API integration." + ) + + def cancel_order(self, order_id: str) -> bool: + """Cancel an order on the exchange.""" + self._ensure_connected() + + raise NotImplementedError( + "LiveBroker.cancel_order() not yet implemented. " + "Order cancellation requires exchange API integration." + ) + + def get_order(self, order_id: str) -> Optional[Dict[str, Any]]: + """Get order details from exchange.""" + self._ensure_connected() + + raise NotImplementedError( + "LiveBroker.get_order() not yet implemented. " + "Order retrieval requires exchange API integration." + ) + + def get_open_orders(self, symbol: Optional[str] = None) -> List[Dict[str, Any]]: + """Get all open orders from exchange.""" + self._ensure_connected() + + raise NotImplementedError( + "LiveBroker.get_open_orders() not yet implemented. " + "Open order retrieval requires exchange API integration." + ) + + def get_balance(self, asset: Optional[str] = None) -> float: + """Get balance from exchange.""" + self._ensure_connected() + + raise NotImplementedError( + "LiveBroker.get_balance() not yet implemented. " + "Balance retrieval requires exchange API integration." + ) + + def get_available_balance(self, asset: Optional[str] = None) -> float: + """Get available balance from exchange.""" + self._ensure_connected() + + raise NotImplementedError( + "LiveBroker.get_available_balance() not yet implemented. " + "Available balance retrieval requires exchange API integration." + ) + + def get_position(self, symbol: str) -> Optional[Position]: + """Get position from exchange.""" + self._ensure_connected() + + raise NotImplementedError( + "LiveBroker.get_position() not yet implemented. " + "Position retrieval requires exchange API integration." + ) + + def get_all_positions(self) -> List[Position]: + """Get all positions from exchange.""" + self._ensure_connected() + + raise NotImplementedError( + "LiveBroker.get_all_positions() not yet implemented. " + "Position retrieval requires exchange API integration." + ) + + def get_current_price(self, symbol: str) -> float: + """Get current price from exchange.""" + self._ensure_connected() + + raise NotImplementedError( + "LiveBroker.get_current_price() not yet implemented. " + "Price retrieval requires exchange API integration." + ) + + def update(self) -> List[Dict[str, Any]]: + """ + Process pending orders and sync with exchange. + + For live trading, this should: + 1. Check for order fills + 2. Update positions + 3. Handle partial fills + 4. Manage stop loss / take profit orders + """ + self._ensure_connected() + + raise NotImplementedError( + "LiveBroker.update() not yet implemented. " + "Order update requires exchange API integration." + ) diff --git a/src/health.py b/src/health.py new file mode 100644 index 0000000..2e8b95e --- /dev/null +++ b/src/health.py @@ -0,0 +1,205 @@ +""" +Health Check Module for BrighterTrading. + +Provides health check endpoints and monitoring capabilities. +""" + +import logging +from typing import Dict, Any, List, Optional +from datetime import datetime +import time + +logger = logging.getLogger(__name__) + + +class HealthStatus: + """Health status constants.""" + HEALTHY = 'healthy' + DEGRADED = 'degraded' + UNHEALTHY = 'unhealthy' + + +class HealthCheck: + """ + Health check coordinator for monitoring system health. + """ + + def __init__(self): + self._checks: Dict[str, callable] = {} + self._last_check_time: Optional[datetime] = None + self._cached_status: Optional[Dict[str, Any]] = None + self._cache_ttl_seconds: float = 5.0 + + def register_check(self, name: str, check_fn: callable) -> None: + """ + Register a health check function. + + :param name: Check name. + :param check_fn: Function that returns (status, message) tuple. + """ + self._checks[name] = check_fn + logger.debug(f"Registered health check: {name}") + + def unregister_check(self, name: str) -> None: + """Unregister a health check.""" + if name in self._checks: + del self._checks[name] + + def run_checks(self, use_cache: bool = True) -> Dict[str, Any]: + """ + Run all health checks and return aggregated status. + + :param use_cache: Use cached results if available and fresh. + :return: Health status dictionary. + """ + # Check cache + if use_cache and self._cached_status and self._last_check_time: + age = (datetime.utcnow() - self._last_check_time).total_seconds() + if age < self._cache_ttl_seconds: + return self._cached_status + + start_time = time.time() + results: Dict[str, Dict[str, Any]] = {} + overall_status = HealthStatus.HEALTHY + + for name, check_fn in self._checks.items(): + try: + check_start = time.time() + status, message = check_fn() + check_duration = time.time() - check_start + + results[name] = { + 'status': status, + 'message': message, + 'duration_ms': round(check_duration * 1000, 2), + } + + # Downgrade overall status if needed + if status == HealthStatus.UNHEALTHY: + overall_status = HealthStatus.UNHEALTHY + elif status == HealthStatus.DEGRADED and overall_status == HealthStatus.HEALTHY: + overall_status = HealthStatus.DEGRADED + + except Exception as e: + logger.error(f"Health check '{name}' failed with error: {e}") + results[name] = { + 'status': HealthStatus.UNHEALTHY, + 'message': str(e), + 'error': True, + } + overall_status = HealthStatus.UNHEALTHY + + total_duration = time.time() - start_time + + status = { + 'status': overall_status, + 'timestamp': datetime.utcnow().isoformat() + 'Z', + 'duration_ms': round(total_duration * 1000, 2), + 'checks': results, + } + + # Cache results + self._cached_status = status + self._last_check_time = datetime.utcnow() + + return status + + def get_status(self) -> str: + """Get simple status string.""" + result = self.run_checks() + return result['status'] + + def is_healthy(self) -> bool: + """Check if system is healthy.""" + return self.get_status() == HealthStatus.HEALTHY + + +# Default health check instance +_health_check = HealthCheck() + + +def get_health_check() -> HealthCheck: + """Get the global health check instance.""" + return _health_check + + +def register_default_checks( + data_cache: Any = None, + exchange_interface: Any = None, +) -> None: + """ + Register default health checks. + + :param data_cache: DataCache instance. + :param exchange_interface: ExchangeInterface instance. + """ + health = get_health_check() + + # Database check + if data_cache: + def check_database(): + try: + # Try to query something simple + data_cache.get_rows_from_datacache('strategies', []) + return HealthStatus.HEALTHY, 'Database accessible' + except Exception as e: + return HealthStatus.UNHEALTHY, str(e) + + health.register_check('database', check_database) + + # Exchange connectivity check + if exchange_interface: + def check_exchange(): + try: + # Try to connect to default exchange + if exchange_interface.default_exchange: + return HealthStatus.HEALTHY, 'Exchange connected' + return HealthStatus.DEGRADED, 'No default exchange' + except Exception as e: + return HealthStatus.UNHEALTHY, str(e) + + health.register_check('exchange', check_exchange) + + # Memory check + def check_memory(): + try: + import psutil + memory = psutil.virtual_memory() + if memory.percent > 90: + return HealthStatus.UNHEALTHY, f'Memory usage critical: {memory.percent}%' + elif memory.percent > 75: + return HealthStatus.DEGRADED, f'Memory usage high: {memory.percent}%' + return HealthStatus.HEALTHY, f'Memory usage: {memory.percent}%' + except ImportError: + return HealthStatus.HEALTHY, 'Memory check unavailable (psutil not installed)' + except Exception as e: + return HealthStatus.DEGRADED, str(e) + + health.register_check('memory', check_memory) + + # Heartbeat check (always healthy if reached) + def check_heartbeat(): + return HealthStatus.HEALTHY, 'Service alive' + + health.register_check('heartbeat', check_heartbeat) + + logger.info("Default health checks registered") + + +def health_endpoint() -> Dict[str, Any]: + """ + Flask/FastAPI compatible health endpoint function. + + Returns health status as JSON-serializable dict. + """ + return get_health_check().run_checks() + + +def liveness_probe() -> bool: + """Simple liveness probe for Kubernetes.""" + return True + + +def readiness_probe() -> bool: + """Readiness probe checking if service is ready to accept traffic.""" + return get_health_check().is_healthy() diff --git a/src/logging_config.py b/src/logging_config.py new file mode 100644 index 0000000..d837205 --- /dev/null +++ b/src/logging_config.py @@ -0,0 +1,276 @@ +""" +Logging Configuration for BrighterTrading. + +Provides structured logging with consistent formatting across all modules. +""" + +import logging +import sys +from typing import Optional, Dict, Any +from datetime import datetime +import json + + +class StructuredFormatter(logging.Formatter): + """ + Formatter that outputs structured JSON logs for production use. + """ + + def format(self, record: logging.LogRecord) -> str: + log_record = { + 'timestamp': datetime.utcnow().isoformat() + 'Z', + 'level': record.levelname, + 'logger': record.name, + 'message': record.getMessage(), + 'module': record.module, + 'function': record.funcName, + 'line': record.lineno, + } + + # Add exception info if present + if record.exc_info: + log_record['exception'] = self.formatException(record.exc_info) + + # Add extra fields if present + if hasattr(record, 'extra_data'): + log_record.update(record.extra_data) + + return json.dumps(log_record) + + +class ColoredFormatter(logging.Formatter): + """ + Formatter with colored output for development use. + """ + + COLORS = { + 'DEBUG': '\033[36m', # Cyan + 'INFO': '\033[32m', # Green + 'WARNING': '\033[33m', # Yellow + 'ERROR': '\033[31m', # Red + 'CRITICAL': '\033[35m', # Magenta + } + RESET = '\033[0m' + + def format(self, record: logging.LogRecord) -> str: + color = self.COLORS.get(record.levelname, self.RESET) + formatted = super().format(record) + return f"{color}{formatted}{self.RESET}" + + +def configure_logging( + level: str = 'INFO', + structured: bool = False, + log_file: Optional[str] = None, +) -> None: + """ + Configure logging for the application. + + :param level: Log level ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'). + :param structured: Use structured JSON logging (for production). + :param log_file: Optional log file path. + """ + root_logger = logging.getLogger() + root_logger.setLevel(getattr(logging, level.upper())) + + # Remove existing handlers + for handler in root_logger.handlers[:]: + root_logger.removeHandler(handler) + + # Console handler + console_handler = logging.StreamHandler(sys.stdout) + + if structured: + console_handler.setFormatter(StructuredFormatter()) + else: + console_handler.setFormatter(ColoredFormatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + )) + + root_logger.addHandler(console_handler) + + # File handler if specified + if log_file: + file_handler = logging.FileHandler(log_file) + file_handler.setFormatter(StructuredFormatter()) + root_logger.addHandler(file_handler) + + +def get_logger(name: str) -> logging.Logger: + """ + Get a logger with the given name. + + :param name: Logger name (typically __name__). + :return: Logger instance. + """ + return logging.getLogger(name) + + +class TradingLogger: + """ + Specialized logger for trading operations with structured fields. + """ + + def __init__(self, logger: logging.Logger): + self._logger = logger + + def _log_with_context( + self, + level: int, + message: str, + user_id: Optional[int] = None, + strategy_id: Optional[str] = None, + order_id: Optional[str] = None, + symbol: Optional[str] = None, + **kwargs + ) -> None: + """Log message with trading context.""" + extra = { + 'user_id': user_id, + 'strategy_id': strategy_id, + 'order_id': order_id, + 'symbol': symbol, + } + extra.update(kwargs) + + # Filter out None values + extra = {k: v for k, v in extra.items() if v is not None} + + record = self._logger.makeRecord( + self._logger.name, + level, + '', + 0, + message, + (), + None, + ) + record.extra_data = extra + self._logger.handle(record) + + def order_placed( + self, + user_id: int, + strategy_id: str, + order_id: str, + symbol: str, + side: str, + size: float, + price: Optional[float] = None, + order_type: str = 'market', + ) -> None: + """Log order placement.""" + self._log_with_context( + logging.INFO, + f"Order placed: {side} {size} {symbol}", + user_id=user_id, + strategy_id=strategy_id, + order_id=order_id, + symbol=symbol, + side=side, + size=size, + price=price, + order_type=order_type, + ) + + def order_filled( + self, + user_id: int, + strategy_id: str, + order_id: str, + symbol: str, + side: str, + size: float, + price: float, + commission: float = 0.0, + ) -> None: + """Log order fill.""" + self._log_with_context( + logging.INFO, + f"Order filled: {side} {size} {symbol} @ {price}", + user_id=user_id, + strategy_id=strategy_id, + order_id=order_id, + symbol=symbol, + side=side, + size=size, + fill_price=price, + commission=commission, + ) + + def order_cancelled( + self, + user_id: int, + strategy_id: str, + order_id: str, + reason: Optional[str] = None, + ) -> None: + """Log order cancellation.""" + self._log_with_context( + logging.INFO, + f"Order cancelled: {order_id}", + user_id=user_id, + strategy_id=strategy_id, + order_id=order_id, + cancel_reason=reason, + ) + + def strategy_started( + self, + user_id: int, + strategy_id: str, + strategy_name: str, + mode: str, + ) -> None: + """Log strategy start.""" + self._log_with_context( + logging.INFO, + f"Strategy started: {strategy_name} ({mode} mode)", + user_id=user_id, + strategy_id=strategy_id, + strategy_name=strategy_name, + mode=mode, + ) + + def strategy_stopped( + self, + user_id: int, + strategy_id: str, + reason: Optional[str] = None, + ) -> None: + """Log strategy stop.""" + self._log_with_context( + logging.INFO, + f"Strategy stopped: {strategy_id}", + user_id=user_id, + strategy_id=strategy_id, + stop_reason=reason, + ) + + def error( + self, + message: str, + user_id: Optional[int] = None, + strategy_id: Optional[str] = None, + error_type: Optional[str] = None, + **kwargs + ) -> None: + """Log error.""" + self._log_with_context( + logging.ERROR, + message, + user_id=user_id, + strategy_id=strategy_id, + error_type=error_type, + **kwargs + ) + + +def get_trading_logger(name: str) -> TradingLogger: + """ + Get a trading-specific logger. + + :param name: Logger name. + :return: TradingLogger instance. + """ + return TradingLogger(logging.getLogger(name))