From 1af55f10a02476e644f86035afb1e02eadc6b789 Mon Sep 17 00:00:00 2001 From: Rob Date: Tue, 12 Nov 2024 09:43:18 -0400 Subject: [PATCH] The test are running without errors --- src/DataCache_v3.py | 30 ++++ src/StrategyInstance.py | 170 ++++++++++++++--------- src/backtesting.py | 297 +++++++++++++++++++++++++++++++++++----- 3 files changed, 395 insertions(+), 102 deletions(-) diff --git a/src/DataCache_v3.py b/src/DataCache_v3.py index 5d38c17..20be0b9 100644 --- a/src/DataCache_v3.py +++ b/src/DataCache_v3.py @@ -568,6 +568,36 @@ class CacheManager: # Call the cache system to remove the filtered rows cache.remove_item(filter_vals) + # In CacheManager class + + def remove_entries_with_prefix(self, cache_name: str, prefix: str) -> int: + """ + Removes all cache entries in the specified cache that start with the given prefix. + + :param cache_name: Name of the cache. + :param prefix: Prefix string to match cache keys. + :return: Number of entries removed. + """ + cache = self.get_cache(cache_name) + removed_count = 0 + if isinstance(cache, RowBasedCache): + keys_to_remove = [key for key in cache.cache.keys() if key.startswith(prefix)] + for key in keys_to_remove: + success = cache.remove_item([('tbl_key', key)]) + if success: + removed_count += 1 + elif isinstance(cache, TableBasedCache): + # Implement similar logic if needed for TableBasedCache + # For example, if 'strategy_instance_id' starts with the prefix + mask = self.caches[cache_name].cache['strategy_instance_id'].astype(str).str.startswith(prefix) + to_remove = self.caches[cache_name].cache[mask] + for _, row in to_remove.iterrows(): + filter_vals = [('strategy_instance_id', row['strategy_instance_id'])] + success = cache.remove_item(filter_vals) + if success: + removed_count += 1 + return removed_count + def modify_cache_item(self, cache_name: str, filter_vals: List[Tuple[str, any]], field_name: str, new_data: any) -> None: """ diff --git a/src/StrategyInstance.py b/src/StrategyInstance.py index 58090c5..1e95142 100644 --- a/src/StrategyInstance.py +++ b/src/StrategyInstance.py @@ -1,4 +1,7 @@ import logging + +import pandas as pd + from DataCache_v3 import DataCache from indicators import Indicators from trade import Trades @@ -77,25 +80,31 @@ class StrategyInstance: # Automatically load or initialize the context self._initialize_or_load_context() - def _initialize_or_load_context(self): """ Checks if a context exists for the strategy instance. If it does, load it; otherwise, initialize a new context. """ - if self.data_cache.get_rows_from_datacache( - cache_name='strategy_contexts', - filter_vals=[('strategy_instance_id', self.strategy_instance_id)] - ).empty: - self.initialize_new_context() - logger.debug(f"Initialized new context for StrategyInstance '{self.strategy_instance_id}'.") - else: - self.load_context() - logger.debug(f"Loaded existing context for StrategyInstance '{self.strategy_instance_id}'.") + try: + # Fetch the context data once + context_data = self.data_cache.get_rows_from_datacache( + cache_name='strategy_contexts', + filter_vals=[('strategy_instance_id', self.strategy_instance_id)] + ) + + if context_data.empty: + self.initialize_new_context() + logger.debug(f"Initialized new context for StrategyInstance '{self.strategy_instance_id}'.") + else: + self.load_context(context_data) + logger.debug(f"Loaded existing context for StrategyInstance '{self.strategy_instance_id}'.") + except Exception as e: + logger.error(f"Error during initialization of StrategyInstance '{self.strategy_instance_id}': {e}", exc_info=True) + traceback.print_exc() def initialize_new_context(self): """ - Initializes a new context for the strategy instance. + Initializes a new context for the strategy instance and saves it to the cache. """ self.flags = {} self.variables = {} @@ -104,35 +113,27 @@ class StrategyInstance: self.paused = False self.exit = False self.exit_method = 'all' - self.start_time = dt.datetime.now() + self.start_time = dt.datetime.now(dt.timezone.utc) # Insert initial context into the cache - self.save_context() - logger.debug(f"New context created and saved for StrategyInstance '{self.strategy_instance_id}'.") + self.insert_context() + logger.debug(f"New context created and inserted for StrategyInstance '{self.strategy_instance_id}'.") - def load_context(self): + def load_context(self, context_data: pd.DataFrame): """ - Loads the strategy execution context from the database. + Loads the strategy execution context from the provided context_data. """ try: - context_data = self.data_cache.get_rows_from_datacache( - cache_name='strategy_contexts', - filter_vals=[('strategy_instance_id', self.strategy_instance_id)] - ) - if context_data.empty: - logger.warning(f"No context found for StrategyInstance ID: {self.strategy_instance_id}") - return - context = context_data.iloc[0].to_dict() self.flags = json.loads(context.get('flags', '{}')) self.profit_loss = context.get('profit_loss', 0.0) - self.active = bool(context.get('active', True)) - self.paused = bool(context.get('paused', False)) - self.exit = bool(context.get('exit', False)) + self.active = bool(context.get('active', 1)) + self.paused = bool(context.get('paused', 0)) + self.exit = bool(context.get('exit', 0)) self.exit_method = context.get('exit_method', 'all') start_time_str = context.get('start_time') if start_time_str: - self.start_time = dt.datetime.fromisoformat(start_time_str) + self.start_time = dt.datetime.fromisoformat(start_time_str).replace(tzinfo=dt.timezone.utc) # Update exec_context with loaded flags and variables self.exec_context['flags'] = self.flags @@ -145,54 +146,91 @@ class StrategyInstance: logger.debug(f"Context loaded for StrategyInstance '{self.strategy_instance_id}'.") except Exception as e: - logger.error(f"Error loading context for StrategyInstance '{self.strategy_instance_id}': {e}", - exc_info=True) + logger.error(f"Error loading context for StrategyInstance '{self.strategy_instance_id}': {e}", exc_info=True) + traceback.print_exc() + + def insert_context(self): + """ + Inserts a new context into the cache and database. + """ + try: + columns = ( + "strategy_instance_id", "flags", "profit_loss", + "active", "paused", "exit", "exit_method", "start_time" + ) + values = ( + self.strategy_instance_id, + json.dumps(self.flags), + self.profit_loss, + int(self.active), + int(self.paused), + int(self.exit), + self.exit_method, + self.start_time.isoformat() + ) + + # Insert the new context without passing 'key' to avoid adding 'tbl_key' + self.data_cache.insert_row_into_datacache( + cache_name='strategy_contexts', + columns=columns, + values=values + # key=None is implicit + ) + logger.debug(f"Context inserted for StrategyInstance '{self.strategy_instance_id}'.") + except ValueError as ve: + logger.error(f"ValueError in inserting context for '{self.strategy_instance_id}': {ve}") + traceback.print_exc() + except Exception as e: + logger.error(f"Error inserting context for '{self.strategy_instance_id}': {e}") traceback.print_exc() def save_context(self): """ - Saves the current strategy execution context to the database. - Inserts a new row if it doesn't exist; otherwise, updates the existing row. + Saves the current strategy execution context to the cache and database. + Determines whether to insert a new row or modify an existing one. """ try: - self.data_cache.modify_datacache_item( + # Check if the context exists + existing_context = self.data_cache.get_rows_from_datacache( cache_name='strategy_contexts', - filter_vals=[('strategy_instance_id', self.strategy_instance_id)], - field_names=('flags', 'profit_loss', 'active', 'paused', 'exit', 'exit_method', 'start_time'), - new_values=( - json.dumps(self.flags), - self.profit_loss, - int(self.active), - int(self.paused), - int(self.exit), - self.exit_method, - self.start_time.isoformat() - ) + filter_vals=[('strategy_instance_id', self.strategy_instance_id)] ) - logger.debug(f"Context saved for StrategyInstance '{self.strategy_instance_id}'.") + + columns = ( + "strategy_instance_id", "flags", "profit_loss", + "active", "paused", "exit", "exit_method", "start_time" + ) + values = ( + self.strategy_instance_id, + json.dumps(self.flags), + self.profit_loss, + int(self.active), + int(self.paused), + int(self.exit), + self.exit_method, + self.start_time.isoformat() + ) + + if existing_context.empty: + # Insert a new context since it doesn't exist + self.insert_context() + logger.debug(f"Inserted new context for StrategyInstance '{self.strategy_instance_id}'.") + else: + # Modify the existing context without passing 'key' + self.data_cache.modify_datacache_item( + cache_name='strategy_contexts', + filter_vals=[('strategy_instance_id', self.strategy_instance_id)], + field_names=columns, + new_values=values, + overwrite='strategy_instance_id' # Ensures uniqueness + # Do not pass 'key' + ) + logger.debug(f"Modified existing context for StrategyInstance '{self.strategy_instance_id}'.") except ValueError as ve: - # If the record does not exist, insert it - logger.warning(f"StrategyInstance '{self.strategy_instance_id}' context not found. Attempting to insert.") - self.data_cache.insert_row_into_datacache( - cache_name='strategy_contexts', - columns=( - "strategy_instance_id", "flags", "profit_loss", - "active", "paused", "exit", "exit_method", "start_time" - ), - values=( - self.strategy_instance_id, - json.dumps(self.flags), - self.profit_loss, - int(self.active), - int(self.paused), - int(self.exit), - self.exit_method, - self.start_time.isoformat() - ) - ) - logger.debug(f"Inserted new context for StrategyInstance '{self.strategy_instance_id}'.") + logger.error(f"ValueError in saving context for '{self.strategy_instance_id}': {ve}") + traceback.print_exc() except Exception as e: - logger.error(f"Error saving context for StrategyInstance '{self.strategy_instance_id}': {e}") + logger.error(f"Error saving context for '{self.strategy_instance_id}': {e}") traceback.print_exc() def override_exec_context(self, key: str, value: Any): diff --git a/src/backtesting.py b/src/backtesting.py index 9a44c8a..6c64402 100644 --- a/src/backtesting.py +++ b/src/backtesting.py @@ -1,14 +1,18 @@ import logging +import time +import traceback import types import uuid + import backtrader as bt import datetime as dt -from DataCache_v3 import DataCache +from DataCache_v3 import DataCache, RowBasedCache, TableBasedCache from Strategies import Strategies from StrategyInstance import StrategyInstance from indicators import Indicators import numpy as np import pandas as pd +import signal # Configure logging logger = logging.getLogger(__name__) @@ -37,24 +41,114 @@ class Backtester: self.strategies = strategies self.indicators_manager = indicators self.socketio = socketio - # Create a cache for storing back-tests - self.data_cache.create_cache('tests', cache_type='row', size_limit=100, - default_expiration=dt.timedelta(days=1), - eviction_policy='evict') - def cache_backtest(self, user_name, backtest_name, backtest_data): - """ Cache the backtest data for a user """ - columns = ('user_name', 'strategy_name', 'start_time', 'capital', 'commission', 'results') + # Ensure 'tests' cache exists + self.data_cache.create_cache( + 'tests', + cache_type='row', + size_limit=100, + default_expiration=dt.timedelta(hours=2), # Set to 2 hours + eviction_policy='evict', + columns=[ + 'user_name', # TEXT + 'strategy_name', # TEXT + 'start_time', # TEXT or DATETIME + 'capital', # REAL or INTEGER + 'commission', # REAL + 'results', # TEXT or JSON + 'strategy_instance_id' # TEXT + ] + ) + # Start periodic purge (optional) + self.start_periodic_purge(interval_seconds=3600) # Purge every hour + + # Register signal handlers for graceful shutdown + signal.signal(signal.SIGINT, self.shutdown_handler) + signal.signal(signal.SIGTERM, self.shutdown_handler) + + def cache_backtest(self, user_name: str, backtest_name: str, backtest_data: dict, strategy_instance_id: str): + """ + Cache the backtest data for a user. + If the backtest already exists, update it; otherwise, insert a new entry. + + :param user_name: Name of the user. + :param backtest_name: Name of the backtest. + :param backtest_data: Dictionary containing backtest parameters. + """ + cache_key = f"backtest:{user_name}:{backtest_name}" + + # Define columns and corresponding values (excluding 'tbl_key') + columns = ('user_name', 'strategy_name', 'start_time', 'capital', 'commission', 'results', 'strategy_instance_id') values = ( backtest_data.get('user_name'), backtest_data.get('strategy'), backtest_data.get('start_date'), - backtest_data.get('capital', 10000), # Default capital if not provided + backtest_data.get('capital', 10000), # Default capital if not provided backtest_data.get('commission', 0.001), # Default commission - None # No results yet; will be filled in after backtest completion + None, # No results yet; will be filled in after backtest completion + strategy_instance_id ) - cache_key = f"backtest:{user_name}:{backtest_name}" - self.data_cache.insert_row_into_cache('tests', columns, values, key=cache_key) + + try: + # Check if the backtest already exists + existing_backtest = self.data_cache.get_rows_from_cache( + cache_name='tests', + filter_vals=[('tbl_key', cache_key)] + ) + + if existing_backtest.empty: + # Insert new backtest entry + self.data_cache.insert_row_into_cache( + cache_name='tests', + columns=columns, + values=values, + key=cache_key + ) + logger.debug(f"Inserted new backtest entry '{cache_key}'.") + else: + # Update existing backtest entry (e.g., reset 'results' if needed) + # Here, we assume you might want to reset 'results' when re-running + self.data_cache.modify_datacache_item( + cache_name='tests', + filter_vals=[('tbl_key', cache_key)], + field_names=('results',), + new_values=(None,), # Reset results + overwrite='tbl_key' # Ensures uniqueness based on 'tbl_key' + ) + logger.debug(f"Updated existing backtest entry '{cache_key}'. Reset 'results'.") + except Exception as e: + logger.error(f"Error in cache_backtest for '{cache_key}': {e}", exc_info=True) + # Depending on your application, you might want to raise the exception + # raise e + + def cleanup_backtest(self, backtest_key: str, strategy_instance_id: str) -> None: + """ + Removes cache entries related to a specific backtest and its strategy context. + + :param backtest_key: The unique key identifying the backtest in the 'tests' cache. + :param strategy_instance_id: The unique identifier for the strategy instance in 'strategy_contexts'. + """ + try: + # Remove backtest entry from 'tests' cache + self.data_cache.remove_row_from_datacache( + cache_name='tests', + filter_vals=[('tbl_key', backtest_key)], + remove_from_db=False # Assuming tests are transient and not stored in DB + ) + logger.info(f"Backtest '{backtest_key}' removed from 'tests' cache.") + + # Remove strategy context from 'strategy_contexts' cache + # Assuming 'strategy_contexts' is a table-based cache + self.data_cache.remove_row_from_datacache( + cache_name='strategy_contexts', + filter_vals=[('strategy_instance_id', strategy_instance_id)], + remove_from_db=True # Remove from DB as well + ) + logger.info(f"Strategy context '{strategy_instance_id}' removed from 'strategy_contexts' cache.") + except Exception as e: + logger.error(f"Error during cleanup of backtest '{backtest_key}': {e}", exc_info=True) + + def map_user_strategy(self, user_strategy: dict, precomputed_indicators: dict[str, pd.DataFrame], mode: str = 'testing') -> any: @@ -416,6 +510,13 @@ class Backtester: using lists, while providing a clearer structure when dictionaries are preferred. """ try: + # Check if start_date is present and non-empty + if not start_date: + # Set default start_date to one hour ago in UTC + one_hour_ago = dt.datetime.now(dt.timezone.utc) - dt.timedelta(hours=1) + start_date = one_hour_ago.strftime('%Y-%m-%dT%H:%M') + logger.warning(f"No 'start_date' provided. Setting default start_date to '{start_date}'.") + # Convert the start date to a datetime object and make it timezone-aware (UTC) start_dt = dt.datetime.strptime(start_date, '%Y-%m-%dT%H:%M') start_dt = start_dt.replace(tzinfo=dt.timezone.utc) # Set UTC timezone @@ -453,6 +554,7 @@ class Backtester: def precompute_indicators(self, indicators_definitions: list, user_name: str, data_feed: pd.DataFrame) -> dict: """ Precompute indicator values and return a dictionary of DataFrames. + :param user_name: The username associated with the source of the data feed. :param indicators_definitions: List of indicator definitions. :param data_feed: Pandas DataFrame with OHLC data. :return: Dictionary mapping indicator names to their precomputed DataFrames. @@ -540,7 +642,7 @@ class Backtester: return data_feed, precomputed_indicators def run_backtest(self, strategy_class, data_feed: pd.DataFrame, msg_data: dict, user_name: str, - callback, socket_conn_id: str, strategy_instance: StrategyInstance): + callback, socket_conn_id: str, strategy_instance: StrategyInstance): """ Runs a backtest using Backtrader and uses Flask-SocketIO's background tasks. Sends progress updates to the client via WebSocket. @@ -551,7 +653,7 @@ class Backtester: try: # **Convert 'time' to 'datetime' if necessary** if 'time' in data_feed.columns: - data_feed['datetime'] = pd.to_datetime(data_feed['time'], unit='ms') # Adjust 'unit' if needed + data_feed['datetime'] = pd.to_datetime(data_feed['time'], unit='ms') data_feed.set_index('datetime', inplace=True) logger.info("Converted 'time' to 'datetime' and set as index in data_feed.") @@ -561,8 +663,6 @@ class Backtester: logger.error("Data feed is missing one or more required columns: %s", columns_to_keep) raise ValueError("Incomplete data feed for Backtrader.") - data_feed = data_feed[columns_to_keep] - cerebro = bt.Cerebro() # Assign cerebro to strategy_instance for potential use in custom methods @@ -571,7 +671,7 @@ class Backtester: # Add the mapped strategy to the backtest, including strategy_instance as a parameter cerebro.addstrategy(strategy_class, strategy_instance=strategy_instance) - # Add the main data feed to Cerebro + # Add data feed to Cerebro bt_feed = bt.feeds.PandasData(dataname=data_feed) cerebro.adddata(bt_feed) @@ -622,7 +722,10 @@ class Backtester: # Handle exceptions and send error messages to the client error_message = f"Backtest execution failed: {str(e)}" self.socketio.emit('backtest_error', {"message": error_message}, room=socket_conn_id) - logger.error(f"[BACKTEST ERROR] {error_message}") + logger.error(f"[BACKTEST ERROR] {error_message}", exc_info=True) + + # Invoke callback with failure details to ensure cleanup + callback({"success": False, "message": error_message}) # Start the backtest as a background task self.socketio.start_background_task(execute_backtest) @@ -638,12 +741,13 @@ class Backtester: user_name = msg_data.get('user_name') backtest_name = f"{msg_data.get('strategy', 'UnnamedStrategy')}_backtest" - # Cache the backtest data - self.cache_backtest(user_name, backtest_name, msg_data) - # Fetch the strategy using user_id and strategy_name strategy_name = msg_data.get('strategy') - user_strategy = self.strategies.get_strategy_by_name(user_id=int(user_id), name=strategy_name) + try: + user_strategy = self.strategies.get_strategy_by_name(user_id=int(user_id), name=strategy_name) + except ValueError: + logger.error(f"Invalid user_id '{user_id}'. Must be an integer.") + return {"error": f"Invalid user_id '{user_id}'. Must be an integer."} if not user_strategy: logger.error(f"Strategy '{strategy_name}' not found for user '{user_name}'.") @@ -665,7 +769,7 @@ class Backtester: return {"error": f"Invalid user_id '{user_id}'. Must be an integer."} # Generate unique strategy_instance_id for the backtest - strategy_instance_id = f"test_{user_id}_{strategy_name}_{dt.datetime.now().isoformat()}" + strategy_instance_id = f"test_{uuid.uuid4()}" # Instantiate StrategyInstance with proper indicators and trades strategy_instance = StrategyInstance( @@ -685,14 +789,34 @@ class Backtester: # Map the user strategy to a Backtrader-compatible strategy class mapped_strategy_class = self.map_user_strategy(user_strategy, precomputed_indicators) + # Define the backtest key for caching + backtest_key = f"backtest:{user_name}:{backtest_name}" + + # Cache the backtest initiation in 'tests' cache using the upsert method + self.cache_backtest(user_name, backtest_name, msg_data, strategy_instance_id) + # Define the callback function to handle backtest completion def backtest_callback(results): - self.store_backtest_results(user_name, backtest_name, results) - self.update_strategy_stats(user_id_int, strategy_name, results) + try: + if results.get("success") is False: + # Handle backtest failure + self.store_backtest_results(user_name, backtest_name, results) + logger.error(f"Backtest '{backtest_name}' failed for user '{user_name}': {results.get('message')}") + else: + # Handle backtest success + self.store_backtest_results(user_name, backtest_name, results) + self.update_strategy_stats(user_id_int, strategy_name, results) - # Emit the results back to the client - self.socketio.emit('backtest_results', {"test_id": backtest_name, "results": results}, room=socket_conn_id) - logger.info(f"[BACKTEST COMPLETE] Results emitted to user '{user_name}'.") + # Emit the results back to the client + self.socketio.emit( + 'backtest_results', + {"test_id": backtest_name, "results": results}, + room=socket_conn_id + ) + logger.info(f"[BACKTEST COMPLETE] Results emitted to user '{user_name}'.") + finally: + # Cleanup regardless of success or failure + self.cleanup_backtest(backtest_key, strategy_instance_id) # Run the backtest asynchronously, passing the strategy_instance self.run_backtest( @@ -708,6 +832,101 @@ class Backtester: logger.info(f"Backtest '{backtest_name}' started for user '{user_name}'.") return {"reply": "backtest_started"} + def start_periodic_purge(self, interval_seconds: int = 3600): + """ + Starts a background task that periodically purges expired cache entries and cleans up orphaned backtest contexts. + + :param interval_seconds: Time interval between purges in seconds. + """ + def purge_task(): + while True: + time.sleep(interval_seconds) + # Purge expired entries from all caches + for cache_name, cache in self.data_cache.caches.items(): + if isinstance(cache, RowBasedCache) or isinstance(cache, TableBasedCache): + cache._purge_expired() + logger.debug(f"Purged expired entries from cache '{cache_name}'.") + + # Additionally, clean up orphaned backtest contexts + self.cleanup_orphaned_backtest_contexts() + + self.socketio.start_background_task(purge_task) + + + def cleanup_orphaned_backtest_contexts(self) -> None: + """ + Identifies and removes orphaned backtest contexts that do not have corresponding entries in 'tests' cache. + """ + try: + # Fetch all strategy_instance_ids from 'strategy_contexts' that start with 'test_' + strategy_contexts_df = self.data_cache.get_rows_from_datacache( + cache_name='strategy_contexts', + filter_vals=[] # Fetch all + ) + + if strategy_contexts_df.empty: + logger.debug("No strategy contexts found for cleanup.") + return + + # Filter contexts that are backtests (strategy_instance_id starts with 'test_') + backtest_contexts = strategy_contexts_df[ + strategy_contexts_df['strategy_instance_id'].astype(str).str.startswith('test_') + ] + + if backtest_contexts.empty: + logger.debug("No backtest contexts found for cleanup.") + return + + for _, row in backtest_contexts.iterrows(): + strategy_instance_id = row['strategy_instance_id'] + # Check if this backtest exists in 'tests' cache + # Since 'tests' cache uses 'backtest_key' as 'tbl_key', and it maps to 'strategy_instance_id' + # We'll need to search 'tests' cache for the corresponding 'strategy_instance_id' + found = False + tests_cache = self.data_cache.get_cache('tests') + if isinstance(tests_cache, RowBasedCache): + for key, entry in tests_cache.cache.items(): + if entry.data.get('strategy_instance_id') == strategy_instance_id: + found = True + break + + if not found: + # Orphaned context found; proceed to remove it + self.data_cache.remove_row_from_datacache( + cache_name='strategy_contexts', + filter_vals=[('strategy_instance_id', strategy_instance_id)], + remove_from_db=True + ) + logger.info(f"Orphaned backtest context '{strategy_instance_id}' removed from 'strategy_contexts' cache.") + except Exception as e: + logger.error(f"Error during cleanup of orphaned backtest contexts: {e}", exc_info=True) + + def shutdown_handler(self, signum, frame): + """ + Handles server shutdown signals to perform cleanup. + """ + logger.info(f"Received shutdown signal: {signum}. Cleaning up ongoing backtests.") + try: + # Fetch all backtest entries from 'tests' cache + tests_cache = self.data_cache.get_cache('tests') + if isinstance(tests_cache, RowBasedCache): + active_tests = list(tests_cache.cache.keys()) # Get all backtest keys + for backtest_key in active_tests: + # Fetch strategy_instance_id for each backtest + strategy_instance_id = self.data_cache.get_cache_item( + item_name='strategy_instance_id', + cache_name='tests', + filter_vals=('tbl_key', backtest_key) + ) + if strategy_instance_id: + self.cleanup_backtest(backtest_key, strategy_instance_id) + logger.info("Cleanup of ongoing backtests completed.") + except Exception as e: + logger.error(f"Error during shutdown cleanup: {e}", exc_info=True) + finally: + logger.info("Exiting.") + exit(0) + def update_strategy_stats(self, user_id: int, strategy_name: str, results: dict): """ Update the strategy stats with the backtest results. @@ -769,15 +988,21 @@ class Backtester: """ Store the backtest results in the cache """ cache_key = f"backtest:{user_name}:{backtest_name}" - filter_vals = [('tbl_key', cache_key)] - backtest_data = self.data_cache.get_rows_from_cache('tests', filter_vals) - - if not backtest_data.empty: - backtest_data['results'] = str(results) # Convert dict to string or JSON as per your cache implementation - self.data_cache.insert_row_into_cache('tests', backtest_data.columns, backtest_data.values, key=cache_key) + try: + # Use modify_datacache_item to update only the 'results' field + self.data_cache.modify_cache_item( + cache_name='tests', + filter_vals=[('tbl_key', cache_key)], + field_name='results', + new_data=str(results) # Convert dict to string or JSON as needed + ) logger.info(f"Backtest results stored for '{backtest_name}' of user '{user_name}'.") - else: - logger.error(f"Backtest '{backtest_name}' not found in cache for user '{user_name}'.") + except ValueError as ve: + logger.error(f"ValueError in storing backtest results for '{backtest_name}' of user '{user_name}': {ve}") + traceback.print_exc() + except Exception as e: + logger.error(f"Error storing backtest results for '{backtest_name}' of user '{user_name}': {e}") + traceback.print_exc() def calculate_returns(self, equity_curve: list) -> list: """