import datetime as dt
import json
import logging
import traceback
import uuid
from typing import Any

import pandas as pd

from DataCache_v3 import DataCache
from indicators import Indicators
from trade import Trades
from PythonGenerator import PythonGenerator
from StrategyInstance import StrategyInstance

# Configure logging
logger = logging.getLogger(__name__)


class Strategies:
    """
    Manages stored strategies and their running instances.

    Strategy definitions are kept in the 'strategies' table cache of the supplied DataCache;
    live StrategyInstance objects are kept in ``self.active_instances``.
    """

    def __init__(self, data_cache: DataCache, trades: Trades, indicators: Indicators):
        """
        Initializes the Strategies class.

        :param data_cache: Instance of DataCache to manage cache and database interactions.
        :param trades: Reference to the trades object that maintains trading actions and data.
        :param indicators: Reference to the Indicators manager.
        """
        self.data_cache = data_cache  # Database interaction instance
        self.trades = trades
        self.indicators_manager = indicators

        # Create a cache for strategies with necessary columns
        self.data_cache.create_cache(name='strategies',
                                     cache_type='table',
                                     size_limit=500,
                                     eviction_policy='deny',
                                     default_expiration=dt.timedelta(hours=24),
                                     columns=["creator", "name", "workspace", "code", "stats",
                                              "public", "fee", "tbl_key", "strategy_components"])

        # Create a cache for strategy contexts to store strategy states and settings
        self.data_cache.create_cache(
            name='strategy_contexts',
            cache_type='table',
            size_limit=1000,
            eviction_policy='deny',
            default_expiration=dt.timedelta(hours=24),
            columns=[
                "strategy_instance_id",  # Unique identifier for the strategy instance
                "flags",  # JSON-encoded string to store flags
                "profit_loss",  # Float value for tracking profit/loss
                "active",  # Boolean or Integer (1/0) for active status
                "paused",  # Boolean or Integer (1/0) for paused status
                "exit",  # Boolean or Integer (1/0) for exit status
                "exit_method",  # String defining exit method
                "start_time"  # ISO-formatted datetime string for start time
            ]
        )

        # Initialize default settings
        self.default_timeframe = '5m'
        self.default_exchange = 'Binance'
        self.default_symbol = 'BTCUSD'

        self.active_instances: dict[tuple[int, str], StrategyInstance] = {}  # Key: (user_id, strategy_id)

    @staticmethod
    def _decode_components(raw: Any) -> dict:
        """
        Normalizes a 'strategy_components' value to a dictionary.

        The field is written as a JSON string by ``_save_strategy`` but is handed around as a
        dict once ``get_strategy_by_tbl_key`` has decoded it, so both shapes are accepted.

        :param raw: A dict, a JSON string, or anything else (e.g. None for a missing value).
        :return: The dict itself, the decoded JSON object, or an empty dict when the value is
                 missing, not valid JSON, or does not decode to a JSON object.
        """
        if isinstance(raw, dict):
            return raw
        if isinstance(raw, str):
            try:
                decoded = json.loads(raw)
            except json.JSONDecodeError:
                return {}
            return decoded if isinstance(decoded, dict) else {}
        return {}

    def _save_strategy(self, strategy_data: dict, default_source: dict) -> dict:
        """
        Saves a strategy to the cache and database. Handles both creation and editing.

        The operation is treated as an edit when ``strategy_data`` contains a 'tbl_key';
        otherwise a new key is generated and a new row is inserted. Note that
        ``strategy_data`` is updated in place: 'code' is replaced by its JSON string when a
        dict was supplied, and 'strategy_components' is added.

        :param strategy_data: A dictionary containing strategy data such as name, code, workspace, etc.
        :param default_source: The default source for undefined sources in the strategy.
        :return: A dictionary containing success or failure information.
        """
        is_edit = 'tbl_key' in strategy_data
        try:
            # Determine if this is an edit or a new creation
            tbl_key = strategy_data.get('tbl_key', str(uuid.uuid4()))

            if is_edit:
                # Verify the existing strategy
                existing_strategy = self.data_cache.get_rows_from_datacache(
                    cache_name='strategies',
                    filter_vals=[('tbl_key', tbl_key)],
                    include_tbl_key=True
                )
                if existing_strategy.empty:
                    return {"success": False, "message": "Strategy not found."}
            else:
                # Check for duplicate strategy name
                filter_conditions = [
                    ('creator', strategy_data.get('creator')),
                    ('name', strategy_data['name'])
                ]
                existing_strategy = self.data_cache.get_rows_from_datacache(
                    cache_name='strategies',
                    filter_vals=filter_conditions,
                    include_tbl_key=True
                )
                if not existing_strategy.empty:
                    return {"success": False, "message": "A strategy with this name already exists"}

            # Validate 'workspace' (must be a non-blank string)
            workspace_data = strategy_data.get('workspace')
            if not isinstance(workspace_data, str) or not workspace_data.strip():
                return {"success": False, "message": "Invalid or empty workspace data"}

            # Serialize 'stats' field
            try:
                stats_data = strategy_data.get('stats', {})
                stats_serialized = json.dumps(stats_data)
            except (TypeError, ValueError):
                return {"success": False, "message": "Invalid stats data format"}

            # Validate and parse 'code'
            code = strategy_data.get('code')
            if isinstance(code, str):
                try:
                    strategy_json = json.loads(code)
                    if not isinstance(strategy_json, dict):
                        return {"success": False, "message": "'code' must be a JSON object."}
                except json.JSONDecodeError:
                    return {"success": False, "message": "Invalid JSON format for 'code'."}
            elif isinstance(code, dict):
                strategy_json = code
                strategy_data['code'] = json.dumps(strategy_json)  # Serialize for storage
            else:
                return {"success": False, "message": "'code' must be a JSON string or dictionary."}

            # Generate Python components using PythonGenerator
            python_generator = PythonGenerator(default_source.copy(), tbl_key)
            strategy_components = python_generator.generate(strategy_json)
            strategy_data['strategy_components'] = json.dumps(strategy_components)

            # Prepare fields for database operations
            columns = (
                "creator", "name", "workspace", "code", "stats",
                "public", "fee", "tbl_key", "strategy_components"
            )
            values = (
                strategy_data.get('creator'),
                strategy_data['name'],
                workspace_data,
                strategy_data['code'],
                stats_serialized,
                bool(strategy_data.get('public', 0)),
                float(strategy_data.get('fee', 0.0)),
                tbl_key,
                strategy_data['strategy_components']
            )

            if is_edit:
                # Update the existing strategy
                self.data_cache.modify_datacache_item(
                    cache_name='strategies',
                    filter_vals=[('tbl_key', tbl_key)],
                    field_names=columns,
                    new_values=values,
                    key=tbl_key,
                    overwrite='tbl_key'
                )
            else:
                # Insert a new strategy
                self.data_cache.insert_row_into_datacache(
                    cache_name='strategies',
                    columns=columns,
                    values=values
                )

            # Prepare the response
            response_strategy = strategy_data.copy()
            response_strategy.pop("strategy_components", None)  # Remove the sensitive field

            # Include `tbl_key` within the `strategy` object
            response_strategy['tbl_key'] = tbl_key

            return {
                "success": True,
                "message": "Strategy saved successfully",
                "strategy": response_strategy,
                "updated_at": dt.datetime.now(dt.timezone.utc).isoformat()
            }

        except Exception as e:
            # Handle exceptions and log errors
            logger.error(f"Failed to save strategy: {e}", exc_info=True)
            traceback.print_exc()
            operation = "update" if is_edit else "create"
            return {"success": False, "message": f"Failed to {operation} strategy: {str(e)}"}

    def new_strategy(self, strategy_data: dict, default_source: dict) -> dict:
        """
        Add a new strategy to the cache and database.

        Delegates to ``_save_strategy``, which creates a new row when ``strategy_data`` has no
        'tbl_key'.

        :param default_source: The default source for undefined sources in the strategy.
        :param strategy_data: A dictionary containing strategy data such as name, code, and workspace.
        :return: A dictionary containing success or failure information.
        """
        return self._save_strategy(strategy_data, default_source)

    def edit_strategy(self, strategy_data: dict, default_source: dict) -> dict:
        """
        Updates an existing strategy in the cache and database.

        Delegates to ``_save_strategy``, which updates the row identified by
        ``strategy_data['tbl_key']``.

        :param default_source: The default source for undefined sources in the strategy.
        :param strategy_data: A dictionary containing the updated strategy data.
        :return: A dictionary containing success or failure information.
        """
        return self._save_strategy(strategy_data, default_source)

    def delete_strategy(self, tbl_key: str) -> dict:
        """
        Deletes a strategy identified by its tbl_key.

        :param tbl_key: The unique identifier of the strategy to delete.
        :return: A dictionary indicating success or failure with an appropriate message.
        """
        try:
            self.data_cache.remove_row_from_datacache(
                cache_name='strategies',
                filter_vals=[('tbl_key', tbl_key)]
            )
            return {"success": True, "message": "Strategy deleted successfully."}
        except Exception as e:
            logger.error(f"Failed to delete strategy '{tbl_key}': {e}", exc_info=True)
            return {"success": False, "message": f"Failed to delete strategy: {str(e)}"}

    def get_all_strategy_names(self, user_id: int) -> list | None:
        """
        Return a list of all public and user strategy names stored in the cache or database.

        :param user_id: The ID of the user whose private strategies are included.
        :return: The list of strategy names, or None when no strategies are found.
        """
        # get_all_strategies returns None (not an empty DataFrame) when nothing is found,
        # so the None case must be handled before touching DataFrame attributes.
        strategies_df = self.get_all_strategies(user_id, 'df')
        if strategies_df is None or strategies_df.empty:
            return None
        return strategies_df['name'].tolist()

    def get_all_strategies(self, user_id: int | None, form: str,
                           include_all: bool = False) -> pd.DataFrame | str | list[dict] | None:
        """
        Return stored strategies in various formats.

        :param user_id: The ID of the user making the request. If None, fetch all public strategies.
        :param form: The desired format ('df', 'json', or 'dict').
        :param include_all: If True, fetch all strategies (public and private) for all users.
        :return: A DataFrame ('df'), a JSON string of records ('json'), or a list of record
                 dictionaries ('dict'); None when no strategies are found.
        :raises ValueError: If ``form`` is not one of the supported formats.
        """
        valid_forms = {'df', 'json', 'dict'}
        if form not in valid_forms:
            raise ValueError(f"Invalid form '{form}'. Expected one of {valid_forms}.")

        if include_all:
            # Fetch all strategies regardless of user or public status
            strategies_df = self.data_cache.get_all_rows_from_datacache(
                cache_name='strategies',
            )
        else:
            # Fetch public strategies
            public_df = self.data_cache.get_rows_from_datacache(
                cache_name='strategies',
                filter_vals=[('public', 1)],
                include_tbl_key=True
            )

            if user_id is not None:
                # Fetch user-specific private strategies
                user_df = self.data_cache.get_rows_from_datacache(
                    cache_name='strategies',
                    filter_vals=[('creator', user_id), ('public', 0)],
                    include_tbl_key=True
                )
                # Concatenate the two DataFrames
                strategies_df = pd.concat([public_df, user_df], ignore_index=True)
            else:
                strategies_df = public_df

        # Return None if no strategies found
        if strategies_df.empty:
            return None

        # Return the strategies in the requested format
        if form == 'df':
            return strategies_df
        elif form == 'json':
            return strategies_df.to_json(orient='records')
        elif form == 'dict':
            return strategies_df.to_dict('records')

    def get_strategy_by_tbl_key(self, tbl_key: str) -> dict[str, Any] | None:
        """
        Retrieve a strategy object by tbl_key.

        The returned dictionary has 'strategy_components' decoded to a dict (empty when the
        stored value is missing or not a valid JSON object) and 'code' decoded from JSON when
        it is stored as a string.

        :param tbl_key: The unique identifier of the strategy.
        :return: The strategy dictionary if found, else None.
        """
        strategies_df = self.get_all_strategies(None, 'df', include_all=True)

        if strategies_df is None:
            return None

        filtered_strategies = strategies_df.query('tbl_key == @tbl_key')

        if filtered_strategies.empty:
            return None

        strategy_row = filtered_strategies.iloc[0].to_dict()

        # Deserialize the 'strategy_components' field; tolerate missing/None/invalid values
        strategy_row['strategy_components'] = self._decode_components(
            strategy_row.get('strategy_components')
        )

        # If 'code' is stored as a JSON string, deserialize it
        if isinstance(strategy_row.get('code'), str):
            strategy_row['code'] = json.loads(strategy_row['code'])

        return strategy_row

    def update_strategy_stats(self, strategy_id: str, profit_loss: float) -> None:
        """
        Updates the strategy's statistics based on the latest profit or loss.

        Increments 'num_trades' and adds ``profit_loss`` to 'total_position' and
        'total_position_value' in the stored stats. Errors are logged, not raised.

        :param strategy_id: Identifier of the strategy.
        :param profit_loss: Latest profit or loss from strategy execution.
        """
        try:
            # Fetch the current stats
            strategy = self.data_cache.get_rows_from_datacache(cache_name='strategies',
                                                               filter_vals=[('tbl_key', strategy_id)],
                                                               include_tbl_key=True)

            if strategy.empty:
                logger.warning(f"Strategy ID {strategy_id} not found for stats update.")
                return

            strategy_row = strategy.iloc[0].to_dict()
            current_stats = json.loads(strategy_row.get('stats', '{}'))

            # Update stats
            current_stats['num_trades'] = current_stats.get('num_trades', 0) + 1
            current_stats['total_position'] = current_stats.get('total_position', 0.0) + profit_loss
            current_stats['total_position_value'] = current_stats.get('total_position_value', 0.0) + profit_loss

            # Serialize and update in datacache
            self.data_cache.modify_datacache_item(cache_name='strategies',
                                                  filter_vals=[('tbl_key', strategy_id)],
                                                  field_names=('stats',),
                                                  new_values=(json.dumps(current_stats),),
                                                  key=strategy_id,
                                                  overwrite='tbl_key')

            logger.info(f"Updated stats for strategy {strategy_id}: {current_stats}")

        except Exception as e:
            logger.error(f"Error updating stats for strategy '{strategy_id}': {e}", exc_info=True)

    def execute_strategy(self, strategy_data: dict[str, Any]) -> dict[str, Any]:
        """
        Executes a strategy based on the provided strategy data.

        A StrategyInstance is created on first execution and reused afterwards, keyed by
        (creator, tbl_key). ``strategy_data`` only needs to support ``.get``; ``update()``
        passes rows whose 'strategy_components' is still the stored JSON string, so that field
        is accepted either as a dict or as a JSON string.

        :param strategy_data: A dictionary containing strategy details.
        :return: A dictionary indicating success or failure with relevant messages.
        """
        try:
            # Extract identifiers
            strategy_id = strategy_data.get('tbl_key')
            strategy_name = strategy_data.get('name')
            user_id = strategy_data.get('creator')

            if not strategy_id or not strategy_name or not user_id:
                return {"success": False, "message": "Strategy data is incomplete."}

            # Generate a deterministic strategy_instance_id
            strategy_instance_id = f"{user_id}_{strategy_name}"
            instance_key = (user_id, strategy_id)  # Unique key for the strategy-user pair

            # Retrieve or create StrategyInstance
            if instance_key not in self.active_instances:
                # Rows read straight from the cache carry the components as a JSON string;
                # decode before looking up the generated code.
                components = self._decode_components(strategy_data.get('strategy_components'))
                generated_code = components.get('generated_code', '')
                if not generated_code:
                    return {"success": False, "message": "No 'next()' method defined for the strategy."}

                # Instantiate StrategyInstance
                strategy_instance = StrategyInstance(
                    strategy_instance_id=strategy_instance_id,
                    strategy_id=strategy_id,
                    strategy_name=strategy_name,
                    user_id=user_id,
                    generated_code=generated_code,
                    data_cache=self.data_cache,
                    indicators=self.indicators_manager,
                    trades=self.trades
                )

                # Store in active_instances
                self.active_instances[instance_key] = strategy_instance
                logger.debug(f"Created new StrategyInstance '{strategy_instance_id}' for strategy '{strategy_id}'.")
            else:
                strategy_instance = self.active_instances[instance_key]
                logger.debug(
                    f"Retrieved existing StrategyInstance '{strategy_instance_id}' for strategy '{strategy_id}'.")

            # Execute the strategy
            execution_result = strategy_instance.execute()

            if execution_result.get('success'):
                profit_loss = execution_result.get('profit_loss', 0.0)
                self.update_strategy_stats(strategy_id, profit_loss)
                logger.info(f"Strategy '{strategy_id}' executed successfully with profit/loss: {profit_loss}")

                # Check if exit was initiated
                if strategy_instance.exit:
                    # Check if all trades are closed
                    if self.trades.all_trades_closed(strategy_id):
                        logger.info(f"Strategy '{strategy_id}' has successfully exited all trades.")
                        # Remove from active_instances
                        del self.active_instances[instance_key]
                        # Optionally, mark the strategy as inactive in the database
                        self.data_cache.modify_datacache_item(
                            cache_name='strategies',
                            filter_vals=[('tbl_key', strategy_id)],
                            field_names=('active',),
                            new_values=(False,),
                            key=strategy_id,
                            overwrite='tbl_key'
                        )
                    else:
                        logger.info(
                            f"Strategy '{strategy_id}' is exiting. Remaining trades will be closed in subsequent executions.")

                return {"success": True, "strategy_profit_loss": profit_loss}
            else:
                error_message = execution_result.get('message', 'Unknown error.')
                logger.error(f"Strategy '{strategy_id}' execution failed: {error_message}")
                return {"success": False, "message": f"Strategy execution failed: {error_message}"}

        except Exception as e:
            logger.error(f"Unexpected error in execute_strategy: {e}")
            traceback.print_exc()
            return {"success": False, "message": f"Unexpected error: {str(e)}"}

    def update(self):
        """
        Loops through and executes all activated strategies.

        Rows whose 'active' field is True are fetched from the 'strategies' cache and each is
        passed to ``execute_strategy``. Errors are logged, not raised.
        """
        try:
            active_strategies = self.data_cache.get_rows_from_datacache('strategies', [('active', True)],
                                                                        include_tbl_key=True)
            if active_strategies.empty:
                logger.info("No active strategies to execute.")
                return  # No active strategies to execute

            # Each row is a pandas Series; execute_strategy only relies on `.get`
            for _, strategy_data in active_strategies.iterrows():
                self.execute_strategy(strategy_data)

        except Exception as e:
            logger.error(f"Error updating strategies: {e}", exc_info=True)
            traceback.print_exc()

    def update_stats(self, tbl_key: str, stats: dict) -> None:
        """
        Updates the strategy's statistics with the provided stats.

        The supplied values are merged over the stored stats (existing keys are overwritten).
        Errors are logged, not raised.

        :param tbl_key: Identifier of the strategy (tbl_key).
        :param stats: Dictionary containing statistics to update.
        """
        try:
            # Fetch the current strategy data
            strategy = self.data_cache.get_rows_from_datacache(
                cache_name='strategies',
                filter_vals=[('tbl_key', tbl_key)],
                include_tbl_key=True)

            if strategy.empty:
                logger.warning(f"Strategy ID {tbl_key} not found for stats update.")
                return

            strategy_row = strategy.iloc[0].to_dict()
            current_stats = json.loads(strategy_row.get('stats', '{}'))

            # Merge the new stats with existing stats
            current_stats.update(stats)

            # Serialize the updated stats
            updated_stats_serialized = json.dumps(current_stats)

            # Update the stats in the data cache
            self.data_cache.modify_datacache_item(
                cache_name='strategies',
                filter_vals=[('tbl_key', tbl_key)],
                field_names=('stats',),
                new_values=(updated_stats_serialized,),
                key=tbl_key,
                overwrite='tbl_key'
            )

            logger.info(f"Updated stats for strategy '{tbl_key}': {current_stats}")

        except Exception as e:
            logger.error(f"Error updating stats for strategy '{tbl_key}': {e}", exc_info=True)