brighter-trading/src/exchange_validation.py

315 lines
9.9 KiB
Python

"""
Exchange requirements validation for strategies.
Centralized validator with structured error codes for validating that users
have access to required exchanges before running strategies in different
trading modes (backtest, paper, live).
"""
import logging
from typing import Set, List, Dict, Any
from enum import Enum
logger = logging.getLogger(__name__)
class ValidationErrorCode(Enum):
"""Structured error codes for exchange validation failures."""
MISSING_EDM_DATA = "missing_edm_data"
MISSING_CONFIG = "missing_config"
INVALID_EXCHANGE = "invalid_exchange"
INVALID_KEYS = "invalid_keys"
EDM_UNREACHABLE = "edm_unreachable"
class ExchangeValidationResult:
"""Structured validation result."""
def __init__(
self,
valid: bool,
error_code: ValidationErrorCode = None,
missing_exchanges: Set[str] = None,
message: str = None
):
self.valid = valid
self.error_code = error_code
self.missing_exchanges = missing_exchanges or set()
self.message = message
def to_dict(self) -> Dict[str, Any]:
"""Convert result to dictionary for JSON serialization."""
result = {"valid": self.valid}
if not self.valid:
result["error_code"] = self.error_code.value if self.error_code else None
result["missing_exchanges"] = list(self.missing_exchanges)
result["message"] = self.message
return result
# Exchange name canonicalization map
# Maps variations to canonical names. Names not in map pass through lowercased.
EXCHANGE_ALIASES = {
'binance': 'binance',
'binanceus': 'binanceus',
'binanceusdm': 'binanceusdm',
'binancecoinm': 'binancecoinm',
'kucoin': 'kucoin',
'kucoinfutures': 'kucoinfutures',
'kraken': 'kraken',
'krakenfutures': 'krakenfutures',
'coinbase': 'coinbase',
'coinbasepro': 'coinbasepro',
'bybit': 'bybit',
'okx': 'okx',
'okex': 'okx', # Alias: okex -> okx
'gateio': 'gateio',
'gate': 'gateio', # Alias: gate -> gateio
'htx': 'htx',
'huobi': 'htx', # Alias: huobi -> htx (rebranded)
}
def canonicalize_exchange(name: str) -> str:
"""
Normalize exchange name to canonical form.
Handles case normalization and common aliases.
:param name: Exchange name in any case
:return: Canonical lowercase exchange name
"""
if not name:
return ''
lower = name.lower().strip()
return EXCHANGE_ALIASES.get(lower, lower)
def extract_required_exchanges(strategy: dict) -> Set[str]:
"""
Extract unique exchange names required by a strategy.
Single canonical implementation - use everywhere to avoid drift.
Parses strategy_components.data_sources to identify all exchanges
the strategy needs for execution. Falls back to default_source if
data_sources is empty.
:param strategy: Strategy dictionary from database
:return: Set of canonicalized exchange names
"""
if not strategy:
return set()
components = strategy.get('strategy_components', {})
if isinstance(components, str):
# Handle case where components is JSON string
import json
try:
components = json.loads(components)
except (json.JSONDecodeError, TypeError):
components = {}
data_sources = components.get('data_sources', [])
exchanges = set()
# Extract from data_sources (list of tuples or dicts)
for source in data_sources:
if isinstance(source, (list, tuple)) and len(source) >= 1:
exchange_name = source[0]
if exchange_name:
exchanges.add(canonicalize_exchange(exchange_name))
elif isinstance(source, dict) and source.get('exchange'):
exchanges.add(canonicalize_exchange(source['exchange']))
# Also check default_source as fallback
default_source = strategy.get('default_source', {})
if isinstance(default_source, str):
import json
try:
default_source = json.loads(default_source)
except (json.JSONDecodeError, TypeError):
default_source = {}
if default_source and default_source.get('exchange'):
exchanges.add(canonicalize_exchange(default_source['exchange']))
return exchanges
def get_valid_ccxt_exchanges() -> Set[str]:
"""
Get set of valid ccxt exchange names.
Used to validate that an exchange is supported for paper trading.
:return: Set of lowercase exchange names supported by ccxt
"""
import ccxt
return {ex.lower() for ex in ccxt.exchanges}
def _extract_exchange_name(ex) -> str:
"""
Extract exchange name from EDM response item.
EDM may return either strings or dicts with a 'name' key.
:param ex: Exchange item (str or dict)
:return: Exchange name string
"""
if isinstance(ex, dict):
return ex.get('name', '')
return str(ex) if ex else ''
def validate_for_backtest(
required_exchanges: Set[str],
edm_available_exchanges: List[str]
) -> ExchangeValidationResult:
"""
Validate exchanges for backtest mode.
Backtest requires historical data from EDM. All required exchanges
must be available in EDM's exchange list.
:param required_exchanges: Set of canonicalized exchange names
:param edm_available_exchanges: List of exchanges available in EDM (strings or dicts)
:return: Validation result
"""
if not required_exchanges:
return ExchangeValidationResult(valid=True)
edm_available = {
canonicalize_exchange(_extract_exchange_name(ex))
for ex in (edm_available_exchanges or [])
}
missing = required_exchanges - edm_available
if missing:
return ExchangeValidationResult(
valid=False,
error_code=ValidationErrorCode.MISSING_EDM_DATA,
missing_exchanges=missing,
message=f"Historical data not available for: {', '.join(sorted(missing))}"
)
return ExchangeValidationResult(valid=True)
def validate_for_paper(
required_exchanges: Set[str],
edm_available_exchanges: List[str]
) -> ExchangeValidationResult:
"""
Validate exchanges for paper mode.
Paper mode uses ccxt public endpoints for price fetching, so exchanges
just need to be valid ccxt exchanges. Warns if not in EDM since some
data fetching may fail.
:param required_exchanges: Set of canonicalized exchange names
:param edm_available_exchanges: List of exchanges available in EDM
:return: Validation result
"""
if not required_exchanges:
return ExchangeValidationResult(valid=True)
# Paper mode uses ccxt public endpoints - just need valid ccxt exchange
ccxt_exchanges = get_valid_ccxt_exchanges()
invalid = required_exchanges - ccxt_exchanges
if invalid:
return ExchangeValidationResult(
valid=False,
error_code=ValidationErrorCode.INVALID_EXCHANGE,
missing_exchanges=invalid,
message=f"Unknown exchanges: {', '.join(sorted(invalid))}"
)
# Warn if not in EDM (price fetching may use ccxt fallback)
edm_available = {
canonicalize_exchange(_extract_exchange_name(ex))
for ex in (edm_available_exchanges or [])
}
not_in_edm = required_exchanges - edm_available
if not_in_edm:
logger.warning(
f"Exchanges not in EDM (will use ccxt fallback): {not_in_edm}"
)
return ExchangeValidationResult(valid=True)
def validate_for_live(
required_exchanges: Set[str],
user_configured_exchanges: List[str]
) -> ExchangeValidationResult:
"""
Validate exchanges for live mode.
Live mode requires user to have API keys configured for each exchange.
This is an early check - full validation (including key validity) happens
later in start_strategy when it attempts to connect.
:param required_exchanges: Set of canonicalized exchange names
:param user_configured_exchanges: Exchanges with API keys configured
:return: Validation result
"""
if not required_exchanges:
return ExchangeValidationResult(valid=True)
configured = {
canonicalize_exchange(ex)
for ex in (user_configured_exchanges or [])
}
missing = required_exchanges - configured
if missing:
return ExchangeValidationResult(
valid=False,
error_code=ValidationErrorCode.MISSING_CONFIG,
missing_exchanges=missing,
message=f"API keys required for: {', '.join(sorted(missing))}"
)
return ExchangeValidationResult(valid=True)
def validate_exchange_requirements(
required_exchanges: Set[str],
user_configured_exchanges: List[str],
edm_available_exchanges: List[str],
mode: str
) -> ExchangeValidationResult:
"""
Main validation entrypoint. Routes to mode-specific validator.
Use this from both start_strategy and backtest entry points.
:param required_exchanges: Set of exchange names required by strategy
:param user_configured_exchanges: Exchanges with API keys configured
:param edm_available_exchanges: Exchanges available in EDM
:param mode: Trading mode ('backtest', 'paper', 'live')
:return: Validation result with error details if invalid
"""
# Canonicalize all required exchanges
required = {canonicalize_exchange(ex) for ex in required_exchanges if ex}
if not required:
return ExchangeValidationResult(valid=True)
if mode == 'live':
return validate_for_live(required, user_configured_exchanges)
elif mode == 'paper':
return validate_for_paper(required, edm_available_exchanges)
elif mode == 'backtest':
return validate_for_backtest(required, edm_available_exchanges)
else:
# Unknown mode - reject explicitly
return ExchangeValidationResult(
valid=False,
error_code=ValidationErrorCode.INVALID_EXCHANGE,
missing_exchanges=set(),
message=f"Invalid trading mode: {mode}"
)