The tests are running without errors

This commit is contained in:
Rob 2024-11-12 09:43:18 -04:00
parent 33298b7178
commit 1af55f10a0
3 changed files with 395 additions and 102 deletions

View File

@ -568,6 +568,36 @@ class CacheManager:
# Call the cache system to remove the filtered rows # Call the cache system to remove the filtered rows
cache.remove_item(filter_vals) cache.remove_item(filter_vals)
# In CacheManager class
def remove_entries_with_prefix(self, cache_name: str, prefix: str) -> int:
    """
    Remove all entries in the named cache whose key starts with *prefix*.

    For row-based caches the prefix is matched against the cache keys;
    for table-based caches it is matched against 'strategy_instance_id'.

    :param cache_name: Name of the cache to scan.
    :param prefix: Prefix string to match.
    :return: Number of entries actually removed.
    """
    cache = self.get_cache(cache_name)
    removed_count = 0
    if isinstance(cache, RowBasedCache):
        # Snapshot the matching keys first so the dict is never mutated
        # while being iterated.
        keys_to_remove = [key for key in cache.cache.keys() if key.startswith(prefix)]
        for key in keys_to_remove:
            # Row-based caches are addressed by their synthetic 'tbl_key'.
            if cache.remove_item([('tbl_key', key)]):
                removed_count += 1
    elif isinstance(cache, TableBasedCache):
        # Match contexts whose 'strategy_instance_id' starts with the prefix.
        # Fix: use the 'cache' reference obtained above instead of
        # re-resolving self.caches[cache_name] (same object; the original
        # mixed the two access styles in one branch).
        mask = cache.cache['strategy_instance_id'].astype(str).str.startswith(prefix)
        to_remove = cache.cache[mask]
        for _, row in to_remove.iterrows():
            filter_vals = [('strategy_instance_id', row['strategy_instance_id'])]
            if cache.remove_item(filter_vals):
                removed_count += 1
    return removed_count
def modify_cache_item(self, cache_name: str, filter_vals: List[Tuple[str, any]], field_name: str, def modify_cache_item(self, cache_name: str, filter_vals: List[Tuple[str, any]], field_name: str,
new_data: any) -> None: new_data: any) -> None:
""" """

View File

@ -1,4 +1,7 @@
import logging import logging
import pandas as pd
from DataCache_v3 import DataCache from DataCache_v3 import DataCache
from indicators import Indicators from indicators import Indicators
from trade import Trades from trade import Trades
@ -77,25 +80,31 @@ class StrategyInstance:
# Automatically load or initialize the context # Automatically load or initialize the context
self._initialize_or_load_context() self._initialize_or_load_context()
def _initialize_or_load_context(self): def _initialize_or_load_context(self):
""" """
Checks if a context exists for the strategy instance. If it does, load it; Checks if a context exists for the strategy instance. If it does, load it;
otherwise, initialize a new context. otherwise, initialize a new context.
""" """
if self.data_cache.get_rows_from_datacache( try:
# Fetch the context data once
context_data = self.data_cache.get_rows_from_datacache(
cache_name='strategy_contexts', cache_name='strategy_contexts',
filter_vals=[('strategy_instance_id', self.strategy_instance_id)] filter_vals=[('strategy_instance_id', self.strategy_instance_id)]
).empty: )
if context_data.empty:
self.initialize_new_context() self.initialize_new_context()
logger.debug(f"Initialized new context for StrategyInstance '{self.strategy_instance_id}'.") logger.debug(f"Initialized new context for StrategyInstance '{self.strategy_instance_id}'.")
else: else:
self.load_context() self.load_context(context_data)
logger.debug(f"Loaded existing context for StrategyInstance '{self.strategy_instance_id}'.") logger.debug(f"Loaded existing context for StrategyInstance '{self.strategy_instance_id}'.")
except Exception as e:
logger.error(f"Error during initialization of StrategyInstance '{self.strategy_instance_id}': {e}", exc_info=True)
traceback.print_exc()
def initialize_new_context(self): def initialize_new_context(self):
""" """
Initializes a new context for the strategy instance. Initializes a new context for the strategy instance and saves it to the cache.
""" """
self.flags = {} self.flags = {}
self.variables = {} self.variables = {}
@ -104,35 +113,27 @@ class StrategyInstance:
self.paused = False self.paused = False
self.exit = False self.exit = False
self.exit_method = 'all' self.exit_method = 'all'
self.start_time = dt.datetime.now() self.start_time = dt.datetime.now(dt.timezone.utc)
# Insert initial context into the cache # Insert initial context into the cache
self.save_context() self.insert_context()
logger.debug(f"New context created and saved for StrategyInstance '{self.strategy_instance_id}'.") logger.debug(f"New context created and inserted for StrategyInstance '{self.strategy_instance_id}'.")
def load_context(self): def load_context(self, context_data: pd.DataFrame):
""" """
Loads the strategy execution context from the database. Loads the strategy execution context from the provided context_data.
""" """
try: try:
context_data = self.data_cache.get_rows_from_datacache(
cache_name='strategy_contexts',
filter_vals=[('strategy_instance_id', self.strategy_instance_id)]
)
if context_data.empty:
logger.warning(f"No context found for StrategyInstance ID: {self.strategy_instance_id}")
return
context = context_data.iloc[0].to_dict() context = context_data.iloc[0].to_dict()
self.flags = json.loads(context.get('flags', '{}')) self.flags = json.loads(context.get('flags', '{}'))
self.profit_loss = context.get('profit_loss', 0.0) self.profit_loss = context.get('profit_loss', 0.0)
self.active = bool(context.get('active', True)) self.active = bool(context.get('active', 1))
self.paused = bool(context.get('paused', False)) self.paused = bool(context.get('paused', 0))
self.exit = bool(context.get('exit', False)) self.exit = bool(context.get('exit', 0))
self.exit_method = context.get('exit_method', 'all') self.exit_method = context.get('exit_method', 'all')
start_time_str = context.get('start_time') start_time_str = context.get('start_time')
if start_time_str: if start_time_str:
self.start_time = dt.datetime.fromisoformat(start_time_str) self.start_time = dt.datetime.fromisoformat(start_time_str).replace(tzinfo=dt.timezone.utc)
# Update exec_context with loaded flags and variables # Update exec_context with loaded flags and variables
self.exec_context['flags'] = self.flags self.exec_context['flags'] = self.flags
@ -145,41 +146,19 @@ class StrategyInstance:
logger.debug(f"Context loaded for StrategyInstance '{self.strategy_instance_id}'.") logger.debug(f"Context loaded for StrategyInstance '{self.strategy_instance_id}'.")
except Exception as e: except Exception as e:
logger.error(f"Error loading context for StrategyInstance '{self.strategy_instance_id}': {e}", logger.error(f"Error loading context for StrategyInstance '{self.strategy_instance_id}': {e}", exc_info=True)
exc_info=True)
traceback.print_exc() traceback.print_exc()
def save_context(self): def insert_context(self):
""" """
Saves the current strategy execution context to the database. Inserts a new context into the cache and database.
Inserts a new row if it doesn't exist; otherwise, updates the existing row.
""" """
try: try:
self.data_cache.modify_datacache_item( columns = (
cache_name='strategy_contexts',
filter_vals=[('strategy_instance_id', self.strategy_instance_id)],
field_names=('flags', 'profit_loss', 'active', 'paused', 'exit', 'exit_method', 'start_time'),
new_values=(
json.dumps(self.flags),
self.profit_loss,
int(self.active),
int(self.paused),
int(self.exit),
self.exit_method,
self.start_time.isoformat()
)
)
logger.debug(f"Context saved for StrategyInstance '{self.strategy_instance_id}'.")
except ValueError as ve:
# If the record does not exist, insert it
logger.warning(f"StrategyInstance '{self.strategy_instance_id}' context not found. Attempting to insert.")
self.data_cache.insert_row_into_datacache(
cache_name='strategy_contexts',
columns=(
"strategy_instance_id", "flags", "profit_loss", "strategy_instance_id", "flags", "profit_loss",
"active", "paused", "exit", "exit_method", "start_time" "active", "paused", "exit", "exit_method", "start_time"
), )
values=( values = (
self.strategy_instance_id, self.strategy_instance_id,
json.dumps(self.flags), json.dumps(self.flags),
self.profit_loss, self.profit_loss,
@ -189,10 +168,69 @@ class StrategyInstance:
self.exit_method, self.exit_method,
self.start_time.isoformat() self.start_time.isoformat()
) )
# Insert the new context without passing 'key' to avoid adding 'tbl_key'
self.data_cache.insert_row_into_datacache(
cache_name='strategy_contexts',
columns=columns,
values=values
# key=None is implicit
) )
logger.debug(f"Inserted new context for StrategyInstance '{self.strategy_instance_id}'.") logger.debug(f"Context inserted for StrategyInstance '{self.strategy_instance_id}'.")
except ValueError as ve:
logger.error(f"ValueError in inserting context for '{self.strategy_instance_id}': {ve}")
traceback.print_exc()
except Exception as e: except Exception as e:
logger.error(f"Error saving context for StrategyInstance '{self.strategy_instance_id}': {e}") logger.error(f"Error inserting context for '{self.strategy_instance_id}': {e}")
traceback.print_exc()
def save_context(self):
"""
Saves the current strategy execution context to the cache and database.
Determines whether to insert a new row or modify an existing one.
"""
try:
# Check if the context exists
existing_context = self.data_cache.get_rows_from_datacache(
cache_name='strategy_contexts',
filter_vals=[('strategy_instance_id', self.strategy_instance_id)]
)
columns = (
"strategy_instance_id", "flags", "profit_loss",
"active", "paused", "exit", "exit_method", "start_time"
)
values = (
self.strategy_instance_id,
json.dumps(self.flags),
self.profit_loss,
int(self.active),
int(self.paused),
int(self.exit),
self.exit_method,
self.start_time.isoformat()
)
if existing_context.empty:
# Insert a new context since it doesn't exist
self.insert_context()
logger.debug(f"Inserted new context for StrategyInstance '{self.strategy_instance_id}'.")
else:
# Modify the existing context without passing 'key'
self.data_cache.modify_datacache_item(
cache_name='strategy_contexts',
filter_vals=[('strategy_instance_id', self.strategy_instance_id)],
field_names=columns,
new_values=values,
overwrite='strategy_instance_id' # Ensures uniqueness
# Do not pass 'key'
)
logger.debug(f"Modified existing context for StrategyInstance '{self.strategy_instance_id}'.")
except ValueError as ve:
logger.error(f"ValueError in saving context for '{self.strategy_instance_id}': {ve}")
traceback.print_exc()
except Exception as e:
logger.error(f"Error saving context for '{self.strategy_instance_id}': {e}")
traceback.print_exc() traceback.print_exc()
def override_exec_context(self, key: str, value: Any): def override_exec_context(self, key: str, value: Any):

View File

@ -1,14 +1,18 @@
import logging import logging
import time
import traceback
import types import types
import uuid import uuid
import backtrader as bt import backtrader as bt
import datetime as dt import datetime as dt
from DataCache_v3 import DataCache from DataCache_v3 import DataCache, RowBasedCache, TableBasedCache
from Strategies import Strategies from Strategies import Strategies
from StrategyInstance import StrategyInstance from StrategyInstance import StrategyInstance
from indicators import Indicators from indicators import Indicators
import numpy as np import numpy as np
import pandas as pd import pandas as pd
import signal
# Configure logging # Configure logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -37,24 +41,114 @@ class Backtester:
self.strategies = strategies self.strategies = strategies
self.indicators_manager = indicators self.indicators_manager = indicators
self.socketio = socketio self.socketio = socketio
# Create a cache for storing back-tests
self.data_cache.create_cache('tests', cache_type='row', size_limit=100,
default_expiration=dt.timedelta(days=1),
eviction_policy='evict')
def cache_backtest(self, user_name, backtest_name, backtest_data): # Ensure 'tests' cache exists
""" Cache the backtest data for a user """ self.data_cache.create_cache(
columns = ('user_name', 'strategy_name', 'start_time', 'capital', 'commission', 'results') 'tests',
cache_type='row',
size_limit=100,
default_expiration=dt.timedelta(hours=2), # Set to 2 hours
eviction_policy='evict',
columns=[
'user_name', # TEXT
'strategy_name', # TEXT
'start_time', # TEXT or DATETIME
'capital', # REAL or INTEGER
'commission', # REAL
'results', # TEXT or JSON
'strategy_instance_id' # TEXT
]
)
# Start periodic purge (optional)
self.start_periodic_purge(interval_seconds=3600) # Purge every hour
# Register signal handlers for graceful shutdown
signal.signal(signal.SIGINT, self.shutdown_handler)
signal.signal(signal.SIGTERM, self.shutdown_handler)
def cache_backtest(self, user_name: str, backtest_name: str, backtest_data: dict, strategy_instance_id: str):
"""
Cache the backtest data for a user.
If the backtest already exists, update it; otherwise, insert a new entry.
:param user_name: Name of the user.
:param backtest_name: Name of the backtest.
:param backtest_data: Dictionary containing backtest parameters.
"""
cache_key = f"backtest:{user_name}:{backtest_name}"
# Define columns and corresponding values (excluding 'tbl_key')
columns = ('user_name', 'strategy_name', 'start_time', 'capital', 'commission', 'results', 'strategy_instance_id')
values = ( values = (
backtest_data.get('user_name'), backtest_data.get('user_name'),
backtest_data.get('strategy'), backtest_data.get('strategy'),
backtest_data.get('start_date'), backtest_data.get('start_date'),
backtest_data.get('capital', 10000), # Default capital if not provided backtest_data.get('capital', 10000), # Default capital if not provided
backtest_data.get('commission', 0.001), # Default commission backtest_data.get('commission', 0.001), # Default commission
None # No results yet; will be filled in after backtest completion None, # No results yet; will be filled in after backtest completion
strategy_instance_id
) )
cache_key = f"backtest:{user_name}:{backtest_name}"
self.data_cache.insert_row_into_cache('tests', columns, values, key=cache_key) try:
# Check if the backtest already exists
existing_backtest = self.data_cache.get_rows_from_cache(
cache_name='tests',
filter_vals=[('tbl_key', cache_key)]
)
if existing_backtest.empty:
# Insert new backtest entry
self.data_cache.insert_row_into_cache(
cache_name='tests',
columns=columns,
values=values,
key=cache_key
)
logger.debug(f"Inserted new backtest entry '{cache_key}'.")
else:
# Update existing backtest entry (e.g., reset 'results' if needed)
# Here, we assume you might want to reset 'results' when re-running
self.data_cache.modify_datacache_item(
cache_name='tests',
filter_vals=[('tbl_key', cache_key)],
field_names=('results',),
new_values=(None,), # Reset results
overwrite='tbl_key' # Ensures uniqueness based on 'tbl_key'
)
logger.debug(f"Updated existing backtest entry '{cache_key}'. Reset 'results'.")
except Exception as e:
logger.error(f"Error in cache_backtest for '{cache_key}': {e}", exc_info=True)
# Depending on your application, you might want to raise the exception
# raise e
def cleanup_backtest(self, backtest_key: str, strategy_instance_id: str) -> None:
    """
    Drop every cache entry tied to one backtest: its row in the 'tests'
    cache and the matching strategy context in 'strategy_contexts'.

    :param backtest_key: Unique key of the backtest in the 'tests' cache.
    :param strategy_instance_id: Unique id of the strategy instance whose
        context lives in 'strategy_contexts'.
    """
    # (cache name, filter column, filter value, purge from DB too?, log text)
    removals = (
        ('tests', 'tbl_key', backtest_key, False,
         f"Backtest '{backtest_key}' removed from 'tests' cache."),
        ('strategy_contexts', 'strategy_instance_id', strategy_instance_id, True,
         f"Strategy context '{strategy_instance_id}' removed from 'strategy_contexts' cache."),
    )
    try:
        for name, column, value, purge_db, message in removals:
            self.data_cache.remove_row_from_datacache(
                cache_name=name,
                filter_vals=[(column, value)],
                remove_from_db=purge_db,
            )
            logger.info(message)
    except Exception as e:
        logger.error(f"Error during cleanup of backtest '{backtest_key}': {e}", exc_info=True)
def map_user_strategy(self, user_strategy: dict, precomputed_indicators: dict[str, pd.DataFrame], def map_user_strategy(self, user_strategy: dict, precomputed_indicators: dict[str, pd.DataFrame],
mode: str = 'testing') -> any: mode: str = 'testing') -> any:
@ -416,6 +510,13 @@ class Backtester:
using lists, while providing a clearer structure when dictionaries are preferred. using lists, while providing a clearer structure when dictionaries are preferred.
""" """
try: try:
# Check if start_date is present and non-empty
if not start_date:
# Set default start_date to one hour ago in UTC
one_hour_ago = dt.datetime.now(dt.timezone.utc) - dt.timedelta(hours=1)
start_date = one_hour_ago.strftime('%Y-%m-%dT%H:%M')
logger.warning(f"No 'start_date' provided. Setting default start_date to '{start_date}'.")
# Convert the start date to a datetime object and make it timezone-aware (UTC) # Convert the start date to a datetime object and make it timezone-aware (UTC)
start_dt = dt.datetime.strptime(start_date, '%Y-%m-%dT%H:%M') start_dt = dt.datetime.strptime(start_date, '%Y-%m-%dT%H:%M')
start_dt = start_dt.replace(tzinfo=dt.timezone.utc) # Set UTC timezone start_dt = start_dt.replace(tzinfo=dt.timezone.utc) # Set UTC timezone
@ -453,6 +554,7 @@ class Backtester:
def precompute_indicators(self, indicators_definitions: list, user_name: str, data_feed: pd.DataFrame) -> dict: def precompute_indicators(self, indicators_definitions: list, user_name: str, data_feed: pd.DataFrame) -> dict:
""" """
Precompute indicator values and return a dictionary of DataFrames. Precompute indicator values and return a dictionary of DataFrames.
:param user_name: The username associated with the source of the data feed.
:param indicators_definitions: List of indicator definitions. :param indicators_definitions: List of indicator definitions.
:param data_feed: Pandas DataFrame with OHLC data. :param data_feed: Pandas DataFrame with OHLC data.
:return: Dictionary mapping indicator names to their precomputed DataFrames. :return: Dictionary mapping indicator names to their precomputed DataFrames.
@ -551,7 +653,7 @@ class Backtester:
try: try:
# **Convert 'time' to 'datetime' if necessary** # **Convert 'time' to 'datetime' if necessary**
if 'time' in data_feed.columns: if 'time' in data_feed.columns:
data_feed['datetime'] = pd.to_datetime(data_feed['time'], unit='ms') # Adjust 'unit' if needed data_feed['datetime'] = pd.to_datetime(data_feed['time'], unit='ms')
data_feed.set_index('datetime', inplace=True) data_feed.set_index('datetime', inplace=True)
logger.info("Converted 'time' to 'datetime' and set as index in data_feed.") logger.info("Converted 'time' to 'datetime' and set as index in data_feed.")
@ -561,8 +663,6 @@ class Backtester:
logger.error("Data feed is missing one or more required columns: %s", columns_to_keep) logger.error("Data feed is missing one or more required columns: %s", columns_to_keep)
raise ValueError("Incomplete data feed for Backtrader.") raise ValueError("Incomplete data feed for Backtrader.")
data_feed = data_feed[columns_to_keep]
cerebro = bt.Cerebro() cerebro = bt.Cerebro()
# Assign cerebro to strategy_instance for potential use in custom methods # Assign cerebro to strategy_instance for potential use in custom methods
@ -571,7 +671,7 @@ class Backtester:
# Add the mapped strategy to the backtest, including strategy_instance as a parameter # Add the mapped strategy to the backtest, including strategy_instance as a parameter
cerebro.addstrategy(strategy_class, strategy_instance=strategy_instance) cerebro.addstrategy(strategy_class, strategy_instance=strategy_instance)
# Add the main data feed to Cerebro # Add data feed to Cerebro
bt_feed = bt.feeds.PandasData(dataname=data_feed) bt_feed = bt.feeds.PandasData(dataname=data_feed)
cerebro.adddata(bt_feed) cerebro.adddata(bt_feed)
@ -622,7 +722,10 @@ class Backtester:
# Handle exceptions and send error messages to the client # Handle exceptions and send error messages to the client
error_message = f"Backtest execution failed: {str(e)}" error_message = f"Backtest execution failed: {str(e)}"
self.socketio.emit('backtest_error', {"message": error_message}, room=socket_conn_id) self.socketio.emit('backtest_error', {"message": error_message}, room=socket_conn_id)
logger.error(f"[BACKTEST ERROR] {error_message}") logger.error(f"[BACKTEST ERROR] {error_message}", exc_info=True)
# Invoke callback with failure details to ensure cleanup
callback({"success": False, "message": error_message})
# Start the backtest as a background task # Start the backtest as a background task
self.socketio.start_background_task(execute_backtest) self.socketio.start_background_task(execute_backtest)
@ -638,12 +741,13 @@ class Backtester:
user_name = msg_data.get('user_name') user_name = msg_data.get('user_name')
backtest_name = f"{msg_data.get('strategy', 'UnnamedStrategy')}_backtest" backtest_name = f"{msg_data.get('strategy', 'UnnamedStrategy')}_backtest"
# Cache the backtest data
self.cache_backtest(user_name, backtest_name, msg_data)
# Fetch the strategy using user_id and strategy_name # Fetch the strategy using user_id and strategy_name
strategy_name = msg_data.get('strategy') strategy_name = msg_data.get('strategy')
try:
user_strategy = self.strategies.get_strategy_by_name(user_id=int(user_id), name=strategy_name) user_strategy = self.strategies.get_strategy_by_name(user_id=int(user_id), name=strategy_name)
except ValueError:
logger.error(f"Invalid user_id '{user_id}'. Must be an integer.")
return {"error": f"Invalid user_id '{user_id}'. Must be an integer."}
if not user_strategy: if not user_strategy:
logger.error(f"Strategy '{strategy_name}' not found for user '{user_name}'.") logger.error(f"Strategy '{strategy_name}' not found for user '{user_name}'.")
@ -665,7 +769,7 @@ class Backtester:
return {"error": f"Invalid user_id '{user_id}'. Must be an integer."} return {"error": f"Invalid user_id '{user_id}'. Must be an integer."}
# Generate unique strategy_instance_id for the backtest # Generate unique strategy_instance_id for the backtest
strategy_instance_id = f"test_{user_id}_{strategy_name}_{dt.datetime.now().isoformat()}" strategy_instance_id = f"test_{uuid.uuid4()}"
# Instantiate StrategyInstance with proper indicators and trades # Instantiate StrategyInstance with proper indicators and trades
strategy_instance = StrategyInstance( strategy_instance = StrategyInstance(
@ -685,14 +789,34 @@ class Backtester:
# Map the user strategy to a Backtrader-compatible strategy class # Map the user strategy to a Backtrader-compatible strategy class
mapped_strategy_class = self.map_user_strategy(user_strategy, precomputed_indicators) mapped_strategy_class = self.map_user_strategy(user_strategy, precomputed_indicators)
# Define the backtest key for caching
backtest_key = f"backtest:{user_name}:{backtest_name}"
# Cache the backtest initiation in 'tests' cache using the upsert method
self.cache_backtest(user_name, backtest_name, msg_data, strategy_instance_id)
# Define the callback function to handle backtest completion # Define the callback function to handle backtest completion
def backtest_callback(results): def backtest_callback(results):
try:
if results.get("success") is False:
# Handle backtest failure
self.store_backtest_results(user_name, backtest_name, results)
logger.error(f"Backtest '{backtest_name}' failed for user '{user_name}': {results.get('message')}")
else:
# Handle backtest success
self.store_backtest_results(user_name, backtest_name, results) self.store_backtest_results(user_name, backtest_name, results)
self.update_strategy_stats(user_id_int, strategy_name, results) self.update_strategy_stats(user_id_int, strategy_name, results)
# Emit the results back to the client # Emit the results back to the client
self.socketio.emit('backtest_results', {"test_id": backtest_name, "results": results}, room=socket_conn_id) self.socketio.emit(
'backtest_results',
{"test_id": backtest_name, "results": results},
room=socket_conn_id
)
logger.info(f"[BACKTEST COMPLETE] Results emitted to user '{user_name}'.") logger.info(f"[BACKTEST COMPLETE] Results emitted to user '{user_name}'.")
finally:
# Cleanup regardless of success or failure
self.cleanup_backtest(backtest_key, strategy_instance_id)
# Run the backtest asynchronously, passing the strategy_instance # Run the backtest asynchronously, passing the strategy_instance
self.run_backtest( self.run_backtest(
@ -708,6 +832,101 @@ class Backtester:
logger.info(f"Backtest '{backtest_name}' started for user '{user_name}'.") logger.info(f"Backtest '{backtest_name}' started for user '{user_name}'.")
return {"reply": "backtest_started"} return {"reply": "backtest_started"}
def start_periodic_purge(self, interval_seconds: int = 3600):
    """
    Start a background task that periodically purges expired cache entries
    and cleans up orphaned backtest contexts.

    :param interval_seconds: Time interval between purges in seconds.
    """
    def purge_task():
        while True:
            # Fix: use socketio.sleep() instead of time.sleep() so the
            # wait yields cooperatively under eventlet/gevent async modes
            # (time.sleep blocks the whole worker there).
            self.socketio.sleep(interval_seconds)
            # Purge expired entries from every purgeable cache.
            for cache_name, cache in self.data_cache.caches.items():
                if isinstance(cache, (RowBasedCache, TableBasedCache)):
                    # NOTE(review): _purge_expired is a private method of the
                    # cache classes — consider exposing a public purge hook.
                    cache._purge_expired()
                    logger.debug(f"Purged expired entries from cache '{cache_name}'.")
            # Also drop backtest contexts whose 'tests' entry has expired.
            self.cleanup_orphaned_backtest_contexts()
    self.socketio.start_background_task(purge_task)
def cleanup_orphaned_backtest_contexts(self) -> None:
    """
    Remove backtest strategy contexts ('test_'-prefixed ids) that no longer
    have a corresponding entry in the 'tests' cache.

    Orphans appear when a 'tests' entry expires or is evicted without the
    matching 'strategy_contexts' row being cleaned up.
    """
    try:
        # Fetch every stored strategy context (empty filter = all rows).
        strategy_contexts_df = self.data_cache.get_rows_from_datacache(
            cache_name='strategy_contexts',
            filter_vals=[]
        )
        if strategy_contexts_df.empty:
            logger.debug("No strategy contexts found for cleanup.")
            return
        # Only contexts created by backtests are eligible for cleanup.
        backtest_contexts = strategy_contexts_df[
            strategy_contexts_df['strategy_instance_id'].astype(str).str.startswith('test_')
        ]
        if backtest_contexts.empty:
            logger.debug("No backtest contexts found for cleanup.")
            return
        # Build the set of strategy_instance_ids still referenced by the
        # 'tests' cache ONCE. The original fetched the cache and rescanned
        # every entry per context row (O(contexts * tests)); this is O(n+m).
        tests_cache = self.data_cache.get_cache('tests')
        active_ids = set()
        if isinstance(tests_cache, RowBasedCache):
            for entry in tests_cache.cache.values():
                instance_id = entry.data.get('strategy_instance_id')
                if instance_id is not None:
                    active_ids.add(instance_id)
        for _, row in backtest_contexts.iterrows():
            strategy_instance_id = row['strategy_instance_id']
            if strategy_instance_id not in active_ids:
                # Orphaned context: no live backtest references it.
                self.data_cache.remove_row_from_datacache(
                    cache_name='strategy_contexts',
                    filter_vals=[('strategy_instance_id', strategy_instance_id)],
                    remove_from_db=True
                )
                logger.info(f"Orphaned backtest context '{strategy_instance_id}' removed from 'strategy_contexts' cache.")
    except Exception as e:
        logger.error(f"Error during cleanup of orphaned backtest contexts: {e}", exc_info=True)
def shutdown_handler(self, signum, frame):
    """
    Handle server shutdown signals (SIGINT/SIGTERM) by cleaning up every
    cached backtest and its strategy context before exiting.

    :param signum: Signal number delivered by the OS.
    :param frame: Current stack frame (unused; required by the signal API).
    """
    logger.info(f"Received shutdown signal: {signum}. Cleaning up ongoing backtests.")
    try:
        tests_cache = self.data_cache.get_cache('tests')
        if isinstance(tests_cache, RowBasedCache):
            # Snapshot the keys first: cleanup_backtest mutates the cache.
            active_tests = list(tests_cache.cache.keys())
            for backtest_key in active_tests:
                # Fix: every other call site in this file passes filter_vals
                # as a LIST of (field, value) tuples; the original passed a
                # bare tuple here, which would not match that format.
                strategy_instance_id = self.data_cache.get_cache_item(
                    item_name='strategy_instance_id',
                    cache_name='tests',
                    filter_vals=[('tbl_key', backtest_key)]
                )
                if strategy_instance_id:
                    self.cleanup_backtest(backtest_key, strategy_instance_id)
        logger.info("Cleanup of ongoing backtests completed.")
    except Exception as e:
        logger.error(f"Error during shutdown cleanup: {e}", exc_info=True)
    finally:
        logger.info("Exiting.")
        # Fix: raise SystemExit instead of the site-injected exit() builtin,
        # which is not guaranteed to exist (e.g. under `python -S`).
        raise SystemExit(0)
def update_strategy_stats(self, user_id: int, strategy_name: str, results: dict): def update_strategy_stats(self, user_id: int, strategy_name: str, results: dict):
""" """
Update the strategy stats with the backtest results. Update the strategy stats with the backtest results.
@ -769,15 +988,21 @@ class Backtester:
""" Store the backtest results in the cache """ """ Store the backtest results in the cache """
cache_key = f"backtest:{user_name}:{backtest_name}" cache_key = f"backtest:{user_name}:{backtest_name}"
filter_vals = [('tbl_key', cache_key)] try:
backtest_data = self.data_cache.get_rows_from_cache('tests', filter_vals) # Use modify_datacache_item to update only the 'results' field
self.data_cache.modify_cache_item(
if not backtest_data.empty: cache_name='tests',
backtest_data['results'] = str(results) # Convert dict to string or JSON as per your cache implementation filter_vals=[('tbl_key', cache_key)],
self.data_cache.insert_row_into_cache('tests', backtest_data.columns, backtest_data.values, key=cache_key) field_name='results',
new_data=str(results) # Convert dict to string or JSON as needed
)
logger.info(f"Backtest results stored for '{backtest_name}' of user '{user_name}'.") logger.info(f"Backtest results stored for '{backtest_name}' of user '{user_name}'.")
else: except ValueError as ve:
logger.error(f"Backtest '{backtest_name}' not found in cache for user '{user_name}'.") logger.error(f"ValueError in storing backtest results for '{backtest_name}' of user '{user_name}': {ve}")
traceback.print_exc()
except Exception as e:
logger.error(f"Error storing backtest results for '{backtest_name}' of user '{user_name}': {e}")
traceback.print_exc()
def calculate_returns(self, equity_curve: list) -> list: def calculate_returns(self, equity_curve: list) -> list:
""" """