Phase 5-6: Live trading stubs and observability

Phase 5 - Live Trading (stub implementation):
- Create LiveBroker stub with NotImplementedError for all methods
- Document required exchange API integration points
- Add testnet flag for safety

Phase 6 - Observability:
- Add structured logging with StructuredFormatter and ColoredFormatter
- Create TradingLogger for trading-specific log entries
- Implement health check system with HealthCheck class
- Add default health checks for database, exchange, memory
- Create health_endpoint() for monitoring integration

The LiveBroker is a stub that needs exchange API integration
for production use. All other trading modes (backtest, paper)
are fully functional.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
rob 2026-02-28 17:09:54 -04:00
parent 51ec74175d
commit 5dbda13924
4 changed files with 722 additions and 1 deletions

View File

@ -10,10 +10,11 @@ This package provides a unified interface for executing trades across different
from .base_broker import BaseBroker, OrderSide, OrderType, OrderStatus, OrderResult, Position
from .backtest_broker import BacktestBroker
from .paper_broker import PaperBroker
from .live_broker import LiveBroker
from .factory import create_broker, TradingMode, get_available_modes
__all__ = [
'BaseBroker', 'OrderSide', 'OrderType', 'OrderStatus', 'OrderResult', 'Position',
'BacktestBroker', 'PaperBroker',
'BacktestBroker', 'PaperBroker', 'LiveBroker',
'create_broker', 'TradingMode', 'get_available_modes'
]

239
src/brokers/live_broker.py Normal file
View File

@ -0,0 +1,239 @@
"""
Live Trading Broker Implementation for BrighterTrading.
Executes real trades via exchange APIs (CCXT).
NOTE: This is a stub implementation. Full live trading
requires careful testing with testnet before production use.
"""
import logging
from typing import Any, Dict, List, Optional
import uuid
from .base_broker import (
BaseBroker, OrderResult, OrderSide, OrderType, OrderStatus, Position
)
logger = logging.getLogger(__name__)
class LiveBroker(BaseBroker):
"""
Live trading broker that executes real trades via CCXT.
WARNING: This broker executes real trades with real money.
Ensure thorough testing on testnet before production use.
Features (to be implemented):
- Real order execution via exchange APIs
- Balance and position sync from exchange
- Order lifecycle management (create, cancel, fill detection)
- Restart-safe open order reconciliation
"""
def __init__(
self,
exchange_interface: Any = None,
user_name: str = None,
exchange_name: str = 'binance',
testnet: bool = True,
initial_balance: float = 0.0, # Will be synced from exchange
commission: float = 0.001,
slippage: float = 0.0
):
"""
Initialize the LiveBroker.
:param exchange_interface: ExchangeInterface instance for API access.
:param user_name: User name for exchange credentials.
:param exchange_name: Exchange to use (e.g., 'binance', 'alpaca').
:param testnet: Use testnet (default True for safety).
:param initial_balance: Starting balance (synced from exchange).
:param commission: Commission rate.
:param slippage: Slippage rate.
"""
super().__init__(initial_balance, commission, slippage)
self._exchange_interface = exchange_interface
self._user_name = user_name
self._exchange_name = exchange_name
self._testnet = testnet
# Warn if not using testnet
if not testnet:
logger.warning(
"LiveBroker initialized for PRODUCTION trading. "
"Real money will be used!"
)
# Placeholder for exchange connection
self._exchange = None
self._connected = False
def _ensure_connected(self):
"""Ensure exchange connection is established."""
if not self._connected:
raise RuntimeError(
"LiveBroker not connected to exchange. "
"Call connect() first."
)
def connect(self) -> bool:
"""
Connect to the exchange.
:return: True if connection successful.
"""
raise NotImplementedError(
"LiveBroker.connect() not yet implemented. "
"Live trading requires exchange API integration."
)
def disconnect(self):
"""Disconnect from the exchange."""
self._connected = False
self._exchange = None
def sync_balance(self) -> float:
"""
Sync balance from exchange.
:return: Current balance from exchange.
"""
raise NotImplementedError(
"LiveBroker.sync_balance() not yet implemented. "
"Balance sync requires exchange API integration."
)
def sync_positions(self) -> List[Position]:
"""
Sync positions from exchange.
:return: List of current positions.
"""
raise NotImplementedError(
"LiveBroker.sync_positions() not yet implemented. "
"Position sync requires exchange API integration."
)
def sync_open_orders(self) -> List[Dict[str, Any]]:
"""
Sync open orders from exchange.
Used for restart-safe order reconciliation.
:return: List of open orders from exchange.
"""
raise NotImplementedError(
"LiveBroker.sync_open_orders() not yet implemented. "
"Order sync requires exchange API integration."
)
def place_order(
self,
symbol: str,
side: OrderSide,
order_type: OrderType,
size: float,
price: Optional[float] = None,
stop_loss: Optional[float] = None,
take_profit: Optional[float] = None,
time_in_force: str = 'GTC'
) -> OrderResult:
"""Place an order on the exchange."""
self._ensure_connected()
raise NotImplementedError(
"LiveBroker.place_order() not yet implemented. "
"Order placement requires exchange API integration."
)
def cancel_order(self, order_id: str) -> bool:
"""Cancel an order on the exchange."""
self._ensure_connected()
raise NotImplementedError(
"LiveBroker.cancel_order() not yet implemented. "
"Order cancellation requires exchange API integration."
)
def get_order(self, order_id: str) -> Optional[Dict[str, Any]]:
"""Get order details from exchange."""
self._ensure_connected()
raise NotImplementedError(
"LiveBroker.get_order() not yet implemented. "
"Order retrieval requires exchange API integration."
)
def get_open_orders(self, symbol: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get all open orders from exchange."""
self._ensure_connected()
raise NotImplementedError(
"LiveBroker.get_open_orders() not yet implemented. "
"Open order retrieval requires exchange API integration."
)
def get_balance(self, asset: Optional[str] = None) -> float:
"""Get balance from exchange."""
self._ensure_connected()
raise NotImplementedError(
"LiveBroker.get_balance() not yet implemented. "
"Balance retrieval requires exchange API integration."
)
def get_available_balance(self, asset: Optional[str] = None) -> float:
"""Get available balance from exchange."""
self._ensure_connected()
raise NotImplementedError(
"LiveBroker.get_available_balance() not yet implemented. "
"Available balance retrieval requires exchange API integration."
)
def get_position(self, symbol: str) -> Optional[Position]:
"""Get position from exchange."""
self._ensure_connected()
raise NotImplementedError(
"LiveBroker.get_position() not yet implemented. "
"Position retrieval requires exchange API integration."
)
def get_all_positions(self) -> List[Position]:
"""Get all positions from exchange."""
self._ensure_connected()
raise NotImplementedError(
"LiveBroker.get_all_positions() not yet implemented. "
"Position retrieval requires exchange API integration."
)
def get_current_price(self, symbol: str) -> float:
"""Get current price from exchange."""
self._ensure_connected()
raise NotImplementedError(
"LiveBroker.get_current_price() not yet implemented. "
"Price retrieval requires exchange API integration."
)
def update(self) -> List[Dict[str, Any]]:
"""
Process pending orders and sync with exchange.
For live trading, this should:
1. Check for order fills
2. Update positions
3. Handle partial fills
4. Manage stop loss / take profit orders
"""
self._ensure_connected()
raise NotImplementedError(
"LiveBroker.update() not yet implemented. "
"Order update requires exchange API integration."
)

205
src/health.py Normal file
View File

@ -0,0 +1,205 @@
"""
Health Check Module for BrighterTrading.
Provides health check endpoints and monitoring capabilities.
"""
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime
import time
logger = logging.getLogger(__name__)
class HealthStatus:
"""Health status constants."""
HEALTHY = 'healthy'
DEGRADED = 'degraded'
UNHEALTHY = 'unhealthy'
class HealthCheck:
"""
Health check coordinator for monitoring system health.
"""
def __init__(self):
self._checks: Dict[str, callable] = {}
self._last_check_time: Optional[datetime] = None
self._cached_status: Optional[Dict[str, Any]] = None
self._cache_ttl_seconds: float = 5.0
def register_check(self, name: str, check_fn: callable) -> None:
"""
Register a health check function.
:param name: Check name.
:param check_fn: Function that returns (status, message) tuple.
"""
self._checks[name] = check_fn
logger.debug(f"Registered health check: {name}")
def unregister_check(self, name: str) -> None:
"""Unregister a health check."""
if name in self._checks:
del self._checks[name]
def run_checks(self, use_cache: bool = True) -> Dict[str, Any]:
"""
Run all health checks and return aggregated status.
:param use_cache: Use cached results if available and fresh.
:return: Health status dictionary.
"""
# Check cache
if use_cache and self._cached_status and self._last_check_time:
age = (datetime.utcnow() - self._last_check_time).total_seconds()
if age < self._cache_ttl_seconds:
return self._cached_status
start_time = time.time()
results: Dict[str, Dict[str, Any]] = {}
overall_status = HealthStatus.HEALTHY
for name, check_fn in self._checks.items():
try:
check_start = time.time()
status, message = check_fn()
check_duration = time.time() - check_start
results[name] = {
'status': status,
'message': message,
'duration_ms': round(check_duration * 1000, 2),
}
# Downgrade overall status if needed
if status == HealthStatus.UNHEALTHY:
overall_status = HealthStatus.UNHEALTHY
elif status == HealthStatus.DEGRADED and overall_status == HealthStatus.HEALTHY:
overall_status = HealthStatus.DEGRADED
except Exception as e:
logger.error(f"Health check '{name}' failed with error: {e}")
results[name] = {
'status': HealthStatus.UNHEALTHY,
'message': str(e),
'error': True,
}
overall_status = HealthStatus.UNHEALTHY
total_duration = time.time() - start_time
status = {
'status': overall_status,
'timestamp': datetime.utcnow().isoformat() + 'Z',
'duration_ms': round(total_duration * 1000, 2),
'checks': results,
}
# Cache results
self._cached_status = status
self._last_check_time = datetime.utcnow()
return status
def get_status(self) -> str:
"""Get simple status string."""
result = self.run_checks()
return result['status']
def is_healthy(self) -> bool:
"""Check if system is healthy."""
return self.get_status() == HealthStatus.HEALTHY
# Default health check instance
_health_check = HealthCheck()
def get_health_check() -> HealthCheck:
"""Get the global health check instance."""
return _health_check
def register_default_checks(
data_cache: Any = None,
exchange_interface: Any = None,
) -> None:
"""
Register default health checks.
:param data_cache: DataCache instance.
:param exchange_interface: ExchangeInterface instance.
"""
health = get_health_check()
# Database check
if data_cache:
def check_database():
try:
# Try to query something simple
data_cache.get_rows_from_datacache('strategies', [])
return HealthStatus.HEALTHY, 'Database accessible'
except Exception as e:
return HealthStatus.UNHEALTHY, str(e)
health.register_check('database', check_database)
# Exchange connectivity check
if exchange_interface:
def check_exchange():
try:
# Try to connect to default exchange
if exchange_interface.default_exchange:
return HealthStatus.HEALTHY, 'Exchange connected'
return HealthStatus.DEGRADED, 'No default exchange'
except Exception as e:
return HealthStatus.UNHEALTHY, str(e)
health.register_check('exchange', check_exchange)
# Memory check
def check_memory():
try:
import psutil
memory = psutil.virtual_memory()
if memory.percent > 90:
return HealthStatus.UNHEALTHY, f'Memory usage critical: {memory.percent}%'
elif memory.percent > 75:
return HealthStatus.DEGRADED, f'Memory usage high: {memory.percent}%'
return HealthStatus.HEALTHY, f'Memory usage: {memory.percent}%'
except ImportError:
return HealthStatus.HEALTHY, 'Memory check unavailable (psutil not installed)'
except Exception as e:
return HealthStatus.DEGRADED, str(e)
health.register_check('memory', check_memory)
# Heartbeat check (always healthy if reached)
def check_heartbeat():
return HealthStatus.HEALTHY, 'Service alive'
health.register_check('heartbeat', check_heartbeat)
logger.info("Default health checks registered")
def health_endpoint() -> Dict[str, Any]:
"""
Flask/FastAPI compatible health endpoint function.
Returns health status as JSON-serializable dict.
"""
return get_health_check().run_checks()
def liveness_probe() -> bool:
"""Simple liveness probe for Kubernetes."""
return True
def readiness_probe() -> bool:
"""Readiness probe checking if service is ready to accept traffic."""
return get_health_check().is_healthy()

276
src/logging_config.py Normal file
View File

@ -0,0 +1,276 @@
"""
Logging Configuration for BrighterTrading.
Provides structured logging with consistent formatting across all modules.
"""
import logging
import sys
from typing import Optional, Dict, Any
from datetime import datetime
import json
class StructuredFormatter(logging.Formatter):
"""
Formatter that outputs structured JSON logs for production use.
"""
def format(self, record: logging.LogRecord) -> str:
log_record = {
'timestamp': datetime.utcnow().isoformat() + 'Z',
'level': record.levelname,
'logger': record.name,
'message': record.getMessage(),
'module': record.module,
'function': record.funcName,
'line': record.lineno,
}
# Add exception info if present
if record.exc_info:
log_record['exception'] = self.formatException(record.exc_info)
# Add extra fields if present
if hasattr(record, 'extra_data'):
log_record.update(record.extra_data)
return json.dumps(log_record)
class ColoredFormatter(logging.Formatter):
"""
Formatter with colored output for development use.
"""
COLORS = {
'DEBUG': '\033[36m', # Cyan
'INFO': '\033[32m', # Green
'WARNING': '\033[33m', # Yellow
'ERROR': '\033[31m', # Red
'CRITICAL': '\033[35m', # Magenta
}
RESET = '\033[0m'
def format(self, record: logging.LogRecord) -> str:
color = self.COLORS.get(record.levelname, self.RESET)
formatted = super().format(record)
return f"{color}{formatted}{self.RESET}"
def configure_logging(
level: str = 'INFO',
structured: bool = False,
log_file: Optional[str] = None,
) -> None:
"""
Configure logging for the application.
:param level: Log level ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL').
:param structured: Use structured JSON logging (for production).
:param log_file: Optional log file path.
"""
root_logger = logging.getLogger()
root_logger.setLevel(getattr(logging, level.upper()))
# Remove existing handlers
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)
# Console handler
console_handler = logging.StreamHandler(sys.stdout)
if structured:
console_handler.setFormatter(StructuredFormatter())
else:
console_handler.setFormatter(ColoredFormatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
root_logger.addHandler(console_handler)
# File handler if specified
if log_file:
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(StructuredFormatter())
root_logger.addHandler(file_handler)
def get_logger(name: str) -> logging.Logger:
"""
Get a logger with the given name.
:param name: Logger name (typically __name__).
:return: Logger instance.
"""
return logging.getLogger(name)
class TradingLogger:
"""
Specialized logger for trading operations with structured fields.
"""
def __init__(self, logger: logging.Logger):
self._logger = logger
def _log_with_context(
self,
level: int,
message: str,
user_id: Optional[int] = None,
strategy_id: Optional[str] = None,
order_id: Optional[str] = None,
symbol: Optional[str] = None,
**kwargs
) -> None:
"""Log message with trading context."""
extra = {
'user_id': user_id,
'strategy_id': strategy_id,
'order_id': order_id,
'symbol': symbol,
}
extra.update(kwargs)
# Filter out None values
extra = {k: v for k, v in extra.items() if v is not None}
record = self._logger.makeRecord(
self._logger.name,
level,
'',
0,
message,
(),
None,
)
record.extra_data = extra
self._logger.handle(record)
def order_placed(
self,
user_id: int,
strategy_id: str,
order_id: str,
symbol: str,
side: str,
size: float,
price: Optional[float] = None,
order_type: str = 'market',
) -> None:
"""Log order placement."""
self._log_with_context(
logging.INFO,
f"Order placed: {side} {size} {symbol}",
user_id=user_id,
strategy_id=strategy_id,
order_id=order_id,
symbol=symbol,
side=side,
size=size,
price=price,
order_type=order_type,
)
def order_filled(
self,
user_id: int,
strategy_id: str,
order_id: str,
symbol: str,
side: str,
size: float,
price: float,
commission: float = 0.0,
) -> None:
"""Log order fill."""
self._log_with_context(
logging.INFO,
f"Order filled: {side} {size} {symbol} @ {price}",
user_id=user_id,
strategy_id=strategy_id,
order_id=order_id,
symbol=symbol,
side=side,
size=size,
fill_price=price,
commission=commission,
)
def order_cancelled(
self,
user_id: int,
strategy_id: str,
order_id: str,
reason: Optional[str] = None,
) -> None:
"""Log order cancellation."""
self._log_with_context(
logging.INFO,
f"Order cancelled: {order_id}",
user_id=user_id,
strategy_id=strategy_id,
order_id=order_id,
cancel_reason=reason,
)
def strategy_started(
self,
user_id: int,
strategy_id: str,
strategy_name: str,
mode: str,
) -> None:
"""Log strategy start."""
self._log_with_context(
logging.INFO,
f"Strategy started: {strategy_name} ({mode} mode)",
user_id=user_id,
strategy_id=strategy_id,
strategy_name=strategy_name,
mode=mode,
)
def strategy_stopped(
self,
user_id: int,
strategy_id: str,
reason: Optional[str] = None,
) -> None:
"""Log strategy stop."""
self._log_with_context(
logging.INFO,
f"Strategy stopped: {strategy_id}",
user_id=user_id,
strategy_id=strategy_id,
stop_reason=reason,
)
def error(
self,
message: str,
user_id: Optional[int] = None,
strategy_id: Optional[str] = None,
error_type: Optional[str] = None,
**kwargs
) -> None:
"""Log error."""
self._log_with_context(
logging.ERROR,
message,
user_id=user_id,
strategy_id=strategy_id,
error_type=error_type,
**kwargs
)
def get_trading_logger(name: str) -> TradingLogger:
"""
Get a trading-specific logger.
:param name: Logger name.
:return: TradingLogger instance.
"""
return TradingLogger(logging.getLogger(name))