From 4072a9d5f59555c5d5c20ccd6ca6950a6020ee6e Mon Sep 17 00:00:00 2001 From: Rob Date: Fri, 15 Nov 2024 18:08:32 -0400 Subject: [PATCH] The test seem to run without errors but are still process blocking. --- markdown/backtesting.md | 112 ++++++ src/StrategyInstance.py | 5 +- src/backtest_strategy_instance.py | 231 ++++++++++++ src/backtesting.py | 602 ++++++------------------------ src/mapped_strategy.py | 142 +++++++ 5 files changed, 598 insertions(+), 494 deletions(-) create mode 100644 markdown/backtesting.md create mode 100644 src/backtest_strategy_instance.py create mode 100644 src/mapped_strategy.py diff --git a/markdown/backtesting.md b/markdown/backtesting.md new file mode 100644 index 0000000..c9c6da0 --- /dev/null +++ b/markdown/backtesting.md @@ -0,0 +1,112 @@ +```plantuml + +@startuml +!define RECTANGLE class + +actor User as U + +participant "User Interface" as UI +participant "Backtester Class" as Backtester +participant "DataCache_v3" as DataCache +participant "Strategies" as Strategies +participant "Indicators" as Indicators +participant "StrategyInstance" as StrategyInstance +participant "Backtrader (Cerebro)" as Cerebro +participant "Custom Analyzers" as Analyzers +participant "EquityCurveAnalyzer" as EquityAnalyzer +participant "TradeAnalyzer" as TradeAnalyzer +participant "Pandas DataFeed" as DataFeed +participant "Logging" as Logging +participant "SocketIO" as SocketIO + +== Backtest Submission == +U -> UI: Submit Backtest Request\n(strategy details, parameters) +UI -> Backtester: initiate_backtest(strategy_details, parameters) +activate Backtester + +== Caching Backtest == +Backtester -> DataCache: cache_backtest(user_name, backtest_name, backtest_data, strategy_instance_id) +activate DataCache +DataCache --> Backtester: Confirmation +deactivate DataCache + +== Preparing Strategy Instance == +Backtester -> Strategies: retrieve_strategy(strategy_id) +activate Strategies +Strategies --> Backtester: strategy_class +deactivate Strategies + +Backtester -> Indicators: get_precomputed_indicators(strategy_id) +activate Indicators +Indicators --> Backtester: precomputed_indicators +deactivate Indicators + +Backtester -> Backtester: map_user_strategy(user_strategy, precomputed_indicators, mode) +Backtester -> StrategyInstance: __init__(strategy_instance_id, strategy_id, strategy_name, user_id, generated_code, data_cache, indicators, trades) +activate StrategyInstance +StrategyInstance --> Backtester: Initialized +deactivate StrategyInstance + +== Preparing Data Feed == +Backtester -> DataCache: prepare_data_feed(data_parameters) +activate DataCache +DataCache --> Backtester: data_feed +deactivate DataCache + +Backtester -> Backtester: add_custom_handlers() +Backtester -> Backtester: precompute_indicators(precomputed_indicators) +Backtester -> Backtester: setup_strategy_instance(strategy_instance) + +== Running Backtest == +Backtester -> Cerebro: setup_cerebro(data_feed, strategy_class, parameters) +activate Cerebro + +Cerebro -> Cerebro: addstrategy(strategy_class, **kwargs) +Cerebro -> Cerebro: adddata(data_feed) +Cerebro -> Cerebro: setcash(initial_capital) +Cerebro -> Cerebro: setcommission(commission) +Cerebro -> Analyzers: add_analyzer(EquityCurveAnalyzer, _name='equity_curve') +Cerebro -> Analyzers: add_analyzer(TradeAnalyzer, _name='trade_analyzer') +Cerebro -> Logging: configure_logging() + +Cerebro -> Cerebro: run() +activate Cerebro + +Cerebro -> EquityAnalyzer: initialize() +activate EquityAnalyzer +Cerebro -> TradeAnalyzer: initialize() +activate TradeAnalyzer + +Cerebro -> StrategyInstance: attach_backtrader_strategy(strategy) +Cerebro -> StrategyInstance: execute() +activate StrategyInstance + +StrategyInstance -> Cerebro: next() +StrategyInstance -> Logging: log("Strategy execution step") +StrategyInstance --> Cerebro: Step completed + +Cerebro -> EquityAnalyzer: record_equity_curve() +Cerebro -> TradeAnalyzer: analyze_trades() + +Cerebro --> Backtester: backtest_results +deactivate Cerebro +deactivate Analyzers + +== Processing Results == +Backtester -> Backtester: calculate_returns(equity_curve) +Backtester -> Backtester: analyze_trades(trades) +Backtester -> Backtester: compute_statistics(total_return, sharpe_ratio, max_drawdown, win_loss_ratio) +Backtester -> Backtester: update_stats(strategy_id, stats) +Backtester -> DataCache: store_backtest_results(user_name, backtest_name, results) +activate DataCache +DataCache --> Backtester: Confirmation +deactivate DataCache + +== Emitting Results == +Backtester -> SocketIO: emit('backtest_results', data, room=socket_conn_id) +activate SocketIO +SocketIO --> U: Receive Backtest Results +deactivate SocketIO + +deactivate Backtester +@enduml diff --git a/src/StrategyInstance.py b/src/StrategyInstance.py index 1e95142..a49bbe5 100644 --- a/src/StrategyInstance.py +++ b/src/StrategyInstance.py @@ -1,6 +1,7 @@ import logging import pandas as pd +from sqlalchemy.util import symbol from DataCache_v3 import DataCache from indicators import Indicators @@ -391,7 +392,6 @@ class StrategyInstance: self, trade_type: str, size: float, - symbol: str, order_type: str, source: dict = None, tif: str = 'GTC', @@ -406,6 +406,7 @@ class StrategyInstance: """ Unified trade order handler for executing buy and sell orders. """ + symbol = source['symbol'] if trade_type == 'buy': logger.info(f"Executing BUY order: Size={size}, Symbol={symbol}, Order Type={order_type}") # Implement buy order logic here @@ -475,6 +476,8 @@ class StrategyInstance: :param output_field: Specific field of the indicator. :return: Indicator value. """ + logger.debug(f"StrategyInstance is Retrieving indicator '{indicator_name}' from Indicators for user '{self.user_id}'.") + try: user_indicators = self.indicators.get_indicator_list(user_id=self.user_id) indicator = user_indicators.get(indicator_name) diff --git a/src/backtest_strategy_instance.py b/src/backtest_strategy_instance.py new file mode 100644 index 0000000..f1c5b13 --- /dev/null +++ b/src/backtest_strategy_instance.py @@ -0,0 +1,231 @@ +# backtest_strategy_instance.py + +import logging +import pandas as pd +import datetime as dt +import backtrader as bt +from StrategyInstance import StrategyInstance + +logger = logging.getLogger(__name__) + + +class BacktestStrategyInstance(StrategyInstance): + """ + Extends StrategyInstance with custom methods for backtesting. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + # 1. Override trade_order + def trade_order( + self, + trade_type: str, + size: float, + order_type: str, + source: dict = None, + tif: str = 'GTC', + stop_loss: dict = None, + trailing_stop: dict = None, + take_profit: dict = None, + limit: dict = None, + trailing_limit: dict = None, + target_market: dict = None, + name_order: dict = None + ): + """ + Custom trade_order method for backtesting. + Executes trades within the Backtrader environment. + """ + if self.backtrader_strategy is None: + logger.error("Backtrader strategy is not set in StrategyInstance.") + return + + # Validate and extract symbol + symbol = source.get('market') if source and 'market' in source else None + if not symbol: + logger.error("Symbol not provided in source. Order not executed.") + return + + # Common logic for BUY and SELL + price = self.backtrader_strategy.data.close[0] + stop_loss_price = stop_loss.get('value') if stop_loss else None + take_profit_price = take_profit.get('value') if take_profit else None + + # Determine trade execution type + if trade_type.lower() == 'buy': + bracket_orders = self.backtrader_strategy.buy_bracket( + size=size, + price=price, + stopprice=stop_loss_price, + limitprice=take_profit_price, + exectype=bt.Order.Market + ) + action = "BUY" + elif trade_type.lower() == 'sell': + bracket_orders = self.backtrader_strategy.sell_bracket( + size=size, + price=price, + stopprice=stop_loss_price, + limitprice=take_profit_price, + exectype=bt.Order.Market + ) + action = "SELL" + else: + logger.error(f"Invalid trade_type '{trade_type}'. Order not executed.") + return + + # Store and notify + if bracket_orders: + self.backtrader_strategy.orders.extend(bracket_orders) + message = f"{action} order executed for {size} {symbol} at {order_type} price." + self.notify_user(message) + logger.info(message) + + # 2. Override process_indicator + def process_indicator(self, indicator_name: str, output_field: str): + """ + Retrieves precomputed indicator values for backtesting. + """ + logger.debug(f"Backtester is Retrieving indicator '{indicator_name}' from precomputed data.") + logger.debug(f'here is the precomputed_indicators: {self.backtrader_strategy.precomputed_indicators}') + if self.backtrader_strategy is None: + logger.error("Backtrader strategy is not set in StrategyInstance.") + return None + + df = self.backtrader_strategy.precomputed_indicators.get(indicator_name) + if df is None: + logger.error(f"Indicator '{indicator_name}' not found.") + return None + + idx = self.backtrader_strategy.indicator_pointers.get(indicator_name, 0) + if idx >= len(df): + logger.warning(f"No more data for indicator '{indicator_name}' at index {idx}.") + return None + + value = df.iloc[idx].get(output_field) + if pd.isna(value): + logger.warning(f"NaN value encountered for indicator '{indicator_name}' at index {idx}.") + return None + + return value + + # 3. Override get_current_price + def get_current_price(self, timeframe: str = '1h', exchange: str = 'binance', + symbol: str = 'BTC/USD') -> float: + """ + Retrieves the current market price from Backtrader's data feed. + """ + if self.backtrader_strategy: + return self.backtrader_strategy.data.close[0] + logger.error("Backtrader strategy is not set.") + return 0.0 + + # 4. Override get_last_candle + def get_last_candle(self, candle_part: str, timeframe: str, exchange: str, symbol: str): + """ + Retrieves the specified part of the last candle from Backtrader's data feed. + """ + if self.backtrader_strategy is None: + logger.error("Backtrader strategy is not set in StrategyInstance.") + return None + + candle_map = { + 'open': self.backtrader_strategy.data.open[0], + 'high': self.backtrader_strategy.data.high[0], + 'low': self.backtrader_strategy.data.low[0], + 'close': self.backtrader_strategy.data.close[0], + 'volume': self.backtrader_strategy.data.volume[0], + } + value = candle_map.get(candle_part.lower()) + if value is None: + logger.error(f"Invalid candle_part '{candle_part}'. Must be one of {list(candle_map.keys())}.") + else: + logger.debug( + f"Retrieved '{candle_part}' from last candle for {symbol} on {exchange} ({timeframe}): {value}" + ) + return value + + # 5. Override get_filled_orders + def get_filled_orders(self) -> int: + """ + Retrieves the number of filled orders from Backtrader's broker. + """ + if self.backtrader_strategy is None: + logger.error("Backtrader strategy is not set in StrategyInstance.") + return 0 + try: + filled_orders = len(self.backtrader_strategy.broker.filled) + logger.debug(f"Number of filled orders: {filled_orders}") + return filled_orders + except Exception as e: + logger.error(f"Error retrieving filled orders: {e}", exc_info=True) + return 0 + + # 6. Override get_available_balance + def get_available_balance(self) -> float: + """ + Retrieves the available balance from Backtrader's broker. + """ + if self.backtrader_strategy is None: + logger.error("Backtrader strategy is not set in StrategyInstance.") + return 0.0 + try: + available_balance = self.backtrader_strategy.broker.getcash() + logger.debug(f"Available balance: {available_balance}") + return available_balance + except Exception as e: + logger.error(f"Error retrieving available balance: {e}", exc_info=True) + return 0.0 + + # 7. Override get_current_balance + def get_current_balance(self) -> float: + """ + Retrieves the current balance from Backtrader's broker. + """ + if self.backtrader_strategy is None: + logger.error("Backtrader strategy is not set in StrategyInstance.") + return 0.0 + try: + balance = self.backtrader_strategy.broker.getvalue() + logger.debug(f"Current balance retrieved: {balance}.") + return balance + except Exception as e: + logger.error(f"Error retrieving current balance: {e}", exc_info=True) + return 0.0 + + # 8. Override get_filled_orders_details (Optional but Recommended) + def get_filled_orders_details(self) -> list: + """ + Retrieves detailed information about filled orders. + """ + if self.backtrader_strategy is None: + logger.error("Backtrader strategy is not set in StrategyInstance.") + return [] + try: + filled_orders = [] + for order in self.backtrader_strategy.broker.filled: + order_info = { + 'ref': order.ref, + 'size': order.size, + 'price': order.executed.price, + 'value': order.executed.value, + 'commission': order.executed.comm, + 'status': order.status, + 'created_at': dt.datetime.fromtimestamp(order.created.dt.timestamp()) if hasattr(order, + 'created') else None + } + filled_orders.append(order_info) + logger.debug(f"Filled orders details: {filled_orders}") + return filled_orders + except Exception as e: + logger.error(f"Error retrieving filled orders details: {e}", exc_info=True) + return [] + + # 9. Override notify_user + def notify_user(self, message: str): + """ + Suppresses user notifications and instead logs them. + :param message: Notification message. + """ + logger.debug(f"Backtest notification: {message}") diff --git a/src/backtesting.py b/src/backtesting.py index 124f43b..54a311d 100644 --- a/src/backtesting.py +++ b/src/backtesting.py @@ -1,7 +1,7 @@ +# backtesting.py + import logging import time -import traceback -import types import uuid import backtrader as bt @@ -10,6 +10,8 @@ from DataCache_v3 import DataCache, RowBasedCache, TableBasedCache from Strategies import Strategies from StrategyInstance import StrategyInstance from indicators import Indicators +from backtest_strategy_instance import BacktestStrategyInstance +from mapped_strategy import MappedStrategy import numpy as np import pandas as pd import signal @@ -22,6 +24,7 @@ formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(messag handler.setFormatter(formatter) logger.addHandler(handler) + # Custom EquityCurveAnalyzer class EquityCurveAnalyzer(bt.Analyzer): def __init__(self): @@ -66,17 +69,14 @@ class Backtester: signal.signal(signal.SIGINT, self.shutdown_handler) signal.signal(signal.SIGTERM, self.shutdown_handler) - def cache_backtest(self, user_name: str, backtest_name: str, backtest_data: dict, strategy_instance_id: str): + def cache_backtest(self, backtest_key: str, backtest_data: dict, strategy_instance_id: str): """ Cache the backtest data for a user. If the backtest already exists, update it; otherwise, insert a new entry. - - :param user_name: Name of the user. - :param backtest_name: Name of the backtest. + :param backtest_key: A unique identifier for the backtest. :param backtest_data: Dictionary containing backtest parameters. + :param strategy_instance_id: The ID of the strategy instance. """ - cache_key = f"backtest:{user_name}:{backtest_name}" - # Define columns and corresponding values (excluding 'tbl_key') columns = ('user_name', 'strategy_name', 'start_time', 'capital', 'commission', 'results', 'strategy_instance_id') values = ( @@ -93,7 +93,7 @@ class Backtester: # Check if the backtest already exists existing_backtest = self.data_cache.get_rows_from_cache( cache_name='tests', - filter_vals=[('tbl_key', cache_key)] + filter_vals=[('tbl_key', backtest_key)] ) if existing_backtest.empty: @@ -102,24 +102,23 @@ class Backtester: cache_name='tests', columns=columns, values=values, - key=cache_key + key=backtest_key ) - logger.debug(f"Inserted new backtest entry '{cache_key}'.") + logger.debug(f"Inserted new backtest entry '{backtest_key}'.") else: # Update existing backtest entry (e.g., reset 'results' if needed) # Here, we assume you might want to reset 'results' when re-running self.data_cache.modify_datacache_item( cache_name='tests', - filter_vals=[('tbl_key', cache_key)], + filter_vals=[('tbl_key', backtest_key)], field_names=('results',), new_values=(None,), # Reset results overwrite='tbl_key' # Ensures uniqueness based on 'tbl_key' ) - logger.debug(f"Updated existing backtest entry '{cache_key}'. Reset 'results'.") + logger.debug(f"Updated existing backtest entry '{backtest_key}'. Reset 'results'.") except Exception as e: - logger.error(f"Error in cache_backtest for '{cache_key}': {e}", exc_info=True) - # Depending on your application, you might want to raise the exception - # raise e + logger.error(f"Error in cache_backtest for '{backtest_key}': {e}", exc_info=True) + def cleanup_backtest(self, backtest_key: str, strategy_instance_id: str) -> None: """ @@ -148,16 +147,24 @@ class Backtester: except Exception as e: logger.error(f"Error during cleanup of backtest '{backtest_key}': {e}", exc_info=True) + def validate_strategy_components(self, user_id: str, strategy_name: str, user_name: str) -> dict: + """ + Retrieves and validates the components of a user-defined strategy. + Raises a ValueError if required components are missing or incorrectly formatted. + """ + try: + user_strategy = self.strategies.get_strategy_by_name(user_id=int(user_id), name=strategy_name) + except ValueError: + logger.error(f"Invalid user_id '{user_id}'. Must be an integer.") + raise ValueError(f"Invalid user_id '{user_id}'. Must be an integer.") - def map_user_strategy(self, user_strategy: dict, precomputed_indicators: dict[str, pd.DataFrame], - mode: str = 'testing', socketio=None, socket_conn_id=None, data_length=None) -> any: - """ - Maps user strategy details into a Backtrader-compatible strategy class. - """ - # Extract the generated code and indicators from the strategy components - strategy_components = user_strategy['strategy_components'] - generated_code = strategy_components['generated_code'] - indicators_used = strategy_components['indicators'] + if not user_strategy: + logger.error(f"Strategy '{strategy_name}' not found for user '{user_name}'.") + raise ValueError(f"Strategy '{strategy_name}' not found for user '{user_name}'.") + + strategy_components = user_strategy.get('strategy_components', {}) + generated_code = strategy_components.get('generated_code') + indicators_used = strategy_components.get('indicators') # Validate extracted data if not generated_code: @@ -168,378 +175,8 @@ class Backtester: logger.error("'indicators_used' should be a list.") raise ValueError("'indicators_used' should be a list.") - logger.info(f"Mapping strategy '{user_strategy.get('strategy_name', 'Unnamed')}' with mode '{mode}'.") + return user_strategy - # Define the strategy class dynamically - class MappedStrategy(bt.Strategy): - params = ( - ('mode', mode), - ('strategy_instance', None), # Will be set during instantiation - ('socketio', socketio), - ('socket_conn_id', socket_conn_id), - ('data_length', data_length) - ) - - def __init__(self): - super().__init__() - self.strategy_instance: StrategyInstance = self.p.strategy_instance - logger.debug(f"StrategyInstance '{self.strategy_instance.strategy_instance_id}' attached to MappedStrategy.") - - # Establish backreference - self.strategy_instance.backtrader_strategy = self - - self.precomputed_indicators = precomputed_indicators - self.indicator_pointers = {} - self.indicator_names = list(precomputed_indicators.keys()) - self.current_step = 0 - - # Initialize pointers for each indicator - for name in self.indicator_names: - self.indicator_pointers[name] = 0 # Start at the first row - - # Initialize an empty list to store orders - self.orders = [] - self.trade_list = [] - - # Initialize any other needed variables - self.starting_balance = self.broker.getvalue() - self.current_step = 0 - self.last_progress = 0 # Initialize last_progress - - def notify_order(self, order): - if order.status in [order.Submitted, order.Accepted]: - # Order has been submitted/accepted by broker - nothing to do - return - - if order.status in [order.Completed]: - if order.isbuy(): - self.log(f"BUY EXECUTED, Price: {order.executed.price}, Size: {order.executed.size}") - elif order.issell(): - self.log(f"SELL EXECUTED, Price: {order.executed.price}, Size: {order.executed.size}") - self.bar_executed = len(self) - elif order.status in [order.Canceled, order.Margin, order.Rejected]: - self.log('Order Canceled/Margin/Rejected') - - # Remove the order from the list - if order in self.orders: - self.orders.remove(order) - - def notify_trade(self, trade): - if not trade.isclosed: - return - - self.log(f"TRADE CLOSED, GROSS P/L: {trade.pnl}, NET P/L: {trade.pnlcomm}") - - # Convert datetime objects to ISO-formatted strings - open_datetime = bt.num2date(trade.dtopen).isoformat() if trade.dtopen else None - close_datetime = bt.num2date(trade.dtclose).isoformat() if trade.dtclose else None - - # Store the trade details for later use - trade_info = { - 'ref': trade.ref, - 'size': trade.size, - 'price': trade.price, - 'pnl': trade.pnl, - 'pnlcomm': trade.pnlcomm, - 'open_datetime': open_datetime, - 'close_datetime': close_datetime - } - self.trade_list.append(trade_info) - - def log(self, txt, dt=None): - """ Logging function for this strategy""" - dt = dt or self.datas[0].datetime.datetime(0) - logger.info(f"{dt.isoformat()} - {txt}") - - def next(self): - self.current_step += 1 - # Execute the strategy logic - try: - execution_result = self.strategy_instance.execute() - if not execution_result.get('success', False): - error_msg = execution_result.get('message', 'Unknown error during strategy execution.') - logger.error(f"Strategy execution failed: {error_msg}") - self.stop() - except Exception as e: - logger.error(f"Error in strategy execution: {e}") - - # Calculate progress - progress = (self.current_step / self.p.data_length) * 100 - progress = min(int(progress), 100) # Ensure progress doesn't exceed 100% - - # Emit progress only if it has increased by at least 1% - if progress > self.last_progress: - self.p.socketio.emit( - 'message', - {'reply': 'progress', 'data': {'progress': progress}}, - room=self.p.socket_conn_id - ) - self.last_progress = progress - - return MappedStrategy - - # Add custom handlers to the StrategyInstance - def add_custom_handlers(self, strategy_instance: StrategyInstance) -> StrategyInstance: - """ - Define custom methods to be injected into exec_context. - :param strategy_instance: The strategy instance to inject the custom handlers into. - :return: The modified strategy instance. - """ - - # 1. Override trade_order - def trade_order( - trade_type: str, - size: float, - order_type: str, - source: dict = None, - tif: str = 'GTC', - stop_loss: dict = None, - trailing_stop: dict = None, - take_profit: dict = None, - limit: dict = None, - trailing_limit: dict = None, - target_market: dict = None, - name_order: dict = None - ): - """ - Custom trade_order method for backtesting. - Executes trades within the Backtrader environment. - """ - # Validate and extract 'symbol' from 'source' - if source and 'market' in source: - symbol = source['market'] - logger.debug(f"Extracted symbol '{symbol}' from source.") - else: - logger.error("Symbol not provided in source. Order not executed.") - return # Abort the order execution - - price = strategy_instance.backtrader_strategy.data.close[0] - - if trade_type.lower() == 'buy': - logger.info(f"Executing BUY order: Size={size}, Symbol={symbol}, Order Type={order_type}") - - # Prepare bracket order parameters - stop_loss_price = stop_loss.get('value') if stop_loss else None - take_profit_price = take_profit.get('value') if take_profit else None - - # Create bracket order and store the orders - bracket_orders = strategy_instance.backtrader_strategy.buy_bracket( - size=size, - price=price, - stopprice=stop_loss_price, - limitprice=take_profit_price, - exectype=bt.Order.Market - ) - elif trade_type.lower() == 'sell': - logger.info(f"Executing SELL order: Size={size}, Symbol={symbol}, Order Type={order_type}") - - # Prepare bracket order parameters - stop_loss_price = stop_loss.get('value') if stop_loss else None - take_profit_price = take_profit.get('value') if take_profit else None - - # Create bracket order and store the orders - bracket_orders = strategy_instance.backtrader_strategy.sell_bracket( - size=size, - price=price, - stopprice=stop_loss_price, - limitprice=take_profit_price, - exectype=bt.Order.Market - ) - else: - logger.error(f"Invalid trade_type '{trade_type}'. Order not executed.") - return # Abort the order execution - - # Store the orders for tracking - strategy_instance.backtrader_strategy.orders.extend(bracket_orders) - - # Notify user about the trade execution - strategy_instance.notify_user( - f"{trade_type.capitalize()} order executed for {size} {symbol} at {order_type} price." - ) - logger.debug(f"{trade_type.capitalize()} order executed for {size} {symbol} at {order_type} price.") - - # Override the trade_order method - strategy_instance.override_exec_context('trade_order', trade_order) - - # 2. Override process_indicator - def process_indicator(indicator_name, output_field): - """ - Custom process_indicator method for backtesting. - :param indicator_name: Name of the indicator. - :param output_field: Specific field to retrieve from the indicator. - :return: The value of the specified indicator field at the current step. - """ - # Access precomputed_indicators via backtrader_strategy - if strategy_instance.backtrader_strategy is None: - logger.error("Backtrader strategy is not set in StrategyInstance.") - return None - - df = strategy_instance.backtrader_strategy.precomputed_indicators.get(indicator_name) - if df is None: - logger.error(f"Indicator '{indicator_name}' not found in precomputed indicators.") - return None - - # Access indicator_pointers via backtrader_strategy - idx = strategy_instance.backtrader_strategy.indicator_pointers.get(indicator_name, 0) - - if idx >= len(df): - logger.warning(f"No more data for indicator '{indicator_name}' at index {idx}.") - return None # No more data - - # Get the specific output value - if output_field in df.columns: - value = df.iloc[idx][output_field] - if pd.isna(value): - logger.warning(f"NaN value encountered for indicator '{indicator_name}' at index {idx}.") - return None # Handle NaN values - return value - else: - logger.error(f"Output field '{output_field}' not found in indicator '{indicator_name}'.") - return None # Output field not found - - # Override the process_indicator method - strategy_instance.override_exec_context('process_indicator', process_indicator) - - # 3. Override get_current_price - def get_current_price(timeframe: str = '1h', exchange: str = 'binance', - symbol: str = 'BTC/USD') -> float | None: - """ - Retrieves the current market price from Backtrader's data feed. - """ - try: - # Access the current close price from Backtrader's data - current_price = strategy_instance.backtrader_strategy.data.close[0] - logger.debug(f"Retrieved current price for {symbol} on {exchange} ({timeframe}): {current_price}") - return current_price - except Exception as e: - logger.error(f"Error retrieving current price for {symbol} on {exchange} ({timeframe}): {e}", - exc_info=True) - return None - - # Override the get_current_price method - strategy_instance.override_exec_context('get_current_price', get_current_price) - - # 4. Override get_last_candle - def get_last_candle(candle_part: str, timeframe: str, exchange: str, symbol: str): - """ - Retrieves the specified part of the last candle from Backtrader's data feed. - """ - try: - # Map candle_part to Backtrader's data attributes - candle_map = { - 'open': strategy_instance.backtrader_strategy.data.open[0], - 'high': strategy_instance.backtrader_strategy.data.high[0], - 'low': strategy_instance.backtrader_strategy.data.low[0], - 'close': strategy_instance.backtrader_strategy.data.close[0], - 'volume': strategy_instance.backtrader_strategy.data.volume[0], - } - value = candle_map.get(candle_part.lower()) - if value is None: - logger.error(f"Invalid candle_part '{candle_part}'. Must be one of {list(candle_map.keys())}.") - else: - logger.debug( - f"Retrieved '{candle_part}' from last candle for {symbol} on {exchange} ({timeframe}): {value}") - return value - except Exception as e: - logger.error( - f"Error retrieving last candle '{candle_part}' for {symbol} on {exchange} ({timeframe}): {e}", - exc_info=True) - return None - - # Override the get_last_candle method - strategy_instance.override_exec_context('get_last_candle', get_last_candle) - - # 5. Override get_filled_orders - def get_filled_orders() -> int: - """ - Retrieves the number of filled orders from Backtrader's broker. - """ - try: - # Access Backtrader's broker's filled orders - filled_orders = len(strategy_instance.backtrader_strategy.broker.filled) - logger.debug(f"Number of filled orders: {filled_orders}") - return filled_orders - except Exception as e: - logger.error(f"Error retrieving filled orders: {e}", exc_info=True) - return 0 - - # Override the get_filled_orders method - strategy_instance.override_exec_context('get_filled_orders', get_filled_orders) - - # 6. Override get_available_balance - def get_available_balance() -> float: - """ - Retrieves the available balance from Backtrader's broker. - """ - try: - available_balance = strategy_instance.backtrader_strategy.broker.getcash() - logger.debug(f"Available balance: {available_balance}") - return available_balance - except Exception as e: - logger.error(f"Error retrieving available balance: {e}", exc_info=True) - return 0.0 - - # Override the get_available_balance method - strategy_instance.override_exec_context('get_available_balance', get_available_balance) - - # 7. Override get_current_balance - def get_current_balance() -> float: - """ - Retrieves the current balance from Backtrader's broker. - - :return: Current balance. - """ - try: - # Access the total portfolio value from Backtrader's broker - balance = strategy_instance.backtrader_strategy.broker.getvalue() - logger.debug(f"Current balance retrieved: {balance}.") - return balance - except Exception as e: - logger.error(f"Error retrieving current balance: {e}", exc_info=True) - return 0.0 - - # Override the get_current_balance method - strategy_instance.override_exec_context('get_current_balance', get_current_balance) - - # 8. Override get_filled_orders_details (Optional but Recommended) - def get_filled_orders_details() -> list: - """ - Retrieves detailed information about filled orders. - """ - try: - filled_orders = [] - for order in strategy_instance.backtrader_strategy.broker.filled: - order_info = { - 'ref': order.ref, - 'size': order.size, - 'price': order.executed.price, - 'value': order.executed.value, - 'commission': order.executed.comm, - 'status': order.status, - 'created_at': dt.datetime.fromtimestamp(order.created.dt.timestamp()) - } - filled_orders.append(order_info) - logger.debug(f"Filled orders details: {filled_orders}") - return filled_orders - except Exception as e: - logger.error(f"Error retrieving filled orders details: {e}", exc_info=True) - return [] - - # Override the get_filled_orders_details method - strategy_instance.override_exec_context('get_filled_orders_details', get_filled_orders_details) - - def notify_user(self, message: str): - """ - Suppresses user notifications and instead logs them. - :param message: Notification message. - """ - logger.debug(f"User notification during backtest for user ID '{self.user_id}': {message}") - - # Bind the overridden method to the instance - strategy_instance.notify_user = types.MethodType(notify_user, strategy_instance) - - # Return the modified strategy_instance - return strategy_instance def prepare_data_feed(self, start_date: str, source, user_name: str) -> pd.DataFrame: """ @@ -689,14 +326,14 @@ class Backtester: return data_feed, precomputed_indicators def run_backtest(self, strategy_class, data_feed: pd.DataFrame, msg_data: dict, user_name: str, - callback, socket_conn_id: str, strategy_instance: StrategyInstance): + socket_conn_id: str, strategy_instance: BacktestStrategyInstance, backtest_name: str, + user_id: str, backtest_key: str, strategy_name: str, precomputed_indicators: dict): """ Runs a backtest using Backtrader and uses Flask-SocketIO's background tasks. Sends progress updates to the client via WebSocket. """ def execute_backtest(): - nonlocal data_feed try: # **Convert 'time' to 'datetime' if necessary** if 'time' in data_feed.columns: @@ -715,8 +352,15 @@ class Backtester: # Assign cerebro to strategy_instance for potential use in custom methods strategy_instance.cerebro = cerebro - # Add the mapped strategy to the backtest, including strategy_instance as a parameter - cerebro.addstrategy(strategy_class, strategy_instance=strategy_instance) + # Add the mapped strategy to the backtest, including strategy_instance and precomputed_indicators as parameters + cerebro.addstrategy( + strategy_class, + strategy_instance=strategy_instance, + precomputed_indicators=precomputed_indicators, # Pass precomputed indicators + socketio=self.socketio, # Pass SocketIO instance + socket_conn_id=socket_conn_id, # Pass SocketIO connection ID + data_length=len(data_feed) # Pass data length for progress updates + ) # Add data feed to Cerebro bt_feed = bt.feeds.PandasData(dataname=data_feed) @@ -754,6 +398,7 @@ class Backtester: # Prepare the results to pass into the callback backtest_results = { + "success": True, # Indicate success "initial_capital": initial_cash, "final_portfolio_value": final_value, "run_duration": run_duration, @@ -763,7 +408,10 @@ class Backtester: logger.info("Backtest executed successfully.") - callback(backtest_results) + # Invoke the callback with all necessary parameters + self.backtest_callback(user_name, backtest_name, user_id, strategy_name, + strategy_instance.strategy_instance_id, socket_conn_id, + backtest_key, backtest_results) except Exception as e: # Handle exceptions and send error messages to the client @@ -772,8 +420,16 @@ class Backtester: room=socket_conn_id) logger.error(f"[BACKTEST ERROR] {error_message}", exc_info=True) + # Prepare failure results + failure_results = { + "success": False, + "message": error_message + } + # Invoke callback with failure details to ensure cleanup - callback({"success": False, "message": error_message}) + self.backtest_callback(user_name, backtest_name, user_id, strategy_name, + strategy_instance.strategy_instance_id, socket_conn_id, + backtest_key, failure_results) # Start the backtest as a background task self.socketio.start_background_task(execute_backtest) @@ -781,27 +437,21 @@ class Backtester: def handle_backtest_message(self, user_id: str, msg_data: dict, socket_conn_id: str) -> dict: """ Handle incoming backtest messages, orchestrate the backtest process. - :param user_id: ID of the user initiating the backtest. - :param msg_data: Dictionary containing backtest parameters. - :param socket_conn_id: Socket connection ID for emitting updates. - :return: Dictionary with the status of backtest initiation. """ + # Extract and define backtest parameters user_name = msg_data.get('user_name') - backtest_name = f"{msg_data.get('strategy', 'UnnamedStrategy')}_backtest" - - # Fetch the strategy using user_id and strategy_name strategy_name = msg_data.get('strategy') + backtest_name = f"{strategy_name}_backtest" + strategy_instance_id = f"test_{uuid.uuid4()}" + backtest_key = f"backtest:{user_name}:{backtest_name}" + + # Retrieve the user strategy and validate it. try: - user_strategy = self.strategies.get_strategy_by_name(user_id=int(user_id), name=strategy_name) - except ValueError: - logger.error(f"Invalid user_id '{user_id}'. Must be an integer.") - return {"error": f"Invalid user_id '{user_id}'. Must be an integer."} + user_strategy = self.validate_strategy_components(user_id, strategy_name, user_name) + except ValueError as ve: + return {"error": str(ve)} - if not user_strategy: - logger.error(f"Strategy '{strategy_name}' not found for user '{user_name}'.") - return {"error": f"Strategy '{strategy_name}' not found for user '{user_name}'."} - - # Prepare the source feeds for the sources referenced in the strategy. + # Prepare the source and indicator feeds referenced in the strategy strategy_components = user_strategy.get('strategy_components', {}) try: data_feed, precomputed_indicators = self.prepare_backtest_data(msg_data, strategy_components) @@ -809,84 +459,57 @@ class Backtester: logger.error(f"Error preparing backtest data: {ve}") return {"error": str(ve)} - # Ensure user_id is an integer - try: - user_id_int = int(user_id) - except ValueError: - logger.error(f"Invalid user_id '{user_id}'. Must be an integer.") - return {"error": f"Invalid user_id '{user_id}'. Must be an integer."} - - # Generate unique strategy_instance_id for the backtest - strategy_instance_id = f"test_{uuid.uuid4()}" - - # Instantiate StrategyInstance with proper indicators and trades - strategy_instance = StrategyInstance( + # Instantiate BacktestStrategyInstance + strategy_instance = BacktestStrategyInstance( strategy_instance_id=strategy_instance_id, strategy_id=user_strategy.get("id"), strategy_name=strategy_name, - user_id=user_id_int, + user_id=int(user_id), generated_code=strategy_components.get("generated_code", ""), data_cache=self.data_cache, - indicators=None, # Indicators are handled via overridden methods - trades=None # Trades are handled via overridden methods + indicators=None, # Custom handling in BacktestStrategyInstance + trades=None # Custom handling in BacktestStrategyInstance ) - # Override any methods that access exchanges and market data with custom handlers for backtesting - strategy_instance = self.add_custom_handlers(strategy_instance) + # Cache the backtest + self.cache_backtest(backtest_key, msg_data, strategy_instance_id) - data_length = len(data_feed) - - mapped_strategy_class = self.map_user_strategy( - user_strategy, - precomputed_indicators, - socketio=self.socketio, - socket_conn_id=socket_conn_id, - data_length=data_length - ) - - # Define the backtest key for caching - backtest_key = f"backtest:{user_name}:{backtest_name}" - - # Cache the backtest initiation in 'tests' cache using the upsert method - self.cache_backtest(user_name, backtest_name, msg_data, strategy_instance_id) - - # Define the callback function to handle backtest completion - def backtest_callback(results): - try: - if results.get("success") is False: - # Handle backtest failure - self.store_backtest_results(user_name, backtest_name, results) - logger.error(f"Backtest '{backtest_name}' failed for user '{user_name}': {results.get('message')}") - else: - # Handle backtest success - self.store_backtest_results(user_name, backtest_name, results) - self.update_strategy_stats(user_id_int, strategy_name, results) - - # Emit the results back to the client - self.socketio.emit('message', - {"reply": 'backtest_results', - "data": {'test_id': backtest_name, "results": results}}, - room=socket_conn_id - ) - logger.info(f"[BACKTEST COMPLETE] Results emitted to user '{user_name}'.") - finally: - # Cleanup regardless of success or failure - self.cleanup_backtest(backtest_key, strategy_instance_id) - - # Run the backtest asynchronously, passing the strategy_instance + # Start the backtest with all required parameters self.run_backtest( - mapped_strategy_class, - data_feed, - msg_data, - user_name, - backtest_callback, - socket_conn_id, - strategy_instance + strategy_class=MappedStrategy, + data_feed=data_feed, + msg_data=msg_data, + user_name=user_name, + socket_conn_id=socket_conn_id, + strategy_instance=strategy_instance, + backtest_name=backtest_name, + user_id=user_id, + backtest_key=backtest_key, + strategy_name=strategy_name, + precomputed_indicators=precomputed_indicators ) logger.info(f"Backtest '{backtest_name}' started for user '{user_name}'.") return {"status": "started", "backtest_name": backtest_name} + # Define the backtest callback + def backtest_callback(self, user_name, backtest_name, user_id, strategy_name, + strategy_instance_id, socket_conn_id,backtest_key, results): + try: + if results.get("success") is False: + self.store_backtest_results(backtest_key, results) + logger.error(f"Backtest '{backtest_name}' failed for user '{user_name}': {results.get('message')}") + else: + self.store_backtest_results(backtest_key, results) + self.update_strategy_stats(int(user_id), strategy_name, results) + self.socketio.emit( + 'message', + {"reply": 'backtest_results', "data": {'test_id': backtest_name, "results": results}}, + room=socket_conn_id, + ) + finally: + self.cleanup_backtest(backtest_key, strategy_instance_id) + def start_periodic_purge(self, interval_seconds: int = 3600): """ Starts a background task that periodically purges expired cache entries and cleans up orphaned backtest contexts. @@ -1039,25 +662,18 @@ class Backtester: else: logger.error(f"Strategy '{strategy_name}' not found for user '{user_id}'.") - def store_backtest_results(self, user_name: str, backtest_name: str, results: dict): + def store_backtest_results(self, backtest_key: str, results: dict): """ Store the backtest results in the cache """ - cache_key = f"backtest:{user_name}:{backtest_name}" - try: - # Use modify_datacache_item to update only the 'results' field self.data_cache.modify_cache_item( cache_name='tests', - filter_vals=[('tbl_key', cache_key)], + filter_vals=[('tbl_key', backtest_key)], field_name='results', new_data=str(results) # Convert dict to string or JSON as needed ) - logger.info(f"Backtest results stored for '{backtest_name}' of user '{user_name}'.") - except ValueError as ve: - logger.error(f"ValueError in storing backtest results for '{backtest_name}' of user '{user_name}': {ve}") - traceback.print_exc() + logger.info(f"Backtest results stored for key '{backtest_key}'.") except Exception as e: - logger.error(f"Error storing backtest results for '{backtest_name}' of user '{user_name}': {e}") - traceback.print_exc() + logger.error(f"Error storing backtest results for '{backtest_key}': {e}", exc_info=True) def calculate_returns(self, equity_curve: list) -> list: """ diff --git a/src/mapped_strategy.py b/src/mapped_strategy.py new file mode 100644 index 0000000..c174403 --- /dev/null +++ b/src/mapped_strategy.py @@ -0,0 +1,142 @@ +# mapped_strategy.py + +import backtrader as bt +import logging +from typing import Dict +import pandas as pd + +from backtest_strategy_instance import BacktestStrategyInstance # Ensure correct import path + +logger = logging.getLogger(__name__) + +class MappedStrategy(bt.Strategy): + """ + A Backtrader Strategy that integrates with a custom StrategyInstance for executing strategy logic. + """ + + params = ( + ('strategy_instance', None), # Instance of BacktestStrategyInstance + ('precomputed_indicators', None), # Dict of precomputed indicators + ('socketio', None), # SocketIO instance for emitting progress + ('socket_conn_id', None), # Socket connection ID for emitting progress + ('data_length', None), # Total number of data points for progress calculation + ) + + def __init__(self): + super().__init__() + + if not self.p.strategy_instance: + raise ValueError("StrategyInstance must be provided to MappedStrategy.") + + self.strategy_instance: BacktestStrategyInstance = self.p.strategy_instance + logger.debug(f"StrategyInstance '{self.strategy_instance.strategy_instance_id}' attached to MappedStrategy.") + + # Establish backreference + self.strategy_instance.backtrader_strategy = self + + self.precomputed_indicators: Dict[str, pd.DataFrame] = self.p.precomputed_indicators or {} + self.indicator_pointers: Dict[str, int] = {name: 0 for name in self.precomputed_indicators.keys()} + self.indicator_names = list(self.precomputed_indicators.keys()) + self.current_step = 0 + + # Initialize lists to store orders and trades + self.orders = [] + self.trade_list = [] + + # Initialize other needed variables + self.starting_balance = self.broker.getvalue() + self.last_progress = 0 # Initialize last_progress + + self.bar_executed = 0 # Initialize bar_executed + def notify_order(self, order): + """ + Handle order notifications from Backtrader. + Delegates to StrategyInstance for custom handling. + """ + if order.status in [order.Submitted, order.Accepted]: + # Order has been submitted/accepted by broker - nothing to do + return + + if order.status in [order.Completed]: + if order.isbuy(): + self.log(f"BUY EXECUTED, Price: {order.executed.price}, Size: {order.executed.size}") + elif order.issell(): + self.log(f"SELL EXECUTED, Price: {order.executed.price}, Size: {order.executed.size}") + self.bar_executed = len(self.datas[0]) + elif order.status in [order.Canceled, order.Margin, order.Rejected]: + self.log('Order Canceled/Margin/Rejected') + + # Remove the order from the list + if order in self.orders: + self.orders.remove(order) + + # Delegate to StrategyInstance if needed + # self.strategy_instance.notify_order(order) + + def notify_trade(self, trade): + """ + Handle trade notifications from Backtrader. + Delegates to StrategyInstance for custom handling. + """ + if not trade.isclosed: + return + + self.log(f"TRADE CLOSED, GROSS P/L: {trade.pnl}, NET P/L: {trade.pnlcomm}") + + # Convert datetime objects to ISO-formatted strings + open_datetime = bt.num2date(trade.dtopen).isoformat() if trade.dtopen else None + close_datetime = bt.num2date(trade.dtclose).isoformat() if trade.dtclose else None + + # Store the trade details for later use + trade_info = { + 'ref': trade.ref, + 'size': trade.size, + 'price': trade.price, + 'pnl': trade.pnl, + 'pnlcomm': trade.pnlcomm, + 'open_datetime': open_datetime, + 'close_datetime': close_datetime + } + self.trade_list.append(trade_info) + + # Delegate to StrategyInstance if needed + # self.strategy_instance.notify_trade(trade) + + def log(self, txt, dt=None): + """Logging function for this strategy""" + dt = dt or self.datas[0].datetime.datetime(0) + logger.info(f"{dt.isoformat()} - {txt}") + # self.strategy_instance.log(txt, dt) + + def next(self): + self.current_step += 1 + + # Execute the strategy + self.execute_strategy() + + # Update progress + if self.p.data_length: + self.update_progress() + + def execute_strategy(self): + try: + execution_result = self.strategy_instance.execute() + if not execution_result.get('success', False): + error_msg = execution_result.get('message', 'Unknown error during strategy execution.') + logger.error(f"Strategy execution failed: {error_msg}") + self.stop() + except Exception as e: + logger.error(f"Error in strategy execution: {e}") + + def update_progress(self): + progress = (self.current_step / self.p.data_length) * 100 + progress = min(int(progress), 100) + + if progress > self.last_progress: + if self.p.socketio and self.p.socket_conn_id: + self.p.socketio.emit( + 'message', + {'reply': 'progress', 'data': {'progress': progress}}, + room=self.p.socket_conn_id + ) + self.last_progress = progress