diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
new file mode 100644
index 0000000..d348520
--- /dev/null
+++ b/.github/workflows/test.yml
@@ -0,0 +1,74 @@
+name: Tests
+
+on:
+ push:
+ branches: [main, master, recovery/integration-lab]
+ pull_request:
+ branches: [main, master]
+
+jobs:
+ test:
+ runs-on: ubuntu-latest
+
+ steps:
+ - name: Checkout code
+ uses: actions/checkout@v4
+
+ - name: Set up Python
+ uses: actions/setup-python@v5
+ with:
+ python-version: '3.12'
+ cache: 'pip'
+
+ - name: Install system dependencies
+ run: |
+ sudo apt-get update
+ sudo apt-get install -y libta-lib0-dev
+
+ - name: Install Python dependencies
+ run: |
+ python -m pip install --upgrade pip
+ pip install -r requirements.txt
+ pip install pytest pytest-cov
+
+ - name: Syntax check
+ run: |
+ python -m py_compile src/*.py
+ python -m py_compile src/brokers/*.py
+
+ - name: Run critical test suites
+ working-directory: .
+ run: |
+ pytest tests/test_strategy_execution.py \
+ tests/test_execution_loop.py \
+ tests/test_paper_persistence.py \
+ tests/test_backtest_determinism.py \
+ tests/test_brokers.py \
+ -v --tb=short
+
+ - name: Run broker abstraction tests
+ working-directory: .
+ run: |
+ pytest tests/test_brokers.py -v --tb=short
+
+ lint:
+ runs-on: ubuntu-latest
+
+ steps:
+ - name: Checkout code
+ uses: actions/checkout@v4
+
+ - name: Set up Python
+ uses: actions/setup-python@v5
+ with:
+ python-version: '3.12'
+
+ - name: Install linting tools
+ run: |
+ python -m pip install --upgrade pip
+ pip install ruff
+
+ - name: Check code style
+ run: |
+ ruff check src/ --select=E,F --ignore=E501,E402,F401 || true
+ continue-on-error: true
diff --git a/src/BrighterTrades.py b/src/BrighterTrades.py
index 8edd42a..0a5bdfe 100644
--- a/src/BrighterTrades.py
+++ b/src/BrighterTrades.py
@@ -349,7 +349,8 @@ class BrighterTrades:
price_updates = {symbol: float(cdata['close'])}
trade_updates = self.trades.update(price_updates)
- # stg_updates = self.strategies.update()
+ # Update all active strategy instances with new candle data
+ stg_updates = self.strategies.update(candle_data=cdata)
updates = {}
# if i_updates:
@@ -358,8 +359,15 @@ class BrighterTrades:
updates['s_updates'] = state_changes
if trade_updates:
updates['trade_updts'] = trade_updates
- # if stg_updates:
- # updates['stg_updts'] = stg_updates
+ if stg_updates:
+ updates['stg_updts'] = stg_updates
+
+ # Log any errors from strategy execution
+ for event in stg_updates:
+ if event.get('type') == 'error':
+ logger.warning(f"Strategy error: {event.get('message')} "
+ f"(user={event.get('user_id')}, strategy={event.get('strategy_id')})")
+
return updates
def received_new_signal(self, data: dict) -> str | dict:
@@ -549,6 +557,256 @@ class BrighterTrades:
"tbl_key": tbl_key # Include tbl_key even on failure for debugging
}
+ def start_strategy(
+ self,
+ user_id: int,
+ strategy_id: str,
+ mode: str,
+ initial_balance: float = 10000.0,
+ commission: float = 0.001,
+ ) -> dict:
+ """
+ Start a strategy in the specified mode (paper or live).
+
+ :param user_id: User identifier.
+ :param strategy_id: Strategy tbl_key.
+ :param mode: Trading mode ('paper' or 'live').
+ :param initial_balance: Starting balance for paper trading.
+ :param commission: Commission rate.
+ :return: Dictionary with success status and details.
+ """
+ from brokers import TradingMode
+ import uuid
+
+ # Validate mode
+ if mode not in [TradingMode.PAPER, TradingMode.LIVE]:
+ return {"success": False, "message": f"Invalid mode '{mode}'. Use 'paper' or 'live'."}
+
+ # Live mode currently falls back to paper for execution.
+ effective_mode = TradingMode.PAPER if mode == TradingMode.LIVE else mode
+
+ # Get the strategy data
+ strategy_data = self.strategies.data_cache.get_rows_from_datacache(
+ cache_name='strategies',
+ filter_vals=[('tbl_key', strategy_id)],
+ include_tbl_key=True
+ )
+
+ if strategy_data.empty:
+ return {"success": False, "message": "Strategy not found."}
+
+ strategy_row = strategy_data.iloc[0]
+ strategy_name = strategy_row.get('name', 'Unknown')
+
+ # Authorization check: user must own the strategy or strategy must be public
+ strategy_creator = strategy_row.get('creator')
+ is_public = bool(strategy_row.get('public', False))
+
+ if not is_public:
+ requester_name = None
+ try:
+ requester_name = self.users.get_username(user_id=user_id)
+ except Exception:
+ logger.warning(f"Unable to resolve username for user id '{user_id}'.")
+
+ creator_str = str(strategy_creator) if strategy_creator is not None else ''
+ requester_id_str = str(user_id)
+
+ creator_matches_user = False
+ if creator_str:
+ # Support creator being stored as user_name or user_id.
+ creator_matches_user = (
+ (requester_name is not None and creator_str == requester_name) or
+ (creator_str == requester_id_str)
+ )
+
+ if not creator_matches_user and creator_str:
+ # Also check if creator is a username that resolves to the current user id.
+ try:
+ creator_id = self.get_user_info(user_name=creator_str, info='User_id')
+ creator_matches_user = creator_id == user_id
+ except Exception:
+ creator_matches_user = False
+
+ if not creator_matches_user:
+ return {
+ "success": False,
+ "message": "You do not have permission to run this strategy."
+ }
+
+ # Check if already running
+ instance_key = (user_id, strategy_id, effective_mode)
+ if instance_key in self.strategies.active_instances:
+ return {
+ "success": False,
+ "message": f"Strategy '{strategy_name}' is already running in {effective_mode} mode."
+ }
+
+ # Get the generated code from strategy_components
+ try:
+ import json
+ components = json.loads(strategy_row.get('strategy_components', '{}'))
+ # Key is 'generated_code' not 'code' - matches PythonGenerator output
+ generated_code = components.get('generated_code', '')
+ if not generated_code:
+ return {"success": False, "message": "Strategy has no generated code."}
+ except (json.JSONDecodeError, TypeError) as e:
+ return {"success": False, "message": f"Invalid strategy components: {e}"}
+
+ # Create unique instance ID
+ strategy_instance_id = str(uuid.uuid4())
+
+ # Create the strategy instance
+ try:
+ instance = self.strategies.create_strategy_instance(
+ mode=mode,
+ strategy_instance_id=strategy_instance_id,
+ strategy_id=strategy_id,
+ strategy_name=strategy_name,
+ user_id=user_id,
+ generated_code=generated_code,
+ initial_balance=initial_balance,
+ commission=commission,
+ price_provider=lambda symbol: self.exchanges.get_price(symbol),
+ )
+
+ # Store the active instance
+ self.strategies.active_instances[instance_key] = instance
+
+ logger.info(f"Started strategy '{strategy_name}' for user {user_id} in {mode} mode")
+
+ return {
+ "success": True,
+ "message": f"Strategy '{strategy_name}' started in {mode} mode.",
+ "strategy_id": strategy_id,
+ "strategy_name": strategy_name,
+ "instance_id": strategy_instance_id,
+ "mode": mode,
+ "actual_mode": effective_mode,
+ "initial_balance": initial_balance,
+ }
+
+ except Exception as e:
+ logger.error(f"Failed to create strategy instance: {e}", exc_info=True)
+ return {"success": False, "message": f"Failed to start strategy: {str(e)}"}
+
+ def stop_strategy(
+ self,
+ user_id: int,
+ strategy_id: str,
+ mode: str,
+ ) -> dict:
+ """
+ Stop a running strategy.
+
+ :param user_id: User identifier.
+ :param strategy_id: Strategy tbl_key.
+ :param mode: Trading mode.
+ :return: Dictionary with success status.
+ """
+ from brokers import TradingMode
+
+ instance_key = (user_id, strategy_id, mode)
+ instance = self.strategies.active_instances.get(instance_key)
+
+ # Compatibility for live mode fallback.
+ if instance is None and mode == TradingMode.LIVE:
+ fallback_key = (user_id, strategy_id, TradingMode.PAPER)
+ instance = self.strategies.active_instances.get(fallback_key)
+ if instance is not None:
+ instance_key = fallback_key
+
+ if instance is None:
+ return {
+ "success": False,
+ "message": f"No running strategy found for this user/strategy/mode combination."
+ }
+
+ self.strategies.active_instances.pop(instance_key, None)
+ actual_mode = instance_key[2]
+ strategy_name = instance.strategy_name
+
+ # Get final stats if available
+ final_stats = {}
+ if hasattr(instance, 'broker') and hasattr(instance.broker, 'get_balance'):
+ final_stats['final_balance'] = instance.broker.get_balance()
+ final_stats['available_balance'] = instance.broker.get_available_balance()
+
+ if hasattr(instance, 'trade_history'):
+ final_stats['total_trades'] = len(instance.trade_history)
+
+ logger.info(f"Stopped strategy '{strategy_name}' for user {user_id} in {mode} mode")
+
+ return {
+ "success": True,
+ "message": f"Strategy '{strategy_name}' stopped.",
+ "strategy_id": strategy_id,
+ "strategy_name": strategy_name,
+ "mode": mode,
+ "actual_mode": actual_mode,
+ "final_stats": final_stats,
+ }
+
+ def get_strategy_status(
+ self,
+ user_id: int,
+ strategy_id: str = None,
+ mode: str = None,
+ ) -> dict:
+ """
+ Get the status of running strategies for a user.
+
+ :param user_id: User identifier.
+ :param strategy_id: Optional strategy ID to filter.
+ :param mode: Optional mode to filter.
+ :return: Dictionary with strategy statuses.
+ """
+ running_strategies = []
+
+ for (uid, sid, m), instance in self.strategies.active_instances.items():
+ if uid != user_id:
+ continue
+ if strategy_id and sid != strategy_id:
+ continue
+ if mode and m != mode:
+ continue
+
+ status = {
+ "strategy_id": sid,
+ "strategy_name": instance.strategy_name,
+ "mode": m,
+ "instance_id": instance.strategy_instance_id,
+ }
+
+ # Add broker stats if available
+ if hasattr(instance, 'broker'):
+ status['balance'] = instance.broker.get_balance()
+ status['available_balance'] = instance.broker.get_available_balance()
+
+ # Get positions
+ if hasattr(instance.broker, 'get_all_positions'):
+ positions = instance.broker.get_all_positions()
+ status['positions'] = [
+ {
+ 'symbol': p.symbol,
+ 'size': p.size,
+ 'entry_price': p.entry_price,
+ 'unrealized_pnl': p.unrealized_pnl,
+ }
+ for p in positions
+ ]
+
+ if hasattr(instance, 'trade_history'):
+ status['trade_count'] = len(instance.trade_history)
+
+ running_strategies.append(status)
+
+ return {
+ "success": True,
+ "running_strategies": running_strategies,
+ "count": len(running_strategies),
+ }
+
def delete_signal(self, signal_name: str) -> None:
"""
Deletes a signal from the signals instance and removes it from the configuration file.
@@ -929,6 +1187,84 @@ class BrighterTrades:
response = self.delete_backtest(msg_data)
return standard_reply("backtest_deleted", response)
+ if msg_type == 'run_strategy':
+ # Run a strategy in paper or live mode
+ required_fields = ['strategy_id', 'mode']
+ if not all(field in msg_data for field in required_fields):
+ return standard_reply("strategy_run_error", {"message": "Missing required fields (strategy_id, mode)."})
+
+ strategy_id = msg_data.get('strategy_id')
+ mode = msg_data.get('mode', 'paper').lower()
+
+ try:
+ # Parse numeric values safely inside try block
+ initial_balance = float(msg_data.get('initial_balance', 10000.0))
+ commission = float(msg_data.get('commission', 0.001))
+
+ # Validate numeric ranges
+ if initial_balance <= 0:
+ return standard_reply("strategy_run_error", {"message": "Initial balance must be positive."})
+ if commission < 0 or commission > 1:
+ return standard_reply("strategy_run_error", {"message": "Commission must be between 0 and 1."})
+
+ result = self.start_strategy(
+ user_id=user_id,
+ strategy_id=strategy_id,
+ mode=mode,
+ initial_balance=initial_balance,
+ commission=commission,
+ )
+
+ if result.get('success'):
+ # Add explicit warning if live mode was requested but fell back to paper
+ if mode == 'live' and result.get('actual_mode') == 'paper':
+ result['warning'] = "Live trading is not yet implemented. Running in paper trading mode for safety."
+ return standard_reply("strategy_started", result)
+ else:
+ return standard_reply("strategy_run_error", result)
+
+ except ValueError as e:
+ return standard_reply("strategy_run_error", {"message": f"Invalid numeric value: {str(e)}"})
+ except Exception as e:
+ logger.error(f"Error starting strategy: {e}", exc_info=True)
+ return standard_reply("strategy_run_error", {"message": f"Failed to start strategy: {str(e)}"})
+
+ if msg_type == 'stop_strategy':
+ strategy_id = msg_data.get('strategy_id')
+ mode = msg_data.get('mode', 'paper').lower()
+
+ if not strategy_id:
+ return standard_reply("strategy_stop_error", {"message": "Missing strategy_id."})
+
+ try:
+ result = self.stop_strategy(
+ user_id=user_id,
+ strategy_id=strategy_id,
+ mode=mode,
+ )
+ if result.get('success'):
+ return standard_reply("strategy_stopped", result)
+ else:
+ return standard_reply("strategy_stop_error", result)
+ except Exception as e:
+ logger.error(f"Error stopping strategy: {e}", exc_info=True)
+ return standard_reply("strategy_stop_error", {"message": f"Failed to stop strategy: {str(e)}"})
+
+ if msg_type == 'get_strategy_status':
+ strategy_id = msg_data.get('strategy_id')
+ mode = msg_data.get('mode')
+
+ try:
+ result = self.get_strategy_status(
+ user_id=user_id,
+ strategy_id=strategy_id,
+ mode=mode,
+ )
+ return standard_reply("strategy_status", result)
+ except Exception as e:
+ logger.error(f"Error getting strategy status: {e}", exc_info=True)
+ return standard_reply("strategy_status_error", {"message": f"Failed to get status: {str(e)}"})
+
if msg_type == 'reply':
# If the message is a reply log the response to the terminal.
print(f"\napp.py:Received reply: {msg_data}")
diff --git a/src/ExchangeInterface.py b/src/ExchangeInterface.py
index effb035..69eb908 100644
--- a/src/ExchangeInterface.py
+++ b/src/ExchangeInterface.py
@@ -348,28 +348,68 @@ class ExchangeInterface:
# For paper trading / backtesting, assume full fill
return trade.base_order_qty
- def get_trade_executed_price(self, trade) -> float:
+ @staticmethod
+ def _positive_price(value: Any) -> float | None:
+ """Normalize a value to a strictly positive float price."""
+ try:
+ price = float(value)
+ return price if price > 0 else None
+ except (TypeError, ValueError):
+ return None
+
+ def get_trade_executed_price(self, trade, fallback_price: float | None = None) -> float:
"""
Get the executed price of a trade order.
- For paper/backtest modes, returns the order price or current market price.
+ For paper/backtest modes, resolves a best-effort execution price and
+ never silently returns 0.0.
:param trade: The trade object.
+ :param fallback_price: Optional known-good price from caller context
+ (e.g. current tick price in Trades.update()).
:return: Executed price.
"""
- # For paper trading / backtesting, use order price if set
- if trade.order_price and trade.order_price > 0:
- return trade.order_price
+ # 1) Explicit order price (e.g. limit orders)
+ order_price = self._positive_price(getattr(trade, 'order_price', None))
+ if order_price is not None:
+ return order_price
- # For market orders (order_price=0), get current price
+ # 2) Caller-provided fallback (e.g. current candle/tick close)
+ resolved_fallback = self._positive_price(fallback_price)
+ if resolved_fallback is not None:
+ return resolved_fallback
+
+ # 3) Trade object fallbacks from known internal state
+ for attr_name in ('entry_price',):
+ attr_price = self._positive_price(getattr(trade, attr_name, None))
+ if attr_price is not None:
+ return attr_price
+
+ stats = getattr(trade, 'stats', None)
+ if isinstance(stats, dict):
+ for key in ('current_price', 'opening_price', 'settled_price'):
+ stat_price = self._positive_price(stats.get(key))
+ if stat_price is not None:
+ return stat_price
+
+ # 4) Exchange lookup as last resort
try:
- return self.get_price(trade.symbol)
- except Exception:
- # Fallback: if we can't get price, return entry price if available
- if hasattr(trade, 'entry_price') and trade.entry_price > 0:
- return trade.entry_price
- # Last resort: return 0 and let caller handle it
- return 0.0
+ market_price = self._positive_price(self.get_price(trade.symbol))
+ if market_price is not None:
+ return market_price
+ except Exception as e:
+ logger.warning(
+ "Failed to resolve executed price from exchange for trade %s (%s): %s",
+ getattr(trade, 'unique_id', 'unknown'),
+ getattr(trade, 'symbol', 'unknown'),
+ e
+ )
+
+ raise ValueError(
+ f"Unable to resolve executed price for trade "
+ f"{getattr(trade, 'unique_id', 'unknown')} "
+ f"({getattr(trade, 'symbol', 'unknown')})"
+ )
def get_user_balance(self, user_id: int) -> float:
"""
diff --git a/src/Strategies.py b/src/Strategies.py
index 69caaca..c5bafb9 100644
--- a/src/Strategies.py
+++ b/src/Strategies.py
@@ -64,6 +64,81 @@ class Strategies:
self.active_instances: dict[tuple[int, str, str], StrategyInstance] = {} # Key: (user_id, strategy_id, mode)
+ def update(self, candle_data: dict = None) -> list:
+ """
+ Update all active strategy instances with new price data.
+
+ Called on each candle/price tick to process strategy logic.
+
+ :param candle_data: Optional candle data dict with 'symbol', 'close', etc.
+ :return: List of all events from all strategies.
+ """
+ all_events = []
+
+ if not self.active_instances:
+ return all_events
+
+ # Create a list of keys to iterate (avoid dict modification during iteration)
+ instance_keys = list(self.active_instances.keys())
+
+ for instance_key in instance_keys:
+ user_id, strategy_id, mode = instance_key
+
+ try:
+ instance = self.active_instances.get(instance_key)
+ if instance is None:
+ continue
+
+ # Execute strategy tick (instance handles its own price update path).
+ events = instance.tick(candle_data)
+
+ # Tag events with instance info
+ for event in events:
+ event['user_id'] = user_id
+ event['strategy_id'] = strategy_id
+ event['mode'] = mode
+
+ all_events.extend(events)
+
+ # Handle exit condition
+ if instance.exit:
+ # Check if strategy has exited all positions
+ if hasattr(instance, 'broker'):
+ positions = instance.broker.get_all_positions()
+ if not positions:
+ logger.info(f"Strategy '{strategy_id}' has exited all positions. Removing from active.")
+ del self.active_instances[instance_key]
+ all_events.append({
+ 'type': 'strategy_exited',
+ 'user_id': user_id,
+ 'strategy_id': strategy_id,
+ 'mode': mode,
+ })
+
+ except Exception as e:
+ logger.error(f"Error updating strategy {instance_key}: {e}", exc_info=True)
+ all_events.append({
+ 'type': 'error',
+ 'user_id': user_id,
+ 'strategy_id': strategy_id,
+ 'mode': mode,
+ 'message': str(e),
+ })
+
+ return all_events
+
+ def get_active_count(self) -> int:
+ """Get the number of active strategy instances."""
+ return len(self.active_instances)
+
+ def get_active_for_user(self, user_id: int) -> list:
+ """Get all active strategies for a specific user."""
+ return [
+ {'strategy_id': sid, 'mode': mode, 'instance': inst}
+ for (uid, sid, mode), inst in self.active_instances.items()
+ if uid == user_id
+ ]
+
def create_strategy_instance(
self,
mode: str,
@@ -546,21 +621,25 @@ class Strategies:
traceback.print_exc()
return {"success": False, "message": f"Unexpected error: {str(e)}"}
- def update(self):
+ def update_db_active_strategies(self):
"""
- Loops through and executes all activated strategies.
+ Legacy method: Loops through and executes all DB-flagged active strategies.
+
+ Note: This is different from update() which handles the execution loop
+ for strategies started via run_strategy. This method reads from the
+ 'active' flag in the database, which is a different activation mechanism.
"""
try:
active_strategies = self.data_cache.get_rows_from_datacache('strategies',
[('active', True)],
include_tbl_key=True)
if active_strategies.empty:
- logger.info("No active strategies to execute.")
+ logger.info("No DB-active strategies to execute.")
return # No active strategies to execute
for _, strategy_data in active_strategies.iterrows():
self.execute_strategy(strategy_data)
except Exception as e:
- logger.error(f"Error updating strategies: {e}", exc_info=True)
+ logger.error(f"Error updating DB-active strategies: {e}", exc_info=True)
traceback.print_exc()
def update_stats(self, tbl_key: str, stats: dict) -> None:
@@ -603,4 +682,4 @@ class Strategies:
logger.info(f"Updated stats for strategy '{tbl_key}': {current_stats}")
except Exception as e:
- logger.error(f"Error updating stats for strategy '{tbl_key}': {e}", exc_info=True)
\ No newline at end of file
+ logger.error(f"Error updating stats for strategy '{tbl_key}': {e}", exc_info=True)
diff --git a/src/StrategyInstance.py b/src/StrategyInstance.py
index c7f0104..ba48036 100644
--- a/src/StrategyInstance.py
+++ b/src/StrategyInstance.py
@@ -296,6 +296,63 @@ class StrategyInstance:
self.exec_context[key] = value
logger.debug(f"Overridden exec_context key '{key}' with new value '{value}'.")
+ def tick(self, candle_data: dict = None) -> list:
+ """
+ Process one iteration of the strategy on a price tick.
+
+ This method is called by the execution loop when new price data arrives.
+ It updates prices, processes the strategy logic, and returns any events.
+
+ :param candle_data: Optional candle data dict with 'symbol', 'close', etc.
+ :return: List of events (orders, fills, errors, etc.)
+ """
+ events = []
+
+ # Skip if strategy is paused or exiting
+ if self.paused:
+ return [{'type': 'skipped', 'reason': 'paused'}]
+
+ if self.exit:
+ return [{'type': 'skipped', 'reason': 'exiting'}]
+
+ try:
+ # Update current candle data in exec context if provided
+ if candle_data:
+ self.exec_context['current_candle'] = candle_data
+ self.exec_context['current_price'] = candle_data.get('close')
+ self.exec_context['current_symbol'] = candle_data.get('symbol', 'BTC/USDT')
+
+ # Execute the strategy's next() method
+ result = self.execute()
+
+ if result.get('success'):
+ # Collect any events generated during execution
+ if '_events' in self.exec_context:
+ events.extend(self.exec_context['_events'])
+ self.exec_context['_events'] = []
+
+ events.append({
+ 'type': 'tick_complete',
+ 'strategy_id': self.strategy_id,
+ 'profit_loss': result.get('profit_loss', 0.0),
+ })
+ else:
+ events.append({
+ 'type': 'error',
+ 'strategy_id': self.strategy_id,
+ 'message': result.get('message', 'Unknown error'),
+ })
+
+ except Exception as e:
+ logger.error(f"Error in strategy tick: {e}", exc_info=True)
+ events.append({
+ 'type': 'error',
+ 'strategy_id': self.strategy_id,
+ 'message': str(e),
+ })
+
+ return events
+
def execute(self) -> dict[str, Any]:
"""
Executes the strategy's 'next()' method.
diff --git a/src/brokers/base_broker.py b/src/brokers/base_broker.py
index eff715a..cf9e29d 100644
--- a/src/brokers/base_broker.py
+++ b/src/brokers/base_broker.py
@@ -62,6 +62,29 @@ class Position:
unrealized_pnl: float
realized_pnl: float = 0.0
+ def to_dict(self) -> Dict[str, Any]:
+ """Convert position to dictionary for persistence."""
+ return {
+ 'symbol': self.symbol,
+ 'size': self.size,
+ 'entry_price': self.entry_price,
+ 'current_price': self.current_price,
+ 'unrealized_pnl': self.unrealized_pnl,
+ 'realized_pnl': self.realized_pnl,
+ }
+
+ @classmethod
+ def from_dict(cls, data: Dict[str, Any]) -> 'Position':
+ """Create Position from dictionary."""
+ return cls(
+ symbol=data['symbol'],
+ size=data['size'],
+ entry_price=data['entry_price'],
+ current_price=data['current_price'],
+ unrealized_pnl=data['unrealized_pnl'],
+ realized_pnl=data.get('realized_pnl', 0.0),
+ )
+
class BaseBroker(ABC):
"""
diff --git a/src/brokers/paper_broker.py b/src/brokers/paper_broker.py
index cccc16e..f0c45c3 100644
--- a/src/brokers/paper_broker.py
+++ b/src/brokers/paper_broker.py
@@ -9,7 +9,8 @@ slippage and commission.
import logging
from typing import Any, Dict, List, Optional, Callable
import uuid
-from datetime import datetime, timezone
+import json
+from datetime import datetime, timezone, timedelta
from .base_broker import (
BaseBroker, OrderResult, OrderSide, OrderType, OrderStatus, Position
@@ -65,6 +66,7 @@ class PaperOrder:
'filled_qty': self.filled_qty,
'filled_price': self.filled_price,
'commission': self.commission,
+ 'locked_funds': self.locked_funds,
'created_at': self.created_at.isoformat(),
'filled_at': self.filled_at.isoformat() if self.filled_at else None
}
@@ -400,7 +402,9 @@ class PaperBroker(BaseBroker):
'symbol': order.symbol,
'side': order.side.value,
'size': order.filled_qty,
+ 'filled_qty': order.filled_qty,
'price': order.filled_price,
+ 'filled_price': order.filled_price,
'commission': order.commission
})
@@ -421,3 +425,227 @@ class PaperBroker(BaseBroker):
self._trade_history.clear()
self._current_prices.clear()
logger.info(f"PaperBroker: Reset with balance {self.initial_balance}")
+
+ # ==================== State Persistence Methods ====================
+
+ def _ensure_persistence_cache(self) -> bool:
+ """
+ Ensure the persistence table/cache exists.
+ """
+ if not self._data_cache:
+ return False
+
+ try:
+ # Ensure backing DB table exists for datacache read/write methods.
+ if hasattr(self._data_cache, 'db') and hasattr(self._data_cache.db, 'execute_sql'):
+ self._data_cache.db.execute_sql(
+ 'CREATE TABLE IF NOT EXISTS "paper_broker_states" ('
+ 'id INTEGER PRIMARY KEY AUTOINCREMENT, '
+ 'tbl_key TEXT UNIQUE, '
+ 'strategy_instance_id TEXT UNIQUE, '
+ 'broker_state TEXT, '
+ 'updated_at TEXT)',
+ []
+ )
+
+ # Migration path for any older local table that was created without tbl_key.
+ try:
+ existing_df = self._data_cache.db.get_all_rows('paper_broker_states')
+ if 'tbl_key' not in existing_df.columns:
+ self._data_cache.db.execute_sql(
+ 'ALTER TABLE "paper_broker_states" ADD COLUMN tbl_key TEXT',
+ []
+ )
+ except Exception:
+ # If schema inspection fails, continue with current schema.
+ pass
+
+ # Keep tbl_key aligned with strategy_instance_id for DataCache overwrite semantics.
+ self._data_cache.db.execute_sql(
+ 'UPDATE "paper_broker_states" '
+ 'SET tbl_key = strategy_instance_id '
+ 'WHERE tbl_key IS NULL OR tbl_key = ""',
+ []
+ )
+
+ self._data_cache.create_cache(
+ name='paper_broker_states',
+ cache_type='table',
+ size_limit=5000,
+ eviction_policy='deny',
+ default_expiration=timedelta(days=7),
+ columns=['id', 'tbl_key', 'strategy_instance_id', 'broker_state', 'updated_at']
+ )
+ return True
+ except Exception as e:
+ logger.error(f"PaperBroker: Error ensuring persistence cache: {e}", exc_info=True)
+ return False
+
+ def to_state_dict(self) -> Dict[str, Any]:
+ """
+ Serialize broker state to a dictionary for persistence.
+
+ Returns dict containing all state needed to restore the broker.
+ """
+ # Serialize orders
+ orders_data = {}
+ for order_id, order in self._orders.items():
+ orders_data[order_id] = order.to_dict()
+
+ # Serialize positions
+ positions_data = {}
+ for symbol, position in self._positions.items():
+ positions_data[symbol] = position.to_dict()
+
+ return {
+ 'cash': self._cash,
+ 'locked_balance': self._locked_balance,
+ 'initial_balance': self.initial_balance,
+ 'commission': self.commission,
+ 'slippage': self.slippage,
+ 'orders': orders_data,
+ 'positions': positions_data,
+ 'trade_history': self._trade_history,
+ 'current_prices': self._current_prices,
+ }
+
+ def from_state_dict(self, state: Dict[str, Any]):
+ """
+ Restore broker state from a dictionary.
+
+ :param state: State dict from to_state_dict().
+ """
+ if not state:
+ return
+
+ # Restore balances
+ self._cash = state.get('cash', self.initial_balance)
+ self._locked_balance = state.get('locked_balance', 0.0)
+
+ # Restore orders
+ self._orders.clear()
+ orders_data = state.get('orders', {})
+ for order_id, order_dict in orders_data.items():
+ order = PaperOrder(
+ order_id=order_dict['order_id'],
+ symbol=order_dict['symbol'],
+ side=OrderSide(order_dict['side']),
+ order_type=OrderType(order_dict['order_type']),
+ size=order_dict['size'],
+ price=order_dict.get('price'),
+ stop_loss=order_dict.get('stop_loss'),
+ take_profit=order_dict.get('take_profit'),
+ )
+ order.status = OrderStatus(order_dict['status'])
+ order.filled_qty = order_dict.get('filled_qty', 0.0)
+ order.filled_price = order_dict.get('filled_price', 0.0)
+ order.commission = order_dict.get('commission', 0.0)
+ order.locked_funds = order_dict.get('locked_funds', 0.0)
+
+ if order_dict.get('created_at'):
+ order.created_at = datetime.fromisoformat(order_dict['created_at'])
+ if order_dict.get('filled_at'):
+ order.filled_at = datetime.fromisoformat(order_dict['filled_at'])
+
+ self._orders[order_id] = order
+
+ # Restore positions
+ self._positions.clear()
+ positions_data = state.get('positions', {})
+ for symbol, pos_dict in positions_data.items():
+ self._positions[symbol] = Position.from_dict(pos_dict)
+
+ # Restore trade history
+ self._trade_history = state.get('trade_history', [])
+
+ # Restore price cache
+ self._current_prices = state.get('current_prices', {})
+
+ logger.info(f"PaperBroker: State restored - cash: {self._cash:.2f}, "
+ f"positions: {len(self._positions)}, orders: {len(self._orders)}")
+
+ def save_state(self, strategy_instance_id: str) -> bool:
+ """
+ Save broker state to the data cache.
+
+ :param strategy_instance_id: Unique identifier for the strategy instance.
+ :return: True if saved successfully.
+ """
+ if not self._data_cache:
+ logger.warning("PaperBroker: No data cache available for persistence")
+ return False
+
+ try:
+ if not self._ensure_persistence_cache():
+ return False
+
+ state_dict = self.to_state_dict()
+ state_json = json.dumps(state_dict)
+
+ # Check if state already exists
+ existing = self._data_cache.get_rows_from_datacache(
+ cache_name='paper_broker_states',
+ filter_vals=[('strategy_instance_id', strategy_instance_id)]
+ )
+
+ columns = ('tbl_key', 'strategy_instance_id', 'broker_state', 'updated_at')
+ values = (strategy_instance_id, strategy_instance_id, state_json, datetime.now(timezone.utc).isoformat())
+
+ if existing.empty:
+ # Insert new state
+ self._data_cache.insert_row_into_datacache(
+ cache_name='paper_broker_states',
+ columns=columns,
+ values=values
+ )
+ else:
+ # Update existing state
+ self._data_cache.modify_datacache_item(
+ cache_name='paper_broker_states',
+ filter_vals=[('strategy_instance_id', strategy_instance_id)],
+ field_names=columns,
+ new_values=values,
+ overwrite='strategy_instance_id'
+ )
+
+ logger.debug(f"PaperBroker: State saved for {strategy_instance_id}")
+ return True
+
+ except Exception as e:
+ logger.error(f"PaperBroker: Error saving state: {e}", exc_info=True)
+ return False
+
+ def load_state(self, strategy_instance_id: str) -> bool:
+ """
+ Load broker state from the data cache.
+
+ :param strategy_instance_id: Unique identifier for the strategy instance.
+ :return: True if state was loaded successfully.
+ """
+ if not self._data_cache:
+ logger.warning("PaperBroker: No data cache available for persistence")
+ return False
+
+ try:
+ if not self._ensure_persistence_cache():
+ return False
+
+ existing = self._data_cache.get_rows_from_datacache(
+ cache_name='paper_broker_states',
+ filter_vals=[('strategy_instance_id', strategy_instance_id)]
+ )
+
+ if existing.empty:
+ logger.debug(f"PaperBroker: No saved state for {strategy_instance_id}")
+ return False
+
+ state_json = existing.iloc[0].get('broker_state', '{}')
+ state_dict = json.loads(state_json)
+ self.from_state_dict(state_dict)
+
+ logger.info(f"PaperBroker: State loaded for {strategy_instance_id}")
+ return True
+
+ except Exception as e:
+ logger.error(f"PaperBroker: Error loading state: {e}", exc_info=True)
+ return False
diff --git a/src/paper_strategy_instance.py b/src/paper_strategy_instance.py
index c096f7d..c7c0dcb 100644
--- a/src/paper_strategy_instance.py
+++ b/src/paper_strategy_instance.py
@@ -83,7 +83,13 @@ class PaperStrategyInstance(StrategyInstance):
self.exec_context['available_balance'] = self.available_balance
self.exec_context['available_strategy_balance'] = self.available_strategy_balance
- logger.info(f"PaperStrategyInstance created with balance: {initial_balance}")
+ # Try to load persisted broker state
+ if self.paper_broker.load_state(self.strategy_instance_id):
+ # Update balance attributes from restored broker state
+ self._update_balances()
+ logger.info(f"PaperStrategyInstance restored with balance: {self.current_balance}")
+ else:
+ logger.info(f"PaperStrategyInstance created with balance: {initial_balance}")
def trade_order(
self,
@@ -188,6 +194,82 @@ class PaperStrategyInstance(StrategyInstance):
# Update balance attributes
self._update_balances()
+ def tick(self, candle_data: dict = None) -> list:
+ """
+ Process one iteration of the paper trading strategy.
+
+ Overrides base tick to handle paper broker specifics.
+
+ :param candle_data: Optional candle data dict.
+ :return: List of events.
+ """
+ events = []
+
+ # Skip if paused or exiting
+ if self.paused:
+ return [{'type': 'skipped', 'reason': 'paused'}]
+
+ if self.exit:
+ return [{'type': 'skipped', 'reason': 'exiting'}]
+
+ try:
+ # Update prices first if candle data provided
+ if candle_data:
+ symbol = candle_data.get('symbol', 'BTC/USDT')
+ price = float(candle_data.get('close', 0))
+ if price > 0:
+ self.paper_broker.update_price(symbol, price)
+
+ # Process pending orders after price update
+ broker_events = self.paper_broker.update()
+ for event in broker_events:
+ if event['type'] == 'fill':
+ self.trade_history.append(event)
+ events.append({
+ 'type': 'order_filled',
+ 'order_id': event.get('order_id'),
+ 'symbol': event.get('symbol'),
+ 'filled_qty': event.get('filled_qty', event.get('size')),
+ 'filled_price': event.get('filled_price', event.get('price')),
+ })
+
+ # Update exec context with current data
+ self.exec_context['current_candle'] = candle_data
+ self.exec_context['current_price'] = price
+ self.exec_context['current_symbol'] = symbol
+
+ # Update balance attributes before execution
+ self._update_balances()
+
+ # Execute strategy logic
+ result = self.execute()
+
+ if result.get('success'):
+ events.append({
+ 'type': 'tick_complete',
+ 'strategy_id': self.strategy_id,
+ 'balance': self.current_balance,
+ 'available_balance': self.available_balance,
+ 'positions': len(self.paper_broker.get_all_positions()),
+ 'trades': len(self.trade_history),
+ })
+ else:
+ events.append({
+ 'type': 'error',
+ 'strategy_id': self.strategy_id,
+ 'message': result.get('message', 'Unknown error'),
+ })
+
+ except Exception as e:
+ logger.error(f"Error in paper strategy tick: {e}", exc_info=True)
+ events.append({
+ 'type': 'error',
+ 'strategy_id': self.strategy_id,
+ 'message': str(e),
+ })
+
+ return events
+
def _update_balances(self):
"""Update balance attributes from paper broker."""
self.current_balance = self.paper_broker.get_balance()
@@ -253,6 +335,8 @@ class PaperStrategyInstance(StrategyInstance):
def save_context(self):
"""Save strategy context including paper trading state."""
self._update_balances()
+ # Save paper broker state
+ self.paper_broker.save_state(self.strategy_instance_id)
super().save_context()
def notify_user(self, message: str):
diff --git a/src/static/Strategies.js b/src/static/Strategies.js
index a018c65..9ea6d5e 100644
--- a/src/static/Strategies.js
+++ b/src/static/Strategies.js
@@ -103,8 +103,6 @@ class StratUIManager {
if (this.targetEl) {
// Clear existing content
while (this.targetEl.firstChild) {
- // Log before removing the child
- console.log('Removing child:', this.targetEl.firstChild);
this.targetEl.removeChild(this.targetEl.firstChild);
}
@@ -117,25 +115,50 @@ class StratUIManager {
const strategyItem = document.createElement('div');
strategyItem.className = 'strategy-item';
+ // Check if strategy is running
+ const isRunning = UI.strats && UI.strats.isStrategyRunning(strat.tbl_key);
+ const runningInfo = isRunning ? UI.strats.getRunningInfo(strat.tbl_key) : null;
+
// Delete button
const deleteButton = document.createElement('button');
deleteButton.className = 'delete-button';
deleteButton.innerHTML = '✘';
- deleteButton.addEventListener('click', () => {
+ deleteButton.addEventListener('click', (e) => {
+ e.stopPropagation();
+ if (isRunning) {
+ alert('Cannot delete a running strategy. Stop it first.');
+ return;
+ }
console.log(`Delete button clicked for strategy: ${strat.name}`);
if (this.onDeleteStrategy) {
- this.onDeleteStrategy(strat.tbl_key); // Call the callback set by Strategies
+ this.onDeleteStrategy(strat.tbl_key);
} else {
console.error("Delete strategy callback is not set.");
}
});
strategyItem.appendChild(deleteButton);
- console.log('Delete button appended:', deleteButton);
+
+ // Run/Stop button
+ const runButton = document.createElement('button');
+ runButton.className = isRunning ? 'run-button running' : 'run-button';
+ runButton.innerHTML = isRunning ? '■' : '▶'; // Stop or Play icon
+ runButton.title = isRunning ? `Stop (${runningInfo.mode})` : 'Run strategy';
+ runButton.addEventListener('click', (e) => {
+ e.stopPropagation();
+ if (isRunning) {
+ UI.strats.stopStrategy(strat.tbl_key);
+ } else {
+ // Show mode selection in hover panel or use default
+ const modeSelect = document.getElementById(`mode-select-${strat.tbl_key}`);
+ const mode = modeSelect ? modeSelect.value : 'paper';
+ UI.strats.runStrategy(strat.tbl_key, mode);
+ }
+ });
+ strategyItem.appendChild(runButton);
// Strategy icon
const strategyIcon = document.createElement('div');
- strategyIcon.className = 'strategy-icon';
- // Open the form with strategy data when clicked
+ strategyIcon.className = isRunning ? 'strategy-icon running' : 'strategy-icon';
strategyIcon.addEventListener('click', () => {
console.log(`Strategy icon clicked for strategy: ${strat.name}`);
this.displayForm('edit', strat).catch(error => {
@@ -146,21 +169,62 @@ class StratUIManager {
// Strategy name
const strategyName = document.createElement('div');
strategyName.className = 'strategy-name';
- strategyName.textContent = strat.name || 'Unnamed Strategy'; // Fallback for undefined
+ strategyName.textContent = strat.name || 'Unnamed Strategy';
strategyIcon.appendChild(strategyName);
strategyItem.appendChild(strategyIcon);
- console.log('Strategy icon and name appended:', strategyIcon);
- // Strategy hover details
+ // Strategy hover details with run controls
const strategyHover = document.createElement('div');
strategyHover.className = 'strategy-hover';
- strategyHover.innerHTML = `${strat.name || 'Unnamed Strategy'}
Stats: ${JSON.stringify(strat.stats, null, 2)}`;
+
+ // Build hover content
+ let hoverHtml = `${strat.name || 'Unnamed Strategy'}`;
+
+ // Show running status if applicable
+ if (isRunning) {
+ let statusHtml = `
+