brighter-trading/src/brokers/live_broker.py

1279 lines
48 KiB
Python

"""
Live Trading Broker Implementation for BrighterTrading.
Executes real trades via exchange APIs (CCXT).
WARNING: This broker executes real trades with real money when not in testnet mode.
Ensure thorough testing on testnet before production use.
"""
import logging
import time
import json
from functools import wraps
from typing import Any, Dict, List, Optional
from datetime import datetime, timezone, timedelta
import uuid
import hashlib
import ccxt
from .base_broker import (
BaseBroker, OrderResult, OrderSide, OrderType, OrderStatus, Position
)
logger = logging.getLogger(__name__)
class RateLimiter:
    """
    Simple rate limiter to prevent API throttling.

    Enforces a minimum spacing between consecutive calls by sleeping inside
    ``wait()``.

    Usage:
        limiter = RateLimiter(calls_per_second=2.0)
        limiter.wait()  # Call before each API request
    """

    def __init__(self, calls_per_second: float = 2.0):
        """
        Initialize the rate limiter.

        :param calls_per_second: Maximum API calls per second. Must be > 0.
        :raises ValueError: If ``calls_per_second`` is zero or negative.
        """
        if calls_per_second <= 0:
            raise ValueError(
                f"calls_per_second must be positive, got {calls_per_second!r}"
            )
        self.min_interval = 1.0 / calls_per_second
        # -inf guarantees the very first wait() never sleeps, whatever the
        # origin of the monotonic clock happens to be.
        self.last_call = float("-inf")

    def wait(self):
        """Sleep just long enough to keep ``min_interval`` between calls."""
        # A monotonic clock is immune to wall-clock adjustments (NTP, DST),
        # which could otherwise produce a negative elapsed time and an
        # arbitrarily long sleep.
        remaining = self.min_interval - (time.monotonic() - self.last_call)
        if remaining > 0:
            time.sleep(remaining)
        self.last_call = time.monotonic()
def retry_on_network_error(max_retries: int = 3, delay: float = 1.0):
    """
    Decorator to retry a function on network errors.

    Only ``ccxt.NetworkError`` triggers a retry; any other exception
    propagates immediately. The wait between attempts doubles each time
    (``delay``, ``2*delay``, ``4*delay``, ...).

    :param max_retries: Maximum number of attempts. Values below 1 are
        treated as 1 so the wrapped function is always called at least once.
    :param delay: Base delay in seconds between retries (exponential backoff).
    :return: A decorator preserving the wrapped function's metadata.
    :raises ccxt.NetworkError: Re-raised from the final failed attempt.
    """
    # Previously max_retries <= 0 skipped the loop entirely and ended in
    # ``raise None`` (a TypeError) without ever calling the function.
    attempts = max(1, max_retries)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except ccxt.NetworkError as e:
                    if attempt == attempts - 1:
                        logger.error(f"Network error after {attempts} attempts: {e}")
                        raise
                    wait_time = delay * (2 ** attempt)
                    logger.warning(f"Network error, retrying in {wait_time}s: {e}")
                    time.sleep(wait_time)
        return wrapper
    return decorator
class LiveOrder:
    """
    Local record of one order submitted to (or discovered on) the exchange.

    Combines the immutable request parameters with mutable execution state
    (status, filled quantity/price, commission and timestamps).
    """

    def __init__(
        self,
        order_id: str,
        exchange_order_id: Optional[str],
        symbol: str,
        side: OrderSide,
        order_type: OrderType,
        size: float,
        price: Optional[float] = None,
        stop_loss: Optional[float] = None,
        take_profit: Optional[float] = None,
        time_in_force: str = 'GTC',
        client_order_id: Optional[str] = None
    ):
        """
        Create a new order record in the PENDING state with no fills.

        :param order_id: Local identifier for the order.
        :param exchange_order_id: Identifier assigned by the exchange, if known.
        :param symbol: Trading pair symbol.
        :param side: Buy or sell.
        :param order_type: Market or limit.
        :param size: Requested quantity.
        :param price: Limit price, if any.
        :param stop_loss: Stop-loss price, if any.
        :param take_profit: Take-profit price, if any.
        :param time_in_force: Time-in-force policy (default 'GTC').
        :param client_order_id: Client-side ID, if any.
        """
        # Identifiers
        self.order_id = order_id
        self.exchange_order_id = exchange_order_id
        self.client_order_id = client_order_id

        # Request parameters
        self.symbol = symbol
        self.side = side
        self.order_type = order_type
        self.size = size
        self.price = price
        self.stop_loss = stop_loss
        self.take_profit = take_profit
        self.time_in_force = time_in_force

        # Execution state: nothing filled yet
        self.status = OrderStatus.PENDING
        self.filled_qty = 0.0
        self.filled_price = 0.0
        self.commission = 0.0

        # Timestamps (creation time is timezone-aware UTC)
        self.created_at = datetime.now(timezone.utc)
        self.filled_at: Optional[datetime] = None
        self.last_update: Optional[datetime] = None

    def to_dict(self) -> Dict[str, Any]:
        """
        Serialize the order to a plain dictionary for persistence.

        Enum fields are stored by ``.value`` and datetimes as ISO-8601
        strings (``None`` when unset).
        """
        filled_at_iso = self.filled_at.isoformat() if self.filled_at else None
        last_update_iso = self.last_update.isoformat() if self.last_update else None
        payload: Dict[str, Any] = {}
        payload['order_id'] = self.order_id
        payload['exchange_order_id'] = self.exchange_order_id
        payload['client_order_id'] = self.client_order_id
        payload['symbol'] = self.symbol
        payload['side'] = self.side.value
        payload['order_type'] = self.order_type.value
        payload['size'] = self.size
        payload['price'] = self.price
        payload['stop_loss'] = self.stop_loss
        payload['take_profit'] = self.take_profit
        payload['time_in_force'] = self.time_in_force
        payload['status'] = self.status.value
        payload['filled_qty'] = self.filled_qty
        payload['filled_price'] = self.filled_price
        payload['commission'] = self.commission
        payload['created_at'] = self.created_at.isoformat()
        payload['filled_at'] = filled_at_iso
        payload['last_update'] = last_update_iso
        return payload

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'LiveOrder':
        """
        Rebuild a LiveOrder from the dictionary produced by ``to_dict``.

        ``order_id``, ``symbol``, ``side``, ``order_type``, ``size`` and
        ``status`` are required keys; everything else falls back to a default.
        """
        restored = cls(
            order_id=data['order_id'],
            exchange_order_id=data.get('exchange_order_id'),
            symbol=data['symbol'],
            side=OrderSide(data['side']),
            order_type=OrderType(data['order_type']),
            size=data['size'],
            price=data.get('price'),
            stop_loss=data.get('stop_loss'),
            take_profit=data.get('take_profit'),
            time_in_force=data.get('time_in_force', 'GTC'),
            client_order_id=data.get('client_order_id')
        )
        restored.status = OrderStatus(data['status'])

        # Numeric fill state defaults to "nothing filled".
        for numeric_field in ('filled_qty', 'filled_price', 'commission'):
            setattr(restored, numeric_field, data.get(numeric_field, 0.0))

        # Timestamps are only overwritten when a non-empty value was stored.
        for stamp_field in ('created_at', 'filled_at', 'last_update'):
            raw_stamp = data.get(stamp_field)
            if raw_stamp:
                setattr(restored, stamp_field, datetime.fromisoformat(raw_stamp))
        return restored
class LiveBroker(BaseBroker):
"""
Live trading broker that executes real trades via CCXT.
WARNING: This broker executes real trades with real money when not in testnet mode.
Ensure thorough testing on testnet before production use.
Features:
- Real order execution via exchange APIs
- Balance and position sync from exchange
- Order lifecycle management (create, cancel, fill detection)
- Restart-safe open order reconciliation
- Rate limiting and retry logic for network errors
"""
def __init__(
    self,
    exchange: Any = None,
    testnet: bool = True,
    initial_balance: float = 0.0,
    commission: float = 0.001,
    slippage: float = 0.0,
    data_cache: Any = None,
    rate_limit: float = 2.0
):
    """
    Set up a LiveBroker; no exchange calls are made until ``connect()``.

    :param exchange: Exchange instance (from Exchange.py) for API access.
    :param testnet: Use testnet (default True for safety).
    :param initial_balance: Starting balance (will be synced from exchange).
    :param commission: Commission rate.
    :param slippage: Slippage rate.
    :param data_cache: DataCache instance for state persistence.
    :param rate_limit: API calls per second limit.
    """
    super().__init__(initial_balance, commission, slippage)

    # Collaborators and mode
    self._exchange = exchange
    self._testnet = testnet
    self._data_cache = data_cache
    self._rate_limiter = RateLimiter(calls_per_second=rate_limit)

    # Make the trading mode unmistakable in the logs.
    if testnet:
        logger.info("LiveBroker initialized in TESTNET mode.")
    else:
        logger.warning(
            "LiveBroker initialized for PRODUCTION trading. "
            "Real money will be used!"
        )

    # Not connected until connect() succeeds.
    self._connected = False

    # Per-asset balances: totals and the portion locked in orders.
    self._balances: Dict[str, float] = {}
    self._locked_balances: Dict[str, float] = {}

    # Locally tracked orders (by local order ID) and positions (by symbol).
    self._orders: Dict[str, LiveOrder] = {}
    self._positions: Dict[str, Position] = {}

    # Short-lived price cache; entries older than the TTL are refetched.
    self._current_prices: Dict[str, float] = {}
    self._price_timestamps: Dict[str, datetime] = {}
    self._price_cache_ttl_seconds: float = 5.0

    # Auto-generated client IDs are reused briefly so that a repeated
    # identical request maps to the same client order ID.
    self._auto_client_id_window_seconds: float = 5.0
    self._auto_client_ids: Dict[str, tuple[str, datetime]] = {}

    # When each kind of exchange sync last completed.
    self._last_balance_sync: Optional[datetime] = None
    self._last_position_sync: Optional[datetime] = None
    self._last_order_sync: Optional[datetime] = None
def _ensure_connected(self):
"""Ensure exchange connection is established."""
if not self._connected:
raise RuntimeError(
"LiveBroker not connected to exchange. "
"Call connect() first."
)
if not self._exchange:
raise RuntimeError(
"LiveBroker has no exchange configured."
)
@retry_on_network_error()
def connect(self) -> bool:
    """
    Connect to the exchange and pull the initial account state.

    Runs the balance, open-order and position syncs in that order and then
    marks the broker as connected.

    NOTE(review): the sync_* helpers log and return empty results on ccxt
    errors instead of raising, so a failed sync does not by itself stop
    ``_connected`` from being set — confirm this is intended.

    :return: True if connection successful, False otherwise.
    """
    exchange = self._exchange
    if not exchange:
        logger.error("Cannot connect: no exchange configured")
        return False

    try:
        logger.info("LiveBroker connecting to exchange...")

        # API keys must be present before anything is requested.
        if not exchange.configured:
            logger.error("Exchange is not configured with valid API keys")
            return False

        for sync_step in (self.sync_balance, self.sync_open_orders, self.sync_positions):
            sync_step()
    except ccxt.AuthenticationError as e:
        logger.error(f"Authentication failed: {e}")
        return False
    except ccxt.BaseError as e:
        logger.error(f"Exchange error during connect: {e}")
        return False
    else:
        self._connected = True
        logger.info("LiveBroker connected successfully")
        return True
def disconnect(self):
    """
    Mark the broker as disconnected.

    Only the local ``_connected`` flag is cleared; no exchange call is made
    here and cached balances, orders and positions are left untouched.
    """
    self._connected = False
    logger.info("LiveBroker disconnected")
@retry_on_network_error()
def sync_balance(self) -> Dict[str, float]:
    """
    Sync balance from exchange.

    Replaces the cached total and locked balances with the per-asset
    entries of ``fetch_balance()``. Only assets with a positive total are
    kept.

    NOTE(review): ccxt.NetworkError derives from ccxt.BaseError, so the
    handler below also absorbs network errors before the retry decorator
    can see them — confirm whether retries are expected to fire here.

    :return: Dict of asset balances (a copy), or ``{}`` when no exchange is
        configured or the exchange call fails.
    """
    if not self._exchange:
        return {}

    self._rate_limiter.wait()
    try:
        balance_data = self._exchange.client.fetch_balance()

        # The ccxt balance structure mixes per-asset dicts with aggregate
        # entries (raw 'info', and 'free'/'used'/'total' maps keyed by
        # asset). Those are not assets and must not be parsed as such.
        aggregate_keys = {'info', 'free', 'used', 'total', 'debt', 'timestamp', 'datetime'}

        # Parse into fresh dicts first so a malformed entry cannot leave
        # the live caches half-cleared.
        totals: Dict[str, float] = {}
        locked: Dict[str, float] = {}
        for asset, data in balance_data.items():
            if asset in aggregate_keys or not isinstance(data, dict):
                continue
            total = float(data.get('total', 0) or 0)
            if total > 0:
                totals[asset] = total
                locked[asset] = float(data.get('used', 0) or 0)

        # Update in place so existing references to the caches stay valid.
        self._balances.clear()
        self._balances.update(totals)
        self._locked_balances.clear()
        self._locked_balances.update(locked)

        self._last_balance_sync = datetime.now(timezone.utc)
        logger.debug(f"Balance synced: {len(self._balances)} assets")
        return self._balances.copy()
    except ccxt.BaseError as e:
        logger.error(f"Error syncing balance: {e}")
        return {}
@retry_on_network_error()
def sync_positions(self) -> List[Position]:
    """
    Rebuild the local position cache from the exchange's active trades.

    Sell-side trades are stored with a negative size. When no positive
    current price is available the entry price is used, which yields an
    unrealized P&L of zero.

    :return: List of current positions; ``[]`` when no exchange is
        configured or a ccxt error occurs.
    """
    if not self._exchange:
        return []

    self._rate_limiter.wait()
    try:
        active_trades = self._exchange.get_active_trades()

        self._positions.clear()
        for trade in active_trades:
            sym = trade['symbol']
            qty = float(trade['quantity'])
            entry_px = float(trade['price'])

            # Mark price for P&L; fall back to the entry price if unknown.
            mark_px = self.get_current_price(sym)
            if mark_px <= 0:
                mark_px = entry_px

            signed_qty = -qty if trade.get('side', 'buy') == 'sell' else qty

            self._positions[sym] = Position(
                symbol=sym,
                size=signed_qty,
                entry_price=entry_px,
                current_price=mark_px,
                unrealized_pnl=(mark_px - entry_px) * signed_qty
            )

        self._last_position_sync = datetime.now(timezone.utc)
        logger.debug(f"Positions synced: {len(self._positions)} positions")
        return list(self._positions.values())
    except ccxt.BaseError as e:
        logger.error(f"Error syncing positions: {e}")
        return []
@retry_on_network_error()
def sync_open_orders(self) -> List[Dict[str, Any]]:
    """
    Sync open orders from exchange.

    Used for restart-safe order reconciliation: every open exchange order
    that is not already tracked locally (matched by exchange order ID) gets
    a new local ``LiveOrder`` in the OPEN state.

    :return: List of open orders from exchange (entries without an ID are
        skipped); ``[]`` when no exchange is configured or a ccxt error
        occurs.
    """
    if not self._exchange:
        return []

    self._rate_limiter.wait()
    try:
        exchange_orders = self._exchange.get_open_orders()

        # Exchange order IDs we already track, to avoid duplicates.
        existing_exchange_ids = {
            order.exchange_order_id
            for order in self._orders.values()
            if order.exchange_order_id
        }

        synced_orders = []
        for ex_order in exchange_orders:
            exchange_order_id = ex_order.get('id')

            # Skip orders without ID (shouldn't happen with fixed Exchange.get_open_orders)
            if not exchange_order_id:
                logger.warning(f"Skipping order without ID: {ex_order}")
                continue

            exchange_order_id = str(exchange_order_id)

            # Already tracked: report it but do not create a second record.
            if exchange_order_id in existing_exchange_ids:
                synced_orders.append(ex_order)
                continue

            symbol = ex_order['symbol']
            side = OrderSide.BUY if ex_order['side'].lower() == 'buy' else OrderSide.SELL
            # 'type' may be present but None; treat that like a missing key.
            raw_type = ex_order.get('type') or 'limit'
            order_type = OrderType.LIMIT if str(raw_type).lower() == 'limit' else OrderType.MARKET
            size = float(ex_order['quantity'])
            price = float(ex_order.get('price', 0) or 0)

            # The order is stored under a fresh local ID; the exchange ID is
            # kept on the record for later lookups.
            local_order_id = str(uuid.uuid4())[:8]

            order = LiveOrder(
                order_id=local_order_id,
                exchange_order_id=exchange_order_id,
                symbol=symbol,
                side=side,
                order_type=order_type,
                size=size,
                price=price if price > 0 else None,
                client_order_id=ex_order.get('clientOrderId')
            )
            order.status = OrderStatus.OPEN
            # Same None-guard as for price: 'filled' can be an explicit None.
            order.filled_qty = float(ex_order.get('filled', 0) or 0)

            self._orders[local_order_id] = order
            existing_exchange_ids.add(exchange_order_id)
            logger.info(f"Synced open order from exchange: {exchange_order_id} -> {local_order_id}")
            synced_orders.append(ex_order)

        self._last_order_sync = datetime.now(timezone.utc)
        logger.debug(f"Orders synced: {len(synced_orders)} open orders")
        return synced_orders
    except ccxt.BaseError as e:
        logger.error(f"Error syncing open orders: {e}")
        return []
def _generate_client_order_id(
    self,
    symbol: str,
    side: OrderSide,
    order_type: OrderType,
    size: float,
    price: Optional[float],
    nonce: str = None
) -> str:
    """
    Derive a client order ID from the order parameters plus a nonce.

    The same parameters with the same nonce always give the same ID, so a
    retried request can be recognised as a duplicate by an exchange that
    honours clientOrderId.

    IMPORTANT: for real idempotency the caller must supply a nonce that
    stays constant across retries. When it is omitted a warning is logged
    and a microsecond timestamp is used instead, which yields a different
    ID on every call.

    :param symbol: Trading pair symbol.
    :param side: Order side; its ``.value`` goes into the hash.
    :param order_type: Order type; its ``.value`` goes into the hash.
    :param size: Order quantity.
    :param price: Order price, or None.
    :param nonce: Unique nonce for this order placement attempt. MUST be constant across retries.
    :return: Client order ID string: ``"BT"`` followed by 16 hex characters.
    """
    if nonce is None:
        # Without a stable nonce every call hashes to a different ID.
        logger.warning("No nonce provided for client order ID - idempotency not guaranteed")
        nonce = str(int(time.time() * 1000000))

    # Colon-joined fingerprint of the request; SHA-256 keeps it fixed-length.
    fingerprint = "{}:{}:{}:{}:{}:{}".format(
        symbol, side.value, order_type.value, size, price, nonce
    )
    digest = hashlib.sha256(fingerprint.encode()).hexdigest()
    return "BT" + digest[:16]
def _get_or_create_auto_client_order_id(
    self,
    symbol: str,
    side: OrderSide,
    order_type: OrderType,
    size: float,
    price: Optional[float]
) -> str:
    """
    Return the auto client order ID for this order intent, creating it if needed.

    An "intent" is the combination of symbol, side, type, size and price.
    Within ``_auto_client_id_window_seconds`` the same intent maps to the
    same ID, which keeps quick repeats idempotent when the caller supplied
    no client_order_id. Expired entries are purged on every call.

    :return: Client order ID string.
    """
    now = datetime.now(timezone.utc)
    window = self._auto_client_id_window_seconds

    # Normalise numbers so e.g. 1 and 1.0 produce the same intent key.
    size_token = f"{float(size):.12g}"
    price_token = "market" if price is None else f"{float(price):.12g}"
    intent_key = f"{symbol}:{side.value}:{order_type.value}:{size_token}:{price_token}"

    # Purge entries that have aged out of the reuse window.
    expired = [
        key for key, (_, issued_at) in self._auto_client_ids.items()
        if (now - issued_at).total_seconds() > window
    ]
    for key in expired:
        self._auto_client_ids.pop(key, None)

    hit = self._auto_client_ids.get(intent_key)
    if hit is not None and (now - hit[1]).total_seconds() <= window:
        return hit[0]

    # New intent: salt the nonce with a UUID fragment so that IDs issued in
    # different windows for the same intent do not collide.
    salted_nonce = f"{intent_key}:{uuid.uuid4().hex[:12]}"
    fresh_id = self._generate_client_order_id(
        symbol=symbol,
        side=side,
        order_type=order_type,
        size=size,
        price=price,
        nonce=salted_nonce
    )
    self._auto_client_ids[intent_key] = (fresh_id, now)
    return fresh_id
def place_order(
    self,
    symbol: str,
    side: OrderSide,
    order_type: OrderType,
    size: float,
    price: Optional[float] = None,
    stop_loss: Optional[float] = None,
    take_profit: Optional[float] = None,
    time_in_force: str = 'GTC',
    client_order_id: Optional[str] = None
) -> OrderResult:
    """
    Place an order on the exchange.

    NOTE: This method is NOT wrapped with @retry_on_network_error because
    retrying order placement can cause duplicate orders. Instead, we use
    clientOrderId for idempotency when supported by the exchange.

    Once the exchange has accepted the order, the response is parsed
    defensively: ``None`` values for ``status``, ``filled``, ``average`` or
    ``id`` fall back to defaults instead of raising, so an accepted order
    is always recorded locally.

    :param symbol: Trading pair symbol.
    :param side: Buy or sell.
    :param order_type: Market or limit.
    :param size: Quantity; must be positive.
    :param price: Limit price; required and positive for limit orders.
    :param stop_loss: Stop-loss price stored on the local record.
    :param take_profit: Take-profit price stored on the local record.
    :param time_in_force: Time-in-force policy passed to the exchange.
    :param client_order_id: Caller-supplied idempotency key; auto-generated
        when omitted.
    :return: OrderResult describing success or failure. A request whose
        client order ID is already tracked returns the existing order with
        ``success=True``.
    :raises RuntimeError: If the broker is not connected.
    """
    self._ensure_connected()

    # Generate local order ID
    order_id = str(uuid.uuid4())[:8]

    # Generate/reuse client order ID for idempotency.
    if client_order_id is None:
        client_order_id = self._get_or_create_auto_client_order_id(
            symbol, side, order_type, size, price
        )

    # Duplicate detection: an order with this client ID is already tracked.
    for existing_order in self._orders.values():
        if getattr(existing_order, 'client_order_id', None) == client_order_id:
            logger.warning(f"Duplicate order detected: {client_order_id}")
            return OrderResult(
                success=True,
                order_id=existing_order.order_id,
                exchange_order_id=existing_order.exchange_order_id,
                status=existing_order.status,
                filled_qty=existing_order.filled_qty,
                filled_price=existing_order.filled_price,
                commission=existing_order.commission,
                message="Order already exists (duplicate detection)"
            )

    # Validate order
    if size <= 0:
        return OrderResult(
            success=False,
            message="Order size must be positive"
        )

    if order_type == OrderType.LIMIT and (price is None or price <= 0):
        return OrderResult(
            success=False,
            message="Limit orders require a valid price"
        )

    self._rate_limiter.wait()

    try:
        # Map order type to exchange format
        ex_order_type = 'market' if order_type == OrderType.MARKET else 'limit'
        ex_side = side.value  # 'buy' or 'sell'

        logger.info(f"Placing {ex_order_type} order: {ex_side} {size} {symbol} @ {price or 'market'} (clientId: {client_order_id})")

        # Place order via exchange with client order ID for idempotency
        # Note: Not all exchanges support clientOrderId, so we pass it as optional param
        result, exchange_order = self._exchange.place_order(
            symbol=symbol,
            side=ex_side,
            type=ex_order_type,
            timeInForce=time_in_force,
            quantity=size,
            price=price,
            client_order_id=client_order_id
        )

        if result != 'Success' or exchange_order is None:
            logger.error(f"Order placement failed: {result}")
            return OrderResult(
                success=False,
                message=f"Order placement failed: {result}"
            )

        # From here on the order exists on the exchange, so every field of
        # the response is read None-safely: a TypeError/AttributeError would
        # not be caught below and would leave the order untracked.
        raw_exchange_id = exchange_order.get('id')
        order = LiveOrder(
            order_id=order_id,
            exchange_order_id='' if raw_exchange_id is None else str(raw_exchange_id),
            symbol=symbol,
            side=side,
            order_type=order_type,
            size=size,
            price=price,
            stop_loss=stop_loss,
            take_profit=take_profit,
            time_in_force=time_in_force,
            client_order_id=client_order_id
        )

        # Parse order status from exchange response (missing/None -> open)
        ex_status = str(exchange_order.get('status') or 'open').lower()
        if ex_status == 'closed' or ex_status == 'filled':
            order.status = OrderStatus.FILLED
            raw_filled = exchange_order.get('filled')
            order.filled_qty = float(size if raw_filled is None else raw_filled)
            raw_average = exchange_order.get('average')
            order.filled_price = float((price or 0) if raw_average is None else raw_average)
            order.filled_at = datetime.now(timezone.utc)

            # Prefer the fee reported by the exchange; otherwise estimate it
            # from the configured commission rate.
            fee = exchange_order.get('fee', {})
            if fee:
                order.commission = float(fee.get('cost', 0) or 0)
            else:
                order.commission = order.filled_qty * order.filled_price * self.commission
        else:
            order.status = OrderStatus.OPEN

        self._orders[order_id] = order

        logger.info(f"Order placed: {order_id} (exchange: {order.exchange_order_id}) - {order.status.value}")

        return OrderResult(
            success=True,
            order_id=order_id,
            exchange_order_id=order.exchange_order_id,
            status=order.status,
            filled_qty=order.filled_qty,
            filled_price=order.filled_price,
            commission=order.commission,
            message=f"Order {order_id} placed successfully"
        )

    except ccxt.InsufficientFunds as e:
        logger.error(f"Insufficient funds: {e}")
        return OrderResult(
            success=False,
            message=f"Insufficient funds: {e}"
        )
    except ccxt.InvalidOrder as e:
        logger.error(f"Invalid order: {e}")
        return OrderResult(
            success=False,
            message=f"Invalid order: {e}"
        )
    except ccxt.BaseError as e:
        # NOTE(review): this also covers network errors, where the order
        # may or may not have reached the exchange — callers should
        # reconcile via sync_open_orders() before re-submitting.
        logger.error(f"Exchange error placing order: {e}")
        return OrderResult(
            success=False,
            message=f"Exchange error: {e}"
        )
@retry_on_network_error()
def cancel_order(self, order_id: str) -> bool:
    """
    Cancel an order on the exchange.

    Orders in the OPEN, PENDING or PARTIALLY_FILLED state can be cancelled;
    for a partially filled order this cancels the unfilled remainder while
    the recorded fill quantities stay on the local record.

    :param order_id: Local order ID.
    :return: True if the order is now cancelled locally (including the case
        where the exchange no longer knows it); False if the order is
        unknown, not cancellable, or the exchange call failed.
    :raises RuntimeError: If the broker is not connected.
    """
    self._ensure_connected()

    if order_id not in self._orders:
        logger.warning(f"Order {order_id} not found")
        return False

    order = self._orders[order_id]

    # A partially filled order still rests on the book and must remain
    # cancellable; previously it was rejected here.
    cancellable = (OrderStatus.OPEN, OrderStatus.PENDING, OrderStatus.PARTIALLY_FILLED)
    if order.status not in cancellable:
        logger.warning(f"Cannot cancel order {order_id}: status is {order.status.value}")
        return False

    self._rate_limiter.wait()

    try:
        # Cancel on exchange
        self._exchange.client.cancel_order(
            order.exchange_order_id,
            order.symbol
        )

        order.status = OrderStatus.CANCELLED
        order.last_update = datetime.now(timezone.utc)
        logger.info(f"Order {order_id} cancelled")
        return True

    except ccxt.OrderNotFound:
        # NOTE(review): the order may in fact have been filled rather than
        # cancelled; it is still marked CANCELLED here as before.
        logger.warning(f"Order {order_id} not found on exchange (may already be filled/cancelled)")
        order.status = OrderStatus.CANCELLED
        # Stamp the change like the normal cancel path does.
        order.last_update = datetime.now(timezone.utc)
        return True
    except ccxt.BaseError as e:
        logger.error(f"Error cancelling order {order_id}: {e}")
        return False
@retry_on_network_error()
def get_order(self, order_id: str) -> Optional[Dict[str, Any]]:
    """
    Get order details, refreshing still-live orders from the exchange.

    Orders that can still change (OPEN, PENDING, PARTIALLY_FILLED) and have
    an exchange order ID are refreshed before being returned; terminal
    orders are served from the local record. A failed refresh is logged and
    the last known local state is returned.

    :param order_id: Local order ID.
    :return: The order as a dict, or None if the ID is not tracked.
    :raises RuntimeError: If the broker is not connected.
    """
    self._ensure_connected()

    order = self._orders.get(order_id)
    if order is None:
        return None

    # Previously only OPEN orders were refreshed, so a partially filled
    # order never picked up its remaining fills through this method.
    refreshable = (OrderStatus.OPEN, OrderStatus.PENDING, OrderStatus.PARTIALLY_FILLED)
    if order.status in refreshable and order.exchange_order_id:
        self._rate_limiter.wait()
        try:
            ex_order = self._exchange.get_order(order.symbol, order.exchange_order_id)
            if ex_order:
                self._update_order_from_exchange(order, ex_order)
        except ccxt.BaseError as e:
            logger.warning(f"Could not refresh order {order_id}: {e}")

    return order.to_dict()
def _update_order_from_exchange(self, order: LiveOrder, ex_order: Dict[str, Any]):
"""Update local order with exchange data."""
ex_status = ex_order.get('status', 'open').lower()
if ex_status == 'closed' or ex_status == 'filled':
order.status = OrderStatus.FILLED
order.filled_qty = float(ex_order.get('filled', order.size))
order.filled_price = float(ex_order.get('average', order.price or 0))
if not order.filled_at:
order.filled_at = datetime.now(timezone.utc)
# Update commission
fee = ex_order.get('fee', {})
if fee:
order.commission = float(fee.get('cost', 0) or 0)
elif ex_status == 'canceled' or ex_status == 'cancelled':
order.status = OrderStatus.CANCELLED
elif ex_status == 'partially_filled':
order.status = OrderStatus.PARTIALLY_FILLED
order.filled_qty = float(ex_order.get('filled', 0))
order.filled_price = float(ex_order.get('average', 0))
order.last_update = datetime.now(timezone.utc)
def get_open_orders(self, symbol: Optional[str] = None) -> List[Dict[str, Any]]:
    """
    List locally tracked orders that are OPEN or PENDING.

    No exchange call is made; the result reflects the local cache only.

    NOTE(review): PARTIALLY_FILLED orders are not included — confirm that
    callers do not expect them here.

    :param symbol: If given, only orders for this symbol are returned.
    :return: List of order dicts (see ``LiveOrder.to_dict``).
    :raises RuntimeError: If the broker is not connected.
    """
    self._ensure_connected()

    live_states = [OrderStatus.OPEN, OrderStatus.PENDING]
    return [
        tracked.to_dict()
        for tracked in self._orders.values()
        if tracked.status in live_states
        and (symbol is None or tracked.symbol == symbol)
    ]
def get_balance(self, asset: Optional[str] = None) -> float:
    """
    Read a total balance from the cached balances (no exchange call).

    :param asset: Asset symbol. When omitted (or empty), the first of
        USDT, USD, BUSD, USDC present in the cache is used; if none is
        present, the first cached balance is returned.
    :return: The balance, or 0.0 if nothing matches.
    :raises RuntimeError: If the broker is not connected.
    """
    self._ensure_connected()

    holdings = self._balances
    if asset:
        return holdings.get(asset, 0.0)

    # No asset requested: prefer a quote currency, in this fixed order.
    preferred = next(
        (quote for quote in ('USDT', 'USD', 'BUSD', 'USDC') if quote in holdings),
        None,
    )
    if preferred is not None:
        return holdings[preferred]

    # Otherwise fall back to whatever balance comes first, if any.
    return next(iter(holdings.values()), 0.0)
def get_available_balance(self, asset: Optional[str] = None) -> float:
    """
    Return the cached total balance minus the amount locked in orders.

    :param asset: Asset symbol. When omitted (or empty), the first of
        USDT, USD, BUSD, USDC present in the cache is used.
    :return: Available amount; 0.0 when no asset is given and none of the
        quote currencies is cached.
    :raises RuntimeError: If the broker is not connected.
    """
    self._ensure_connected()

    if asset:
        return self._balances.get(asset, 0.0) - self._locked_balances.get(asset, 0.0)

    # No asset requested: use the first quote currency we hold.
    for quote in ('USDT', 'USD', 'BUSD', 'USDC'):
        if quote in self._balances:
            return self._balances[quote] - self._locked_balances.get(quote, 0.0)

    return 0.0
def get_total_equity(self, quote_asset: str = 'USDT', max_assets: int = 10) -> float:
    """
    Estimate total account value in the quote currency.

    Stablecoins are counted 1:1. Assets in the skip list (fiat and
    test/placeholder tickers) are ignored. Of the remaining assets only the
    ``max_assets`` largest by raw balance are priced via
    ``get_current_price``; an asset whose price is unavailable or whose
    lookup fails contributes nothing.

    :param quote_asset: Quote currency to use for valuation (default USDT).
    :param max_assets: Maximum number of non-stablecoin assets to price (default 10).
    :return: Total equity in quote currency (0.0 when no balances are cached).
    :raises RuntimeError: If the broker is not connected.
    """
    self._ensure_connected()

    if not self._balances:
        return 0.0

    # Valued 1:1 without a price lookup.
    stablecoins = {'USDT', 'USD', 'BUSD', 'USDC', 'DAI', 'TUSD'}

    # Never priced: fiat currencies and other tickers with no tradable pair.
    skip_assets = {
        'TRY', 'ZAR', 'UAH', 'BRL', 'PLN', 'ARS', 'JPY', 'MXN', 'COP', 'IDR',
        'CZK', 'EUR', 'GBP', 'AUD', 'CAD', 'CHF', 'CNY', 'HKD', 'INR', 'KRW',
        'NGN', 'PHP', 'RUB', 'SGD', 'THB', 'TWD', 'VND', 'NZD', 'SEK', 'NOK',
        'DKK', 'ILS', 'MYR', 'PKR', 'KES', 'EGP', 'CLP', 'PEN', 'AED', 'SAR',
        '456', '这是测试币',
    }

    total_equity = 0.0
    candidates = []
    for asset, balance in self._balances.items():
        if balance <= 0:
            continue
        if asset in stablecoins:
            total_equity += balance
        elif asset not in skip_assets:
            candidates.append((asset, balance))

    # Largest holdings first; price at most max_assets of them.
    assets_to_price = sorted(candidates, key=lambda pair: pair[1], reverse=True)[:max_assets]

    logger.info(f"get_total_equity: pricing top {len(assets_to_price)} of {len(self._balances)} assets")

    for asset, balance in assets_to_price:
        try:
            price = self.get_current_price(f"{asset}/{quote_asset}")
            if price > 0:
                value = balance * price
                total_equity += value
                logger.debug(f" {asset}: {balance} * {price} = {value}")
        except Exception as e:
            logger.warning(f"Error getting price for {asset}: {e}")

    logger.info(f"get_total_equity: total = {total_equity}")
    return total_equity
def get_position(self, symbol: str) -> Optional[Position]:
    """
    Look up the cached position for a symbol (no exchange call).

    :param symbol: Trading pair symbol.
    :return: The cached Position, or None if there is none.
    :raises RuntimeError: If the broker is not connected.
    """
    self._ensure_connected()
    cached_position = self._positions.get(symbol)
    return cached_position
def get_all_positions(self) -> List[Position]:
    """
    Return every cached position as a new list (no exchange call).

    :return: List of cached positions; mutating the list does not affect
        the cache.
    :raises RuntimeError: If the broker is not connected.
    """
    self._ensure_connected()
    return [*self._positions.values()]
@retry_on_network_error()
def get_current_price(self, symbol: str) -> float:
    """
    Get current price from exchange with cache expiration.

    A cached price younger than ``_price_cache_ttl_seconds`` is returned
    without an exchange call. Otherwise a fresh price is fetched; only a
    positive price is cached and returned. If the fetch fails with a ccxt
    error, or yields ``None`` or a non-positive value, the last known
    (possibly stale) price is returned instead.

    :param symbol: Trading pair symbol.
    :return: The price, or 0.0 if none is known.
    """
    if not self._exchange:
        return self._current_prices.get(symbol, 0.0)

    # Serve from cache while the entry is still fresh.
    cached_at = self._price_timestamps.get(symbol)
    if symbol in self._current_prices and cached_at is not None:
        cache_age = (datetime.now(timezone.utc) - cached_at).total_seconds()
        if cache_age < self._price_cache_ttl_seconds:
            return self._current_prices[symbol]

    self._rate_limiter.wait()

    try:
        price = self._exchange.get_price(symbol)
    except ccxt.BaseError as e:
        logger.warning(f"Error getting price for {symbol}: {e}")
        price = None

    # Guard against None before comparing; `None > 0` raises TypeError.
    if price is not None and price > 0:
        self._current_prices[symbol] = price
        self._price_timestamps[symbol] = datetime.now(timezone.utc)
        return price

    # No usable fresh quote: fall back to the last known price.
    return self._current_prices.get(symbol, 0.0)
def update(self) -> List[Dict[str, Any]]:
    """
    Process pending orders and sync with exchange.

    Steps:

    1. Refresh prices for symbols with a position or an OPEN order.
    2. Poll every still-live order (OPEN, PENDING, PARTIALLY_FILLED) that
       has an exchange order ID. When an order becomes FILLED, a ``fill``
       event is emitted and the local position is updated.
    3. Recompute unrealized P&L for all positions.

    :return: List of events (fills, updates, etc.).
    :raises RuntimeError: If the broker is not connected.
    """
    self._ensure_connected()

    events = []

    # Partially filled orders must keep being polled; otherwise they would
    # never be seen completing and no fill event would ever be emitted.
    live_statuses = (OrderStatus.OPEN, OrderStatus.PENDING, OrderStatus.PARTIALLY_FILLED)

    # Update prices for tracked symbols (each symbol once, order preserved).
    tracked_symbols = list(self._positions.keys()) + [
        o.symbol for o in self._orders.values() if o.status == OrderStatus.OPEN
    ]
    for symbol in dict.fromkeys(tracked_symbols):
        try:
            price = self.get_current_price(symbol)
            if price > 0:
                self._current_prices[symbol] = price
        except Exception as e:
            # Best effort: a failed refresh must not stop order polling,
            # but it should leave a trace.
            logger.debug(f"Price refresh failed for {symbol}: {e}")

    # Check for fills on live orders
    for order_id, order in list(self._orders.items()):
        if order.status not in live_statuses:
            continue

        if not order.exchange_order_id:
            continue

        try:
            self._rate_limiter.wait()
            ex_order = self._exchange.get_order(order.symbol, order.exchange_order_id)

            if ex_order:
                old_status = order.status
                self._update_order_from_exchange(order, ex_order)

                # Emit fill event if order was filled
                if order.status == OrderStatus.FILLED and old_status != OrderStatus.FILLED:
                    # Profitability of a sell is measured against the
                    # position's entry price, before the position changes.
                    is_profitable = False
                    realized_pnl = 0.0
                    entry_price = 0.0

                    if order.side == OrderSide.SELL and order.symbol in self._positions:
                        pos = self._positions[order.symbol]
                        entry_price = pos.entry_price
                        realized_pnl = (order.filled_price - pos.entry_price) * order.filled_qty - order.commission
                        is_profitable = realized_pnl > 0

                    events.append({
                        'type': 'fill',
                        'order_id': order_id,
                        'exchange_order_id': order.exchange_order_id,
                        'symbol': order.symbol,
                        'side': order.side.value,
                        'size': order.filled_qty,
                        'filled_qty': order.filled_qty,
                        'price': order.filled_price,
                        'filled_price': order.filled_price,
                        'commission': order.commission,
                        'is_profitable': is_profitable,
                        'realized_pnl': realized_pnl,
                        'entry_price': entry_price
                    })
                    logger.info(f"Order filled: {order_id} - {order.side.value} {order.filled_qty} {order.symbol} @ {order.filled_price}")

                    # Update position after fill
                    self._update_position_from_fill(order)

        except ccxt.BaseError as e:
            logger.warning(f"Error checking order {order_id}: {e}")

    # Update position P&L
    for symbol, position in self._positions.items():
        current_price = self._current_prices.get(symbol, position.current_price)
        if current_price > 0:
            position.current_price = current_price
            position.unrealized_pnl = (current_price - position.entry_price) * position.size

    return events
def _update_position_from_fill(self, order: LiveOrder):
    """
    Update position based on a filled order.

    - BUY with no position: opens a new position at the fill price.
    - BUY with an existing position: averages the entry price over the
      combined size. If the buy exactly offsets the existing size (net size
      zero), the position is removed instead of dividing by zero.
    - SELL with an existing position: books realized P&L (net of the
      order's commission), reduces the size, and removes the position once
      the size is zero or below.
    - SELL with no position: ignored.

    :param order: Filled order; ``filled_qty`` and ``filled_price`` are used.
    """
    symbol = order.symbol
    filled_size = order.filled_qty
    fill_price = order.filled_price

    if order.side == OrderSide.BUY:
        if symbol in self._positions:
            pos = self._positions[symbol]
            new_size = pos.size + filled_size
            if new_size == 0:
                # Net flat (e.g. a buy covering a negative-size position
                # loaded by sync_positions): there is no meaningful average
                # entry price, and the division below would fail.
                del self._positions[symbol]
            else:
                # Average in to existing position
                pos.entry_price = (pos.entry_price * pos.size + fill_price * filled_size) / new_size
                pos.size = new_size
        else:
            # New position
            self._positions[symbol] = Position(
                symbol=symbol,
                size=filled_size,
                entry_price=fill_price,
                current_price=fill_price,
                unrealized_pnl=0.0
            )
    else:
        # Sell order - reduce or close position
        if symbol in self._positions:
            pos = self._positions[symbol]
            realized_pnl = (fill_price - pos.entry_price) * filled_size - order.commission
            pos.realized_pnl += realized_pnl
            pos.size -= filled_size

            if pos.size <= 0:
                del self._positions[symbol]
# ==================== State Persistence Methods ====================
def _ensure_persistence_cache(self) -> bool:
"""Ensure the persistence table/cache exists."""
if not self._data_cache:
return False
try:
# Create backing DB table
if hasattr(self._data_cache, 'db') and hasattr(self._data_cache.db, 'execute_sql'):
self._data_cache.db.execute_sql(
'CREATE TABLE IF NOT EXISTS "live_broker_states" ('
'id INTEGER PRIMARY KEY AUTOINCREMENT, '
'tbl_key TEXT UNIQUE, '
'strategy_instance_id TEXT UNIQUE, '
'broker_state TEXT, '
'updated_at TEXT)',
[]
)
self._data_cache.create_cache(
name='live_broker_states',
cache_type='table',
size_limit=5000,
eviction_policy='deny',
default_expiration=timedelta(days=7),
columns=['id', 'tbl_key', 'strategy_instance_id', 'broker_state', 'updated_at']
)
return True
except Exception as e:
logger.error(f"LiveBroker: Error ensuring persistence cache: {e}", exc_info=True)
return False
def to_state_dict(self) -> Dict[str, Any]:
    """
    Capture the broker's state as a plain dictionary.

    Orders and positions are serialized through their own ``to_dict``;
    balance and price maps are shallow-copied; sync timestamps become
    ISO-8601 strings (``None`` when a sync never happened).

    :return: Dictionary with the broker state.
    """
    def _iso_or_none(stamp):
        # Sync timestamps are optional until the first sync has happened.
        return stamp.isoformat() if stamp else None

    state: Dict[str, Any] = {'testnet': self._testnet}
    state['balances'] = self._balances.copy()
    state['locked_balances'] = self._locked_balances.copy()
    state['orders'] = {
        local_id: tracked.to_dict() for local_id, tracked in self._orders.items()
    }
    state['positions'] = {
        sym: held.to_dict() for sym, held in self._positions.items()
    }
    state['current_prices'] = self._current_prices.copy()
    state['last_balance_sync'] = _iso_or_none(self._last_balance_sync)
    state['last_position_sync'] = _iso_or_none(self._last_position_sync)
    state['last_order_sync'] = _iso_or_none(self._last_order_sync)
    return state
def from_state_dict(self, state: Dict[str, Any]):
    """
    Load broker state from a dictionary produced by ``to_state_dict``.

    An empty or None state is ignored. Existing orders and positions are
    discarded and rebuilt; balances and prices are replaced by the stored
    maps; sync timestamps are only overwritten when a value was stored.
    Price-cache timestamps are not part of the state and are left as-is.

    :param state: Serialized broker state.
    """
    if not state:
        return

    # Balances (total and locked)
    self._balances = state.get('balances', {})
    self._locked_balances = state.get('locked_balances', {})

    # Orders: rebuild each record from its serialized form.
    self._orders.clear()
    for local_id, serialized_order in state.get('orders', {}).items():
        self._orders[local_id] = LiveOrder.from_dict(serialized_order)

    # Positions
    self._positions.clear()
    for sym, serialized_position in state.get('positions', {}).items():
        self._positions[sym] = Position.from_dict(serialized_position)

    # Last known prices
    self._current_prices = state.get('current_prices', {})

    # Sync timestamps are stored as ISO-8601 strings.
    for key in ('last_balance_sync', 'last_position_sync', 'last_order_sync'):
        raw_stamp = state.get(key)
        if raw_stamp:
            setattr(self, f"_{key}", datetime.fromisoformat(raw_stamp))

    logger.info(f"LiveBroker: State restored - {len(self._orders)} orders, {len(self._positions)} positions")
def save_state(self, strategy_instance_id: str) -> bool:
    """
    Persist the broker's current state to the data cache.

    The state from ``to_state_dict`` is JSON-encoded and written to the
    ``live_broker_states`` cache under ``strategy_instance_id``: a new row
    is inserted when none exists for that id, otherwise the existing row is
    modified.

    :param strategy_instance_id: Identifier used as both ``tbl_key`` and
        ``strategy_instance_id`` of the stored row.
    :return: True when the state was written; False when no data cache is
        configured, the persistence cache could not be ensured, or any
        error occurred (errors are logged, not raised).
    """
    if not self._data_cache:
        logger.warning("LiveBroker: No data cache available for persistence")
        return False

    try:
        cache_ready = self._ensure_persistence_cache()
        if not cache_ready:
            return False

        state_json = json.dumps(self.to_state_dict())
        id_filter = ('strategy_instance_id', strategy_instance_id)

        matching_rows = self._data_cache.get_rows_from_datacache(
            cache_name='live_broker_states',
            filter_vals=[id_filter]
        )

        # The same four fields are written whether inserting or updating.
        row_columns = ('tbl_key', 'strategy_instance_id', 'broker_state', 'updated_at')
        saved_at = datetime.now(timezone.utc).isoformat()
        row_values = (strategy_instance_id, strategy_instance_id, state_json, saved_at)

        if not matching_rows.empty:
            self._data_cache.modify_datacache_item(
                cache_name='live_broker_states',
                filter_vals=[id_filter],
                field_names=row_columns,
                new_values=row_values,
                overwrite='strategy_instance_id'
            )
        else:
            self._data_cache.insert_row_into_datacache(
                cache_name='live_broker_states',
                columns=row_columns,
                values=row_values
            )

        logger.debug(f"LiveBroker: State saved for {strategy_instance_id}")
        return True
    except Exception as exc:
        logger.error(f"LiveBroker: Error saving state: {exc}", exc_info=True)
        return False
def load_state(self, strategy_instance_id: str) -> bool:
    """
    Load broker state for a strategy instance from the data cache.

    Looks up the ``live_broker_states`` row for ``strategy_instance_id``,
    decodes its ``broker_state`` JSON and applies it with
    ``from_state_dict``.

    :param strategy_instance_id: Identifier of the saved state to load.
    :return: True when a saved row was found and applied; False when no
        data cache is configured, the persistence cache could not be
        ensured, no row exists, or any error occurred (errors are logged,
        not raised).
    """
    if not self._data_cache:
        logger.warning("LiveBroker: No data cache available for persistence")
        return False

    try:
        cache_ready = self._ensure_persistence_cache()
        if not cache_ready:
            return False

        saved_rows = self._data_cache.get_rows_from_datacache(
            cache_name='live_broker_states',
            filter_vals=[('strategy_instance_id', strategy_instance_id)]
        )
        if saved_rows.empty:
            logger.debug(f"LiveBroker: No saved state for {strategy_instance_id}")
            return False

        # Only the first matching row is used.
        first_row = saved_rows.iloc[0]
        raw_state = first_row.get('broker_state', '{}')
        self.from_state_dict(json.loads(raw_state))

        logger.info(f"LiveBroker: State loaded for {strategy_instance_id}")
        return True
    except Exception as exc:
        logger.error(f"LiveBroker: Error loading state: {exc}", exc_info=True)
        return False
def reconcile_with_exchange(self) -> Dict[str, Any]:
    """
    Compare persisted state with exchange reality and log discrepancies.

    Call this after load_state to ensure consistency. Balances, open orders
    and positions are re-synced from the exchange and every difference from
    the previously held state is recorded. This includes assets and
    positions that were held locally but are absent after the sync, which
    are reported with a new value of 0.

    :return: Dict with reconciliation results. On success it contains
        ``success`` (True) plus the lists ``balance_changes``,
        ``order_changes`` and ``position_changes``. When not connected, or
        when an unexpected error occurs, it contains ``success`` (False)
        and ``error``.
    """
    if not self._connected:
        logger.warning("Cannot reconcile: not connected")
        return {'success': False, 'error': 'Not connected'}

    results = {
        'success': True,
        'balance_changes': [],
        'order_changes': [],
        'position_changes': []
    }
    # Differences at or below this magnitude are treated as noise.
    tolerance = 0.00001

    try:
        # --- Reconcile balances ---
        old_balances = self._balances.copy()
        self.sync_balance()
        # Walk the union of old and new assets: an asset present before the
        # sync but missing afterwards must be reported as having dropped to 0
        # instead of being skipped.
        vanished_assets = [a for a in old_balances if a not in self._balances]
        for asset in list(self._balances) + vanished_assets:
            new_bal = self._balances.get(asset, 0)
            old_bal = old_balances.get(asset, 0)
            if abs(new_bal - old_bal) > tolerance:
                results['balance_changes'].append({
                    'asset': asset,
                    'old': old_bal,
                    'new': new_bal,
                    'diff': new_bal - old_bal
                })
                logger.info(f"Balance reconciled: {asset} {old_bal} -> {new_bal}")

        # --- Reconcile open orders ---
        old_open_orders = {oid: o for oid, o in self._orders.items()
                           if o.status in [OrderStatus.OPEN, OrderStatus.PENDING]}
        exchange_orders = self.sync_open_orders()
        exchange_order_ids = {str(o.get('id')) for o in exchange_orders}
        for order_id, order in old_open_orders.items():
            # The exchange ids are stringified above, so stringify the local
            # id as well; otherwise a non-string id never matches.
            local_id = order.exchange_order_id
            if local_id is not None and str(local_id) in exchange_order_ids:
                continue

            # Order no longer on exchange - might be filled or cancelled
            try:
                ex_order = self._exchange.get_order(order.symbol, order.exchange_order_id)
                if ex_order:
                    self._update_order_from_exchange(order, ex_order)
                    results['order_changes'].append({
                        'order_id': order_id,
                        'old_status': 'open',
                        'new_status': order.status.value
                    })
                else:
                    logger.warning(
                        f"Order {order_id} is not open on the exchange and no details "
                        f"were returned; local status left unchanged"
                    )
            except Exception as lookup_error:
                # Best effort: the lookup failed, so the order is assumed
                # cancelled. Log the cause so a transient failure can be
                # told apart from a genuinely missing order.
                logger.warning(
                    f"Order {order_id} lookup failed during reconciliation, "
                    f"assuming cancelled: {lookup_error}"
                )
                order.status = OrderStatus.CANCELLED
                results['order_changes'].append({
                    'order_id': order_id,
                    'old_status': 'open',
                    'new_status': 'cancelled (assumed)'
                })

        # --- Reconcile positions ---
        old_positions = {sym: p.size for sym, p in self._positions.items()}
        self.sync_positions()
        # As with balances, include positions that existed before the sync
        # but are gone afterwards (e.g. closed while the broker was offline).
        vanished_symbols = [s for s in old_positions if s not in self._positions]
        for symbol in list(self._positions) + vanished_symbols:
            pos = self._positions.get(symbol)
            new_size = pos.size if pos is not None else 0
            old_size = old_positions.get(symbol, 0)
            if abs(new_size - old_size) > tolerance:
                results['position_changes'].append({
                    'symbol': symbol,
                    'old_size': old_size,
                    'new_size': new_size
                })
                logger.info(f"Position reconciled: {symbol} {old_size} -> {new_size}")

        if results['balance_changes'] or results['order_changes'] or results['position_changes']:
            logger.info(f"Reconciliation complete: {len(results['balance_changes'])} balance changes, "
                        f"{len(results['order_changes'])} order changes, "
                        f"{len(results['position_changes'])} position changes")
        else:
            logger.info("Reconciliation complete: no discrepancies found")
        return results
    except Exception as e:
        logger.error(f"Error during reconciliation: {e}", exc_info=True)
        return {'success': False, 'error': str(e)}