The test seem to run without errors but are still process blocking.

This commit is contained in:
Rob 2024-11-15 18:08:32 -04:00
parent 4f11778b09
commit 4072a9d5f5
5 changed files with 598 additions and 494 deletions

112
markdown/backtesting.md Normal file
View File

@ -0,0 +1,112 @@
```plantuml
@startuml
!define RECTANGLE class
actor User as U
participant "User Interface" as UI
participant "Backtester Class" as Backtester
participant "DataCache_v3" as DataCache
participant "Strategies" as Strategies
participant "Indicators" as Indicators
participant "StrategyInstance" as StrategyInstance
participant "Backtrader (Cerebro)" as Cerebro
participant "Custom Analyzers" as Analyzers
participant "EquityCurveAnalyzer" as EquityAnalyzer
participant "TradeAnalyzer" as TradeAnalyzer
participant "Pandas DataFeed" as DataFeed
participant "Logging" as Logging
participant "SocketIO" as SocketIO
== Backtest Submission ==
U -> UI: Submit Backtest Request\n(strategy details, parameters)
UI -> Backtester: initiate_backtest(strategy_details, parameters)
activate Backtester
== Caching Backtest ==
Backtester -> DataCache: cache_backtest(user_name, backtest_name, backtest_data, strategy_instance_id)
activate DataCache
DataCache --> Backtester: Confirmation
deactivate DataCache
== Preparing Strategy Instance ==
Backtester -> Strategies: retrieve_strategy(strategy_id)
activate Strategies
Strategies --> Backtester: strategy_class
deactivate Strategies
Backtester -> Indicators: get_precomputed_indicators(strategy_id)
activate Indicators
Indicators --> Backtester: precomputed_indicators
deactivate Indicators
Backtester -> Backtester: map_user_strategy(user_strategy, precomputed_indicators, mode)
Backtester -> StrategyInstance: __init__(strategy_instance_id, strategy_id, strategy_name, user_id, generated_code, data_cache, indicators, trades)
activate StrategyInstance
StrategyInstance --> Backtester: Initialized
deactivate StrategyInstance
== Preparing Data Feed ==
Backtester -> DataCache: prepare_data_feed(data_parameters)
activate DataCache
DataCache --> Backtester: data_feed
deactivate DataCache
Backtester -> Backtester: add_custom_handlers()
Backtester -> Backtester: precompute_indicators(precomputed_indicators)
Backtester -> Backtester: setup_strategy_instance(strategy_instance)
== Running Backtest ==
Backtester -> Cerebro: setup_cerebro(data_feed, strategy_class, parameters)
activate Cerebro
Cerebro -> Cerebro: addstrategy(strategy_class, **kwargs)
Cerebro -> Cerebro: adddata(data_feed)
Cerebro -> Cerebro: setcash(initial_capital)
Cerebro -> Cerebro: setcommission(commission)
Cerebro -> Analyzers: add_analyzer(EquityCurveAnalyzer, _name='equity_curve')
Cerebro -> Analyzers: add_analyzer(TradeAnalyzer, _name='trade_analyzer')
Cerebro -> Logging: configure_logging()
Cerebro -> Cerebro: run()
activate Cerebro
Cerebro -> EquityAnalyzer: initialize()
activate EquityAnalyzer
Cerebro -> TradeAnalyzer: initialize()
activate TradeAnalyzer
Cerebro -> StrategyInstance: attach_backtrader_strategy(strategy)
Cerebro -> StrategyInstance: execute()
activate StrategyInstance
StrategyInstance -> Cerebro: next()
StrategyInstance -> Logging: log("Strategy execution step")
StrategyInstance --> Cerebro: Step completed
Cerebro -> EquityAnalyzer: record_equity_curve()
Cerebro -> TradeAnalyzer: analyze_trades()
Cerebro --> Backtester: backtest_results
deactivate Cerebro
deactivate Analyzers
== Processing Results ==
Backtester -> Backtester: calculate_returns(equity_curve)
Backtester -> Backtester: analyze_trades(trades)
Backtester -> Backtester: compute_statistics(total_return, sharpe_ratio, max_drawdown, win_loss_ratio)
Backtester -> Backtester: update_stats(strategy_id, stats)
Backtester -> DataCache: store_backtest_results(user_name, backtest_name, results)
activate DataCache
DataCache --> Backtester: Confirmation
deactivate DataCache
== Emitting Results ==
Backtester -> SocketIO: emit('backtest_results', data, room=socket_conn_id)
activate SocketIO
SocketIO --> U: Receive Backtest Results
deactivate SocketIO
deactivate Backtester
@enduml

View File

@ -1,6 +1,7 @@
import logging import logging
import pandas as pd import pandas as pd
from sqlalchemy.util import symbol
from DataCache_v3 import DataCache from DataCache_v3 import DataCache
from indicators import Indicators from indicators import Indicators
@ -391,7 +392,6 @@ class StrategyInstance:
self, self,
trade_type: str, trade_type: str,
size: float, size: float,
symbol: str,
order_type: str, order_type: str,
source: dict = None, source: dict = None,
tif: str = 'GTC', tif: str = 'GTC',
@ -406,6 +406,7 @@ class StrategyInstance:
""" """
Unified trade order handler for executing buy and sell orders. Unified trade order handler for executing buy and sell orders.
""" """
symbol = source['symbol']
if trade_type == 'buy': if trade_type == 'buy':
logger.info(f"Executing BUY order: Size={size}, Symbol={symbol}, Order Type={order_type}") logger.info(f"Executing BUY order: Size={size}, Symbol={symbol}, Order Type={order_type}")
# Implement buy order logic here # Implement buy order logic here
@ -475,6 +476,8 @@ class StrategyInstance:
:param output_field: Specific field of the indicator. :param output_field: Specific field of the indicator.
:return: Indicator value. :return: Indicator value.
""" """
logger.debug(f"StrategyInstance is Retrieving indicator '{indicator_name}' from Indicators for user '{self.user_id}'.")
try: try:
user_indicators = self.indicators.get_indicator_list(user_id=self.user_id) user_indicators = self.indicators.get_indicator_list(user_id=self.user_id)
indicator = user_indicators.get(indicator_name) indicator = user_indicators.get(indicator_name)

View File

@ -0,0 +1,231 @@
# backtest_strategy_instance.py
import logging
import pandas as pd
import datetime as dt
import backtrader as bt
from StrategyInstance import StrategyInstance
logger = logging.getLogger(__name__)
class BacktestStrategyInstance(StrategyInstance):
"""
Extends StrategyInstance with custom methods for backtesting.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 1. Override trade_order
def trade_order(
self,
trade_type: str,
size: float,
order_type: str,
source: dict = None,
tif: str = 'GTC',
stop_loss: dict = None,
trailing_stop: dict = None,
take_profit: dict = None,
limit: dict = None,
trailing_limit: dict = None,
target_market: dict = None,
name_order: dict = None
):
"""
Custom trade_order method for backtesting.
Executes trades within the Backtrader environment.
"""
if self.backtrader_strategy is None:
logger.error("Backtrader strategy is not set in StrategyInstance.")
return
# Validate and extract symbol
symbol = source.get('market') if source and 'market' in source else None
if not symbol:
logger.error("Symbol not provided in source. Order not executed.")
return
# Common logic for BUY and SELL
price = self.backtrader_strategy.data.close[0]
stop_loss_price = stop_loss.get('value') if stop_loss else None
take_profit_price = take_profit.get('value') if take_profit else None
# Determine trade execution type
if trade_type.lower() == 'buy':
bracket_orders = self.backtrader_strategy.buy_bracket(
size=size,
price=price,
stopprice=stop_loss_price,
limitprice=take_profit_price,
exectype=bt.Order.Market
)
action = "BUY"
elif trade_type.lower() == 'sell':
bracket_orders = self.backtrader_strategy.sell_bracket(
size=size,
price=price,
stopprice=stop_loss_price,
limitprice=take_profit_price,
exectype=bt.Order.Market
)
action = "SELL"
else:
logger.error(f"Invalid trade_type '{trade_type}'. Order not executed.")
return
# Store and notify
if bracket_orders:
self.backtrader_strategy.orders.extend(bracket_orders)
message = f"{action} order executed for {size} {symbol} at {order_type} price."
self.notify_user(message)
logger.info(message)
# 2. Override process_indicator
def process_indicator(self, indicator_name: str, output_field: str):
"""
Retrieves precomputed indicator values for backtesting.
"""
logger.debug(f"Backtester is Retrieving indicator '{indicator_name}' from precomputed data.")
logger.debug(f'here is the precomputed_indicators: {self.backtrader_strategy.precomputed_indicators}')
if self.backtrader_strategy is None:
logger.error("Backtrader strategy is not set in StrategyInstance.")
return None
df = self.backtrader_strategy.precomputed_indicators.get(indicator_name)
if df is None:
logger.error(f"Indicator '{indicator_name}' not found.")
return None
idx = self.backtrader_strategy.indicator_pointers.get(indicator_name, 0)
if idx >= len(df):
logger.warning(f"No more data for indicator '{indicator_name}' at index {idx}.")
return None
value = df.iloc[idx].get(output_field)
if pd.isna(value):
logger.warning(f"NaN value encountered for indicator '{indicator_name}' at index {idx}.")
return None
return value
# 3. Override get_current_price
def get_current_price(self, timeframe: str = '1h', exchange: str = 'binance',
symbol: str = 'BTC/USD') -> float:
"""
Retrieves the current market price from Backtrader's data feed.
"""
if self.backtrader_strategy:
return self.backtrader_strategy.data.close[0]
logger.error("Backtrader strategy is not set.")
return 0.0
# 4. Override get_last_candle
def get_last_candle(self, candle_part: str, timeframe: str, exchange: str, symbol: str):
"""
Retrieves the specified part of the last candle from Backtrader's data feed.
"""
if self.backtrader_strategy is None:
logger.error("Backtrader strategy is not set in StrategyInstance.")
return None
candle_map = {
'open': self.backtrader_strategy.data.open[0],
'high': self.backtrader_strategy.data.high[0],
'low': self.backtrader_strategy.data.low[0],
'close': self.backtrader_strategy.data.close[0],
'volume': self.backtrader_strategy.data.volume[0],
}
value = candle_map.get(candle_part.lower())
if value is None:
logger.error(f"Invalid candle_part '{candle_part}'. Must be one of {list(candle_map.keys())}.")
else:
logger.debug(
f"Retrieved '{candle_part}' from last candle for {symbol} on {exchange} ({timeframe}): {value}"
)
return value
# 5. Override get_filled_orders
def get_filled_orders(self) -> int:
"""
Retrieves the number of filled orders from Backtrader's broker.
"""
if self.backtrader_strategy is None:
logger.error("Backtrader strategy is not set in StrategyInstance.")
return 0
try:
filled_orders = len(self.backtrader_strategy.broker.filled)
logger.debug(f"Number of filled orders: {filled_orders}")
return filled_orders
except Exception as e:
logger.error(f"Error retrieving filled orders: {e}", exc_info=True)
return 0
# 6. Override get_available_balance
def get_available_balance(self) -> float:
"""
Retrieves the available balance from Backtrader's broker.
"""
if self.backtrader_strategy is None:
logger.error("Backtrader strategy is not set in StrategyInstance.")
return 0.0
try:
available_balance = self.backtrader_strategy.broker.getcash()
logger.debug(f"Available balance: {available_balance}")
return available_balance
except Exception as e:
logger.error(f"Error retrieving available balance: {e}", exc_info=True)
return 0.0
# 7. Override get_current_balance
def get_current_balance(self) -> float:
"""
Retrieves the current balance from Backtrader's broker.
"""
if self.backtrader_strategy is None:
logger.error("Backtrader strategy is not set in StrategyInstance.")
return 0.0
try:
balance = self.backtrader_strategy.broker.getvalue()
logger.debug(f"Current balance retrieved: {balance}.")
return balance
except Exception as e:
logger.error(f"Error retrieving current balance: {e}", exc_info=True)
return 0.0
# 8. Override get_filled_orders_details (Optional but Recommended)
def get_filled_orders_details(self) -> list:
"""
Retrieves detailed information about filled orders.
"""
if self.backtrader_strategy is None:
logger.error("Backtrader strategy is not set in StrategyInstance.")
return []
try:
filled_orders = []
for order in self.backtrader_strategy.broker.filled:
order_info = {
'ref': order.ref,
'size': order.size,
'price': order.executed.price,
'value': order.executed.value,
'commission': order.executed.comm,
'status': order.status,
'created_at': dt.datetime.fromtimestamp(order.created.dt.timestamp()) if hasattr(order,
'created') else None
}
filled_orders.append(order_info)
logger.debug(f"Filled orders details: {filled_orders}")
return filled_orders
except Exception as e:
logger.error(f"Error retrieving filled orders details: {e}", exc_info=True)
return []
# 9. Override notify_user
def notify_user(self, message: str):
"""
Suppresses user notifications and instead logs them.
:param message: Notification message.
"""
logger.debug(f"Backtest notification: {message}")

View File

@ -1,7 +1,7 @@
# backtesting.py
import logging import logging
import time import time
import traceback
import types
import uuid import uuid
import backtrader as bt import backtrader as bt
@ -10,6 +10,8 @@ from DataCache_v3 import DataCache, RowBasedCache, TableBasedCache
from Strategies import Strategies from Strategies import Strategies
from StrategyInstance import StrategyInstance from StrategyInstance import StrategyInstance
from indicators import Indicators from indicators import Indicators
from backtest_strategy_instance import BacktestStrategyInstance
from mapped_strategy import MappedStrategy
import numpy as np import numpy as np
import pandas as pd import pandas as pd
import signal import signal
@ -22,6 +24,7 @@ formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(messag
handler.setFormatter(formatter) handler.setFormatter(formatter)
logger.addHandler(handler) logger.addHandler(handler)
# Custom EquityCurveAnalyzer # Custom EquityCurveAnalyzer
class EquityCurveAnalyzer(bt.Analyzer): class EquityCurveAnalyzer(bt.Analyzer):
def __init__(self): def __init__(self):
@ -66,17 +69,14 @@ class Backtester:
signal.signal(signal.SIGINT, self.shutdown_handler) signal.signal(signal.SIGINT, self.shutdown_handler)
signal.signal(signal.SIGTERM, self.shutdown_handler) signal.signal(signal.SIGTERM, self.shutdown_handler)
def cache_backtest(self, user_name: str, backtest_name: str, backtest_data: dict, strategy_instance_id: str): def cache_backtest(self, backtest_key: str, backtest_data: dict, strategy_instance_id: str):
""" """
Cache the backtest data for a user. Cache the backtest data for a user.
If the backtest already exists, update it; otherwise, insert a new entry. If the backtest already exists, update it; otherwise, insert a new entry.
:param backtest_key: A unique identifier for the backtest.
:param user_name: Name of the user.
:param backtest_name: Name of the backtest.
:param backtest_data: Dictionary containing backtest parameters. :param backtest_data: Dictionary containing backtest parameters.
:param strategy_instance_id: The ID of the strategy instance.
""" """
cache_key = f"backtest:{user_name}:{backtest_name}"
# Define columns and corresponding values (excluding 'tbl_key') # Define columns and corresponding values (excluding 'tbl_key')
columns = ('user_name', 'strategy_name', 'start_time', 'capital', 'commission', 'results', 'strategy_instance_id') columns = ('user_name', 'strategy_name', 'start_time', 'capital', 'commission', 'results', 'strategy_instance_id')
values = ( values = (
@ -93,7 +93,7 @@ class Backtester:
# Check if the backtest already exists # Check if the backtest already exists
existing_backtest = self.data_cache.get_rows_from_cache( existing_backtest = self.data_cache.get_rows_from_cache(
cache_name='tests', cache_name='tests',
filter_vals=[('tbl_key', cache_key)] filter_vals=[('tbl_key', backtest_key)]
) )
if existing_backtest.empty: if existing_backtest.empty:
@ -102,24 +102,23 @@ class Backtester:
cache_name='tests', cache_name='tests',
columns=columns, columns=columns,
values=values, values=values,
key=cache_key key=backtest_key
) )
logger.debug(f"Inserted new backtest entry '{cache_key}'.") logger.debug(f"Inserted new backtest entry '{backtest_key}'.")
else: else:
# Update existing backtest entry (e.g., reset 'results' if needed) # Update existing backtest entry (e.g., reset 'results' if needed)
# Here, we assume you might want to reset 'results' when re-running # Here, we assume you might want to reset 'results' when re-running
self.data_cache.modify_datacache_item( self.data_cache.modify_datacache_item(
cache_name='tests', cache_name='tests',
filter_vals=[('tbl_key', cache_key)], filter_vals=[('tbl_key', backtest_key)],
field_names=('results',), field_names=('results',),
new_values=(None,), # Reset results new_values=(None,), # Reset results
overwrite='tbl_key' # Ensures uniqueness based on 'tbl_key' overwrite='tbl_key' # Ensures uniqueness based on 'tbl_key'
) )
logger.debug(f"Updated existing backtest entry '{cache_key}'. Reset 'results'.") logger.debug(f"Updated existing backtest entry '{backtest_key}'. Reset 'results'.")
except Exception as e: except Exception as e:
logger.error(f"Error in cache_backtest for '{cache_key}': {e}", exc_info=True) logger.error(f"Error in cache_backtest for '{backtest_key}': {e}", exc_info=True)
# Depending on your application, you might want to raise the exception
# raise e
def cleanup_backtest(self, backtest_key: str, strategy_instance_id: str) -> None: def cleanup_backtest(self, backtest_key: str, strategy_instance_id: str) -> None:
""" """
@ -148,16 +147,24 @@ class Backtester:
except Exception as e: except Exception as e:
logger.error(f"Error during cleanup of backtest '{backtest_key}': {e}", exc_info=True) logger.error(f"Error during cleanup of backtest '{backtest_key}': {e}", exc_info=True)
def validate_strategy_components(self, user_id: str, strategy_name: str, user_name: str) -> dict:
"""
Retrieves and validates the components of a user-defined strategy.
Raises a ValueError if required components are missing or incorrectly formatted.
"""
try:
user_strategy = self.strategies.get_strategy_by_name(user_id=int(user_id), name=strategy_name)
except ValueError:
logger.error(f"Invalid user_id '{user_id}'. Must be an integer.")
raise ValueError(f"Invalid user_id '{user_id}'. Must be an integer.")
def map_user_strategy(self, user_strategy: dict, precomputed_indicators: dict[str, pd.DataFrame], if not user_strategy:
mode: str = 'testing', socketio=None, socket_conn_id=None, data_length=None) -> any: logger.error(f"Strategy '{strategy_name}' not found for user '{user_name}'.")
""" raise ValueError(f"Strategy '{strategy_name}' not found for user '{user_name}'.")
Maps user strategy details into a Backtrader-compatible strategy class.
""" strategy_components = user_strategy.get('strategy_components', {})
# Extract the generated code and indicators from the strategy components generated_code = strategy_components.get('generated_code')
strategy_components = user_strategy['strategy_components'] indicators_used = strategy_components.get('indicators')
generated_code = strategy_components['generated_code']
indicators_used = strategy_components['indicators']
# Validate extracted data # Validate extracted data
if not generated_code: if not generated_code:
@ -168,378 +175,8 @@ class Backtester:
logger.error("'indicators_used' should be a list.") logger.error("'indicators_used' should be a list.")
raise ValueError("'indicators_used' should be a list.") raise ValueError("'indicators_used' should be a list.")
logger.info(f"Mapping strategy '{user_strategy.get('strategy_name', 'Unnamed')}' with mode '{mode}'.") return user_strategy
# Define the strategy class dynamically
class MappedStrategy(bt.Strategy):
params = (
('mode', mode),
('strategy_instance', None), # Will be set during instantiation
('socketio', socketio),
('socket_conn_id', socket_conn_id),
('data_length', data_length)
)
def __init__(self):
super().__init__()
self.strategy_instance: StrategyInstance = self.p.strategy_instance
logger.debug(f"StrategyInstance '{self.strategy_instance.strategy_instance_id}' attached to MappedStrategy.")
# Establish backreference
self.strategy_instance.backtrader_strategy = self
self.precomputed_indicators = precomputed_indicators
self.indicator_pointers = {}
self.indicator_names = list(precomputed_indicators.keys())
self.current_step = 0
# Initialize pointers for each indicator
for name in self.indicator_names:
self.indicator_pointers[name] = 0 # Start at the first row
# Initialize an empty list to store orders
self.orders = []
self.trade_list = []
# Initialize any other needed variables
self.starting_balance = self.broker.getvalue()
self.current_step = 0
self.last_progress = 0 # Initialize last_progress
def notify_order(self, order):
if order.status in [order.Submitted, order.Accepted]:
# Order has been submitted/accepted by broker - nothing to do
return
if order.status in [order.Completed]:
if order.isbuy():
self.log(f"BUY EXECUTED, Price: {order.executed.price}, Size: {order.executed.size}")
elif order.issell():
self.log(f"SELL EXECUTED, Price: {order.executed.price}, Size: {order.executed.size}")
self.bar_executed = len(self)
elif order.status in [order.Canceled, order.Margin, order.Rejected]:
self.log('Order Canceled/Margin/Rejected')
# Remove the order from the list
if order in self.orders:
self.orders.remove(order)
def notify_trade(self, trade):
if not trade.isclosed:
return
self.log(f"TRADE CLOSED, GROSS P/L: {trade.pnl}, NET P/L: {trade.pnlcomm}")
# Convert datetime objects to ISO-formatted strings
open_datetime = bt.num2date(trade.dtopen).isoformat() if trade.dtopen else None
close_datetime = bt.num2date(trade.dtclose).isoformat() if trade.dtclose else None
# Store the trade details for later use
trade_info = {
'ref': trade.ref,
'size': trade.size,
'price': trade.price,
'pnl': trade.pnl,
'pnlcomm': trade.pnlcomm,
'open_datetime': open_datetime,
'close_datetime': close_datetime
}
self.trade_list.append(trade_info)
def log(self, txt, dt=None):
""" Logging function for this strategy"""
dt = dt or self.datas[0].datetime.datetime(0)
logger.info(f"{dt.isoformat()} - {txt}")
def next(self):
self.current_step += 1
# Execute the strategy logic
try:
execution_result = self.strategy_instance.execute()
if not execution_result.get('success', False):
error_msg = execution_result.get('message', 'Unknown error during strategy execution.')
logger.error(f"Strategy execution failed: {error_msg}")
self.stop()
except Exception as e:
logger.error(f"Error in strategy execution: {e}")
# Calculate progress
progress = (self.current_step / self.p.data_length) * 100
progress = min(int(progress), 100) # Ensure progress doesn't exceed 100%
# Emit progress only if it has increased by at least 1%
if progress > self.last_progress:
self.p.socketio.emit(
'message',
{'reply': 'progress', 'data': {'progress': progress}},
room=self.p.socket_conn_id
)
self.last_progress = progress
return MappedStrategy
# Add custom handlers to the StrategyInstance
def add_custom_handlers(self, strategy_instance: StrategyInstance) -> StrategyInstance:
"""
Define custom methods to be injected into exec_context.
:param strategy_instance: The strategy instance to inject the custom handlers into.
:return: The modified strategy instance.
"""
# 1. Override trade_order
def trade_order(
trade_type: str,
size: float,
order_type: str,
source: dict = None,
tif: str = 'GTC',
stop_loss: dict = None,
trailing_stop: dict = None,
take_profit: dict = None,
limit: dict = None,
trailing_limit: dict = None,
target_market: dict = None,
name_order: dict = None
):
"""
Custom trade_order method for backtesting.
Executes trades within the Backtrader environment.
"""
# Validate and extract 'symbol' from 'source'
if source and 'market' in source:
symbol = source['market']
logger.debug(f"Extracted symbol '{symbol}' from source.")
else:
logger.error("Symbol not provided in source. Order not executed.")
return # Abort the order execution
price = strategy_instance.backtrader_strategy.data.close[0]
if trade_type.lower() == 'buy':
logger.info(f"Executing BUY order: Size={size}, Symbol={symbol}, Order Type={order_type}")
# Prepare bracket order parameters
stop_loss_price = stop_loss.get('value') if stop_loss else None
take_profit_price = take_profit.get('value') if take_profit else None
# Create bracket order and store the orders
bracket_orders = strategy_instance.backtrader_strategy.buy_bracket(
size=size,
price=price,
stopprice=stop_loss_price,
limitprice=take_profit_price,
exectype=bt.Order.Market
)
elif trade_type.lower() == 'sell':
logger.info(f"Executing SELL order: Size={size}, Symbol={symbol}, Order Type={order_type}")
# Prepare bracket order parameters
stop_loss_price = stop_loss.get('value') if stop_loss else None
take_profit_price = take_profit.get('value') if take_profit else None
# Create bracket order and store the orders
bracket_orders = strategy_instance.backtrader_strategy.sell_bracket(
size=size,
price=price,
stopprice=stop_loss_price,
limitprice=take_profit_price,
exectype=bt.Order.Market
)
else:
logger.error(f"Invalid trade_type '{trade_type}'. Order not executed.")
return # Abort the order execution
# Store the orders for tracking
strategy_instance.backtrader_strategy.orders.extend(bracket_orders)
# Notify user about the trade execution
strategy_instance.notify_user(
f"{trade_type.capitalize()} order executed for {size} {symbol} at {order_type} price."
)
logger.debug(f"{trade_type.capitalize()} order executed for {size} {symbol} at {order_type} price.")
# Override the trade_order method
strategy_instance.override_exec_context('trade_order', trade_order)
# 2. Override process_indicator
def process_indicator(indicator_name, output_field):
"""
Custom process_indicator method for backtesting.
:param indicator_name: Name of the indicator.
:param output_field: Specific field to retrieve from the indicator.
:return: The value of the specified indicator field at the current step.
"""
# Access precomputed_indicators via backtrader_strategy
if strategy_instance.backtrader_strategy is None:
logger.error("Backtrader strategy is not set in StrategyInstance.")
return None
df = strategy_instance.backtrader_strategy.precomputed_indicators.get(indicator_name)
if df is None:
logger.error(f"Indicator '{indicator_name}' not found in precomputed indicators.")
return None
# Access indicator_pointers via backtrader_strategy
idx = strategy_instance.backtrader_strategy.indicator_pointers.get(indicator_name, 0)
if idx >= len(df):
logger.warning(f"No more data for indicator '{indicator_name}' at index {idx}.")
return None # No more data
# Get the specific output value
if output_field in df.columns:
value = df.iloc[idx][output_field]
if pd.isna(value):
logger.warning(f"NaN value encountered for indicator '{indicator_name}' at index {idx}.")
return None # Handle NaN values
return value
else:
logger.error(f"Output field '{output_field}' not found in indicator '{indicator_name}'.")
return None # Output field not found
# Override the process_indicator method
strategy_instance.override_exec_context('process_indicator', process_indicator)
# 3. Override get_current_price
def get_current_price(timeframe: str = '1h', exchange: str = 'binance',
symbol: str = 'BTC/USD') -> float | None:
"""
Retrieves the current market price from Backtrader's data feed.
"""
try:
# Access the current close price from Backtrader's data
current_price = strategy_instance.backtrader_strategy.data.close[0]
logger.debug(f"Retrieved current price for {symbol} on {exchange} ({timeframe}): {current_price}")
return current_price
except Exception as e:
logger.error(f"Error retrieving current price for {symbol} on {exchange} ({timeframe}): {e}",
exc_info=True)
return None
# Override the get_current_price method
strategy_instance.override_exec_context('get_current_price', get_current_price)
# 4. Override get_last_candle
def get_last_candle(candle_part: str, timeframe: str, exchange: str, symbol: str):
"""
Retrieves the specified part of the last candle from Backtrader's data feed.
"""
try:
# Map candle_part to Backtrader's data attributes
candle_map = {
'open': strategy_instance.backtrader_strategy.data.open[0],
'high': strategy_instance.backtrader_strategy.data.high[0],
'low': strategy_instance.backtrader_strategy.data.low[0],
'close': strategy_instance.backtrader_strategy.data.close[0],
'volume': strategy_instance.backtrader_strategy.data.volume[0],
}
value = candle_map.get(candle_part.lower())
if value is None:
logger.error(f"Invalid candle_part '{candle_part}'. Must be one of {list(candle_map.keys())}.")
else:
logger.debug(
f"Retrieved '{candle_part}' from last candle for {symbol} on {exchange} ({timeframe}): {value}")
return value
except Exception as e:
logger.error(
f"Error retrieving last candle '{candle_part}' for {symbol} on {exchange} ({timeframe}): {e}",
exc_info=True)
return None
# Override the get_last_candle method
strategy_instance.override_exec_context('get_last_candle', get_last_candle)
# 5. Override get_filled_orders
def get_filled_orders() -> int:
"""
Retrieves the number of filled orders from Backtrader's broker.
"""
try:
# Access Backtrader's broker's filled orders
filled_orders = len(strategy_instance.backtrader_strategy.broker.filled)
logger.debug(f"Number of filled orders: {filled_orders}")
return filled_orders
except Exception as e:
logger.error(f"Error retrieving filled orders: {e}", exc_info=True)
return 0
# Override the get_filled_orders method
strategy_instance.override_exec_context('get_filled_orders', get_filled_orders)
# 6. Override get_available_balance
def get_available_balance() -> float:
"""
Retrieves the available balance from Backtrader's broker.
"""
try:
available_balance = strategy_instance.backtrader_strategy.broker.getcash()
logger.debug(f"Available balance: {available_balance}")
return available_balance
except Exception as e:
logger.error(f"Error retrieving available balance: {e}", exc_info=True)
return 0.0
# Override the get_available_balance method
strategy_instance.override_exec_context('get_available_balance', get_available_balance)
# 7. Override get_current_balance
def get_current_balance() -> float:
"""
Retrieves the current balance from Backtrader's broker.
:return: Current balance.
"""
try:
# Access the total portfolio value from Backtrader's broker
balance = strategy_instance.backtrader_strategy.broker.getvalue()
logger.debug(f"Current balance retrieved: {balance}.")
return balance
except Exception as e:
logger.error(f"Error retrieving current balance: {e}", exc_info=True)
return 0.0
# Override the get_current_balance method
strategy_instance.override_exec_context('get_current_balance', get_current_balance)
# 8. Override get_filled_orders_details (Optional but Recommended)
def get_filled_orders_details() -> list:
"""
Retrieves detailed information about filled orders.
"""
try:
filled_orders = []
for order in strategy_instance.backtrader_strategy.broker.filled:
order_info = {
'ref': order.ref,
'size': order.size,
'price': order.executed.price,
'value': order.executed.value,
'commission': order.executed.comm,
'status': order.status,
'created_at': dt.datetime.fromtimestamp(order.created.dt.timestamp())
}
filled_orders.append(order_info)
logger.debug(f"Filled orders details: {filled_orders}")
return filled_orders
except Exception as e:
logger.error(f"Error retrieving filled orders details: {e}", exc_info=True)
return []
# Override the get_filled_orders_details method
strategy_instance.override_exec_context('get_filled_orders_details', get_filled_orders_details)
def notify_user(self, message: str):
"""
Suppresses user notifications and instead logs them.
:param message: Notification message.
"""
logger.debug(f"User notification during backtest for user ID '{self.user_id}': {message}")
# Bind the overridden method to the instance
strategy_instance.notify_user = types.MethodType(notify_user, strategy_instance)
# Return the modified strategy_instance
return strategy_instance
def prepare_data_feed(self, start_date: str, source, user_name: str) -> pd.DataFrame: def prepare_data_feed(self, start_date: str, source, user_name: str) -> pd.DataFrame:
""" """
@ -689,14 +326,14 @@ class Backtester:
return data_feed, precomputed_indicators return data_feed, precomputed_indicators
def run_backtest(self, strategy_class, data_feed: pd.DataFrame, msg_data: dict, user_name: str, def run_backtest(self, strategy_class, data_feed: pd.DataFrame, msg_data: dict, user_name: str,
callback, socket_conn_id: str, strategy_instance: StrategyInstance): socket_conn_id: str, strategy_instance: BacktestStrategyInstance, backtest_name: str,
user_id: str, backtest_key: str, strategy_name: str, precomputed_indicators: dict):
""" """
Runs a backtest using Backtrader and uses Flask-SocketIO's background tasks. Runs a backtest using Backtrader and uses Flask-SocketIO's background tasks.
Sends progress updates to the client via WebSocket. Sends progress updates to the client via WebSocket.
""" """
def execute_backtest(): def execute_backtest():
nonlocal data_feed
try: try:
# **Convert 'time' to 'datetime' if necessary** # **Convert 'time' to 'datetime' if necessary**
if 'time' in data_feed.columns: if 'time' in data_feed.columns:
@ -715,8 +352,15 @@ class Backtester:
# Assign cerebro to strategy_instance for potential use in custom methods # Assign cerebro to strategy_instance for potential use in custom methods
strategy_instance.cerebro = cerebro strategy_instance.cerebro = cerebro
# Add the mapped strategy to the backtest, including strategy_instance as a parameter # Add the mapped strategy to the backtest, including strategy_instance and precomputed_indicators as parameters
cerebro.addstrategy(strategy_class, strategy_instance=strategy_instance) cerebro.addstrategy(
strategy_class,
strategy_instance=strategy_instance,
precomputed_indicators=precomputed_indicators, # Pass precomputed indicators
socketio=self.socketio, # Pass SocketIO instance
socket_conn_id=socket_conn_id, # Pass SocketIO connection ID
data_length=len(data_feed) # Pass data length for progress updates
)
# Add data feed to Cerebro # Add data feed to Cerebro
bt_feed = bt.feeds.PandasData(dataname=data_feed) bt_feed = bt.feeds.PandasData(dataname=data_feed)
@ -754,6 +398,7 @@ class Backtester:
# Prepare the results to pass into the callback # Prepare the results to pass into the callback
backtest_results = { backtest_results = {
"success": True, # Indicate success
"initial_capital": initial_cash, "initial_capital": initial_cash,
"final_portfolio_value": final_value, "final_portfolio_value": final_value,
"run_duration": run_duration, "run_duration": run_duration,
@ -763,7 +408,10 @@ class Backtester:
logger.info("Backtest executed successfully.") logger.info("Backtest executed successfully.")
callback(backtest_results) # Invoke the callback with all necessary parameters
self.backtest_callback(user_name, backtest_name, user_id, strategy_name,
strategy_instance.strategy_instance_id, socket_conn_id,
backtest_key, backtest_results)
except Exception as e: except Exception as e:
# Handle exceptions and send error messages to the client # Handle exceptions and send error messages to the client
@ -772,8 +420,16 @@ class Backtester:
room=socket_conn_id) room=socket_conn_id)
logger.error(f"[BACKTEST ERROR] {error_message}", exc_info=True) logger.error(f"[BACKTEST ERROR] {error_message}", exc_info=True)
# Prepare failure results
failure_results = {
"success": False,
"message": error_message
}
# Invoke callback with failure details to ensure cleanup # Invoke callback with failure details to ensure cleanup
callback({"success": False, "message": error_message}) self.backtest_callback(user_name, backtest_name, user_id, strategy_name,
strategy_instance.strategy_instance_id, socket_conn_id,
backtest_key, failure_results)
# Start the backtest as a background task # Start the backtest as a background task
self.socketio.start_background_task(execute_backtest) self.socketio.start_background_task(execute_backtest)
@ -781,27 +437,21 @@ class Backtester:
def handle_backtest_message(self, user_id: str, msg_data: dict, socket_conn_id: str) -> dict: def handle_backtest_message(self, user_id: str, msg_data: dict, socket_conn_id: str) -> dict:
""" """
Handle incoming backtest messages, orchestrate the backtest process. Handle incoming backtest messages, orchestrate the backtest process.
:param user_id: ID of the user initiating the backtest.
:param msg_data: Dictionary containing backtest parameters.
:param socket_conn_id: Socket connection ID for emitting updates.
:return: Dictionary with the status of backtest initiation.
""" """
# Extract and define backtest parameters
user_name = msg_data.get('user_name') user_name = msg_data.get('user_name')
backtest_name = f"{msg_data.get('strategy', 'UnnamedStrategy')}_backtest"
# Fetch the strategy using user_id and strategy_name
strategy_name = msg_data.get('strategy') strategy_name = msg_data.get('strategy')
backtest_name = f"{strategy_name}_backtest"
strategy_instance_id = f"test_{uuid.uuid4()}"
backtest_key = f"backtest:{user_name}:{backtest_name}"
# Retrieve the user strategy and validate it.
try: try:
user_strategy = self.strategies.get_strategy_by_name(user_id=int(user_id), name=strategy_name) user_strategy = self.validate_strategy_components(user_id, strategy_name, user_name)
except ValueError: except ValueError as ve:
logger.error(f"Invalid user_id '{user_id}'. Must be an integer.") return {"error": str(ve)}
return {"error": f"Invalid user_id '{user_id}'. Must be an integer."}
if not user_strategy: # Prepare the source and indicator feeds referenced in the strategy
logger.error(f"Strategy '{strategy_name}' not found for user '{user_name}'.")
return {"error": f"Strategy '{strategy_name}' not found for user '{user_name}'."}
# Prepare the source feeds for the sources referenced in the strategy.
strategy_components = user_strategy.get('strategy_components', {}) strategy_components = user_strategy.get('strategy_components', {})
try: try:
data_feed, precomputed_indicators = self.prepare_backtest_data(msg_data, strategy_components) data_feed, precomputed_indicators = self.prepare_backtest_data(msg_data, strategy_components)
@ -809,84 +459,57 @@ class Backtester:
logger.error(f"Error preparing backtest data: {ve}") logger.error(f"Error preparing backtest data: {ve}")
return {"error": str(ve)} return {"error": str(ve)}
# Ensure user_id is an integer # Instantiate BacktestStrategyInstance
try: strategy_instance = BacktestStrategyInstance(
user_id_int = int(user_id)
except ValueError:
logger.error(f"Invalid user_id '{user_id}'. Must be an integer.")
return {"error": f"Invalid user_id '{user_id}'. Must be an integer."}
# Generate unique strategy_instance_id for the backtest
strategy_instance_id = f"test_{uuid.uuid4()}"
# Instantiate StrategyInstance with proper indicators and trades
strategy_instance = StrategyInstance(
strategy_instance_id=strategy_instance_id, strategy_instance_id=strategy_instance_id,
strategy_id=user_strategy.get("id"), strategy_id=user_strategy.get("id"),
strategy_name=strategy_name, strategy_name=strategy_name,
user_id=user_id_int, user_id=int(user_id),
generated_code=strategy_components.get("generated_code", ""), generated_code=strategy_components.get("generated_code", ""),
data_cache=self.data_cache, data_cache=self.data_cache,
indicators=None, # Indicators are handled via overridden methods indicators=None, # Custom handling in BacktestStrategyInstance
trades=None # Trades are handled via overridden methods trades=None # Custom handling in BacktestStrategyInstance
) )
# Override any methods that access exchanges and market data with custom handlers for backtesting # Cache the backtest
strategy_instance = self.add_custom_handlers(strategy_instance) self.cache_backtest(backtest_key, msg_data, strategy_instance_id)
data_length = len(data_feed) # Start the backtest with all required parameters
mapped_strategy_class = self.map_user_strategy(
user_strategy,
precomputed_indicators,
socketio=self.socketio,
socket_conn_id=socket_conn_id,
data_length=data_length
)
# Define the backtest key for caching
backtest_key = f"backtest:{user_name}:{backtest_name}"
# Cache the backtest initiation in 'tests' cache using the upsert method
self.cache_backtest(user_name, backtest_name, msg_data, strategy_instance_id)
# Define the callback function to handle backtest completion
def backtest_callback(results):
try:
if results.get("success") is False:
# Handle backtest failure
self.store_backtest_results(user_name, backtest_name, results)
logger.error(f"Backtest '{backtest_name}' failed for user '{user_name}': {results.get('message')}")
else:
# Handle backtest success
self.store_backtest_results(user_name, backtest_name, results)
self.update_strategy_stats(user_id_int, strategy_name, results)
# Emit the results back to the client
self.socketio.emit('message',
{"reply": 'backtest_results',
"data": {'test_id': backtest_name, "results": results}},
room=socket_conn_id
)
logger.info(f"[BACKTEST COMPLETE] Results emitted to user '{user_name}'.")
finally:
# Cleanup regardless of success or failure
self.cleanup_backtest(backtest_key, strategy_instance_id)
# Run the backtest asynchronously, passing the strategy_instance
self.run_backtest( self.run_backtest(
mapped_strategy_class, strategy_class=MappedStrategy,
data_feed, data_feed=data_feed,
msg_data, msg_data=msg_data,
user_name, user_name=user_name,
backtest_callback, socket_conn_id=socket_conn_id,
socket_conn_id, strategy_instance=strategy_instance,
strategy_instance backtest_name=backtest_name,
user_id=user_id,
backtest_key=backtest_key,
strategy_name=strategy_name,
precomputed_indicators=precomputed_indicators
) )
logger.info(f"Backtest '{backtest_name}' started for user '{user_name}'.") logger.info(f"Backtest '{backtest_name}' started for user '{user_name}'.")
return {"status": "started", "backtest_name": backtest_name} return {"status": "started", "backtest_name": backtest_name}
# Define the backtest callback
def backtest_callback(self, user_name, backtest_name, user_id, strategy_name,
strategy_instance_id, socket_conn_id,backtest_key, results):
try:
if results.get("success") is False:
self.store_backtest_results(backtest_key, results)
logger.error(f"Backtest '{backtest_name}' failed for user '{user_name}': {results.get('message')}")
else:
self.store_backtest_results(backtest_key, results)
self.update_strategy_stats(int(user_id), strategy_name, results)
self.socketio.emit(
'message',
{"reply": 'backtest_results', "data": {'test_id': backtest_name, "results": results}},
room=socket_conn_id,
)
finally:
self.cleanup_backtest(backtest_key, strategy_instance_id)
def start_periodic_purge(self, interval_seconds: int = 3600): def start_periodic_purge(self, interval_seconds: int = 3600):
""" """
Starts a background task that periodically purges expired cache entries and cleans up orphaned backtest contexts. Starts a background task that periodically purges expired cache entries and cleans up orphaned backtest contexts.
@ -1039,25 +662,18 @@ class Backtester:
else: else:
logger.error(f"Strategy '{strategy_name}' not found for user '{user_id}'.") logger.error(f"Strategy '{strategy_name}' not found for user '{user_id}'.")
def store_backtest_results(self, user_name: str, backtest_name: str, results: dict): def store_backtest_results(self, backtest_key: str, results: dict):
""" Store the backtest results in the cache """ """ Store the backtest results in the cache """
cache_key = f"backtest:{user_name}:{backtest_name}"
try: try:
# Use modify_datacache_item to update only the 'results' field
self.data_cache.modify_cache_item( self.data_cache.modify_cache_item(
cache_name='tests', cache_name='tests',
filter_vals=[('tbl_key', cache_key)], filter_vals=[('tbl_key', backtest_key)],
field_name='results', field_name='results',
new_data=str(results) # Convert dict to string or JSON as needed new_data=str(results) # Convert dict to string or JSON as needed
) )
logger.info(f"Backtest results stored for '{backtest_name}' of user '{user_name}'.") logger.info(f"Backtest results stored for key '{backtest_key}'.")
except ValueError as ve:
logger.error(f"ValueError in storing backtest results for '{backtest_name}' of user '{user_name}': {ve}")
traceback.print_exc()
except Exception as e: except Exception as e:
logger.error(f"Error storing backtest results for '{backtest_name}' of user '{user_name}': {e}") logger.error(f"Error storing backtest results for '{backtest_key}': {e}", exc_info=True)
traceback.print_exc()
def calculate_returns(self, equity_curve: list) -> list: def calculate_returns(self, equity_curve: list) -> list:
""" """

142
src/mapped_strategy.py Normal file
View File

@ -0,0 +1,142 @@
# mapped_strategy.py
import backtrader as bt
import logging
from typing import Dict
import pandas as pd
from backtest_strategy_instance import BacktestStrategyInstance # Ensure correct import path
logger = logging.getLogger(__name__)
class MappedStrategy(bt.Strategy):
"""
A Backtrader Strategy that integrates with a custom StrategyInstance for executing strategy logic.
"""
params = (
('strategy_instance', None), # Instance of BacktestStrategyInstance
('precomputed_indicators', None), # Dict of precomputed indicators
('socketio', None), # SocketIO instance for emitting progress
('socket_conn_id', None), # Socket connection ID for emitting progress
('data_length', None), # Total number of data points for progress calculation
)
def __init__(self):
super().__init__()
if not self.p.strategy_instance:
raise ValueError("StrategyInstance must be provided to MappedStrategy.")
self.strategy_instance: BacktestStrategyInstance = self.p.strategy_instance
logger.debug(f"StrategyInstance '{self.strategy_instance.strategy_instance_id}' attached to MappedStrategy.")
# Establish backreference
self.strategy_instance.backtrader_strategy = self
self.precomputed_indicators: Dict[str, pd.DataFrame] = self.p.precomputed_indicators or {}
self.indicator_pointers: Dict[str, int] = {name: 0 for name in self.precomputed_indicators.keys()}
self.indicator_names = list(self.precomputed_indicators.keys())
self.current_step = 0
# Initialize lists to store orders and trades
self.orders = []
self.trade_list = []
# Initialize other needed variables
self.starting_balance = self.broker.getvalue()
self.last_progress = 0 # Initialize last_progress
self.bar_executed = 0 # Initialize bar_executed
def notify_order(self, order):
"""
Handle order notifications from Backtrader.
Delegates to StrategyInstance for custom handling.
"""
if order.status in [order.Submitted, order.Accepted]:
# Order has been submitted/accepted by broker - nothing to do
return
if order.status in [order.Completed]:
if order.isbuy():
self.log(f"BUY EXECUTED, Price: {order.executed.price}, Size: {order.executed.size}")
elif order.issell():
self.log(f"SELL EXECUTED, Price: {order.executed.price}, Size: {order.executed.size}")
self.bar_executed = len(self.datas[0])
elif order.status in [order.Canceled, order.Margin, order.Rejected]:
self.log('Order Canceled/Margin/Rejected')
# Remove the order from the list
if order in self.orders:
self.orders.remove(order)
# Delegate to StrategyInstance if needed
# self.strategy_instance.notify_order(order)
def notify_trade(self, trade):
"""
Handle trade notifications from Backtrader.
Delegates to StrategyInstance for custom handling.
"""
if not trade.isclosed:
return
self.log(f"TRADE CLOSED, GROSS P/L: {trade.pnl}, NET P/L: {trade.pnlcomm}")
# Convert datetime objects to ISO-formatted strings
open_datetime = bt.num2date(trade.dtopen).isoformat() if trade.dtopen else None
close_datetime = bt.num2date(trade.dtclose).isoformat() if trade.dtclose else None
# Store the trade details for later use
trade_info = {
'ref': trade.ref,
'size': trade.size,
'price': trade.price,
'pnl': trade.pnl,
'pnlcomm': trade.pnlcomm,
'open_datetime': open_datetime,
'close_datetime': close_datetime
}
self.trade_list.append(trade_info)
# Delegate to StrategyInstance if needed
# self.strategy_instance.notify_trade(trade)
def log(self, txt, dt=None):
"""Logging function for this strategy"""
dt = dt or self.datas[0].datetime.datetime(0)
logger.info(f"{dt.isoformat()} - {txt}")
# self.strategy_instance.log(txt, dt)
def next(self):
self.current_step += 1
# Execute the strategy
self.execute_strategy()
# Update progress
if self.p.data_length:
self.update_progress()
def execute_strategy(self):
try:
execution_result = self.strategy_instance.execute()
if not execution_result.get('success', False):
error_msg = execution_result.get('message', 'Unknown error during strategy execution.')
logger.error(f"Strategy execution failed: {error_msg}")
self.stop()
except Exception as e:
logger.error(f"Error in strategy execution: {e}")
def update_progress(self):
progress = (self.current_step / self.p.data_length) * 100
progress = min(int(progress), 100)
if progress > self.last_progress:
if self.p.socketio and self.p.socket_conn_id:
self.p.socketio.emit(
'message',
{'reply': 'progress', 'data': {'progress': progress}},
room=self.p.socket_conn_id
)
self.last_progress = progress