diff --git a/requirements.txt b/requirements.txt index 513db52..1406937 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,11 +2,12 @@ numpy<2.0.0 flask==3.0.3 config~=0.5.1 PyYAML==6.0.2 -requests==2.30.0 pandas==2.2.3 passlib~=1.7.4 ccxt==4.4.8 -flask-socketio +flask-socketio~=5.4.1 pytz==2024.2 backtrader==1.9.78.123 -eventlet~=0.37.0 \ No newline at end of file +eventlet~=0.37.0 +Flask-Cors~=3.0.10 +email_validator~=2.2.0 \ No newline at end of file diff --git a/src/PythonGenerator.py b/src/PythonGenerator.py new file mode 100644 index 0000000..da3e66d --- /dev/null +++ b/src/PythonGenerator.py @@ -0,0 +1,1551 @@ +# PythonGenerator.py + +import logging +import math +from typing import Any, Dict, List, Set, Tuple, Union + +logger = logging.getLogger(__name__) + + +class PythonGenerator: + def __init__(self, default_source: Dict[str, Any], strategy_id: str): + """ + Initializes the PythonGenerator. + + :param default_source: The default source for undefined sources in the strategy. + :param strategy_id: The ID of the strategy. + """ + self.indicators_used: List[Dict[str, Any]] = [] + self.data_sources_used: Set[Tuple[str, str, str]] = set() + self.flags_used: Set[str] = set() + self.default_source: Dict[str, Any] = default_source + self.strategy_id: str = strategy_id + + # Initialize scheduled action tracking + self.scheduled_action_count: int = 0 + self.scheduled_actions: List[str] = [] + + def generate(self, strategy_json: Dict[str, Any]) -> Dict[str, Any]: + """ + Generates the 'next()' method code and collects indicators, data sources, and flags used. + + :param strategy_json: The JSON definition of the strategy. + :return: A dictionary containing 'generated_code', 'indicators', 'data_sources', and 'flags_used'. + """ + # Reset tracking attributes + self.indicators_used.clear() + self.data_sources_used.clear() + self.flags_used.clear() + + # Initialize code components + code_lines = [] + indent_level = 1 # For 'next' method code indentation + + # Start generating the 'next' method + code_lines.append("def next():") + indent_level += 1 # Increase indent level inside the 'next' method + + # Recursively generate code from JSON nodes + code_lines.extend(self.generate_code_from_json(strategy_json, indent_level)) + + # Handle exit logic at the end of 'next()' + indent = ' ' * indent_level + exit_indent = ' ' * (indent_level + 1) + code_lines.append(f"{indent}if self.exit:") + code_lines.append(f"{exit_indent}self.exit_strategy()") + code_lines.append(f"{exit_indent}self.paused = True # Pause the strategy while exiting.") + + # Join the code lines into a single string + next_method_code = '\n'.join(code_lines) + + # Prepare the combined dictionary + strategy_components = { + 'generated_code': next_method_code, + 'indicators': self.indicators_used.copy(), + 'data_sources': list(self.data_sources_used), + 'flags_used': list(self.flags_used) + } + + logger.debug("Generated 'next()' method code.") + return strategy_components + + # ============================== + # Helper Methods + # ============================== + + def process_numeric_input(self, value, option_name, default_value=1, min_value=None, max_value=None): + """ + Processes a numeric input, applying limits if necessary. + + :param value: The value to validate. + :param option_name: The name of the option for logging. + :param default_value: Default value if input is invalid. + :param min_value: Minimum allowable value. + :param max_value: Maximum allowable value. + :return: Validated numeric value. + """ + if isinstance(value, (int, float)): + if min_value is not None and value < min_value: + logger.warning(f"{option_name} value too low. Clamping to {min_value}.") + return min_value + if max_value is not None and value > max_value: + logger.warning(f"{option_name} value too high. Clamping to {max_value}.") + return max_value + return value + elif isinstance(value, dict): + return self.generate_code_from_json(value, 0) + else: + logger.warning(f"Invalid {option_name} value. Using default: {default_value}.") + return default_value + + def process_numeric_list(self, value, indent_level: int, allow_single_value: bool = True) -> str: + """ + Processes a numeric input or list of inputs, handling dynamic values and lists. + + :param value: The input value, which may be a single value, list, or nested expression. + :param indent_level: Current indentation level. + :param allow_single_value: Whether to return a single value as-is instead of wrapped in a list. + :return: A string representing the processed numeric expression(s). + """ + if isinstance(value, list): + # Process each item in the list individually + processed_values = [ + self.generate_condition_code(v, indent_level) if isinstance(v, dict) else str(v) + for v in value + ] + return f"[{', '.join(processed_values)}]" + + elif isinstance(value, dict): + # Handle nested expressions + return self.generate_condition_code(value, indent_level) + + elif allow_single_value: + # Single numeric value case + return str(value) + + else: + # Forcing single value into a list format if allow_single_value is False + return f"[{str(value)}]" + + def generate_code_from_json(self, json_nodes: Any, indent_level: int) -> List[str]: + """ + Recursively generates code lines from JSON nodes. + + :param json_nodes: The current JSON node(s) to process. + :param indent_level: Current indentation level. + :return: A list of generated code lines. + """ + code_lines = [] + indent = ' ' * indent_level + + if isinstance(json_nodes, dict): + json_nodes = [json_nodes] + + for node in json_nodes: + node_type = node.get('type') + if not node_type: + logger.warning("Node missing 'type'. Skipping.") + continue # Skip nodes without a type + + logger.debug(f"Handling node of type: {node_type}") + handler_method = getattr(self, f'handle_{node_type}', self.handle_default) + handler_code = handler_method(node, indent_level) + + if isinstance(handler_code, list): + code_lines.extend(handler_code) + elif isinstance(handler_code, str): + code_lines.append(handler_code) + + return code_lines + + def handle_default(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Default handler for unhandled condition types. + + :param node: The node with an unhandled type. + :param indent_level: Current indentation level. + :return: A string representing the default condition. + """ + node_type = node.get('type') + logger.warning(f"Unhandled condition node type: {node_type}. Defaulting to 'False'.") + return 'False' + + def generate_condition_code(self, condition_node: Dict[str, Any], indent_level: int) -> str: + """ + Generates the condition code string based on the condition node. + + :param condition_node: The condition node dictionary. + :param indent_level: Current indentation level. + :return: A string representing the condition in Python syntax. + """ + # Check if the condition_node is a direct value or a nested condition + if not isinstance(condition_node, dict): + # If condition_node is not a dict, return its string representation + return str(condition_node) + node_type = condition_node.get('type') + if not node_type: + return 'False' # Default to False if node type is missing + + # Retrieve the handler method based on node_type + handler_method = getattr(self, f'handle_{node_type}', self.handle_default) + condition_code = handler_method(condition_node, indent_level=indent_level) + return condition_code + + # Add more helper methods here + + # ============================== + # Indicators Handlers + # ============================== + + def handle_indicator(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'indicator' condition type. + + :param node: The indicator node. + :param indent_level: Current indentation level. + :return: A string representing the condition. + """ + indicator_name = node.get('name') + output_field = node.get('output') + + if not indicator_name or not output_field: + logger.error("indicator node missing 'name' or 'output'.") + return 'None' + + # Collect the indicator information + self.indicators_used.append({ + 'name': indicator_name, + 'output': output_field + }) + + # Generate code that calls process_indicator + expr = f"process_indicator('{indicator_name}', '{output_field}')" + logger.debug(f"Generated indicator condition: {expr}") + return expr + + # ============================== + # Balances Handlers + # ============================== + + def handle_starting_balance(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'starting_balance' condition type. + + :param node: The starting_balance node. + :param indent_level: Current indentation level. + :return: A string representing the condition. + """ + return "get_starting_balance()" + + def handle_current_balance(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'current_balance' condition type. + + :param node: The current_balance node. + :param indent_level: Current indentation level. + :return: A string representing the condition. + """ + return "get_current_balance()" + + def handle_available_balance(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'available_balance' condition type. + + :param node: The available_balance node. + :param indent_level: Current indentation level. + :return: A string representing the condition. + """ + return "get_available_balance()" + + def handle_available_strategy_balance(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'available_strategy_balance' condition type. + + :param node: The available_strategy_balance node. + :param indent_level: Current indentation level. + :return: A string representing the condition. + """ + return "get_available_strategy_balance()" + + def handle_set_available_strategy_balance(self, node: Dict[str, Any], indent_level: int) -> List[str]: + """ + Handles the 'set_available_strategy_balance' node type. + + :param node: The set_available_strategy_balance node. + :param indent_level: Current indentation level. + :return: List of generated code lines. + """ + code_lines = [] + indent = ' ' * indent_level + + # Extract the 'BALANCE' input + balance = node.get('inputs', {}).get('BALANCE', 0) + + # If 'balance' is a dict, generate nested code + if isinstance(balance, dict): + balance = self.generate_condition_code(balance, indent_level) + + # Generate the code line to set the available strategy balance + code_lines.append(f"{indent}set_available_strategy_balance({balance})") + + logger.debug(f"Generated set_available_strategy_balance action with balance: {balance}") + return code_lines + + # ============================== + # Order Metrics Handlers + # ============================== + + def handle_order_volume(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'order_volume' condition type. + + :param node: The order_volume node. + :param indent_level: Current indentation level. + :return: A string representing the condition. + """ + order_type = node.get('inputs', {}).get('order_type', 'filled').lower() + if order_type not in ['filled', 'unfilled']: + logger.error(f"Invalid order type '{order_type}' in 'order_volume'. Defaulting to 'filled'.") + order_type = 'filled' + + expr = f"get_order_volume(order_type='{order_type}')" + logger.debug(f"Generated order_volume condition: {expr}") + return expr + + def handle_filled_orders(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'filled_orders' condition type. + + :param node: The filled_orders node. + :param indent_level: Current indentation level. + :return: A string representing the condition. + """ + return "get_filled_orders()" + + def handle_unfilled_orders(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'unfilled_orders' condition type. + + :param node: The unfilled_orders node. + :param indent_level: Current indentation level. + :return: A string representing the condition. + """ + return "get_unfilled_orders()" + + def handle_order_status(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'order_status' condition type. + + :param node: The order_status node. + :param indent_level: Current indentation level. + :return: A string representing the condition. + """ + order_name = node.get('inputs', {}).get('order_name', 'last_order').strip() + order_status = node.get('inputs', {}).get('status', 'filled').lower() + valid_statuses = ['filled', 'unfilled', 'partial'] + + if not order_name: + logger.warning("Empty 'order_name' in 'order_status'. Using 'last_order'.") + order_name = 'last_order' + + if order_status not in valid_statuses: + logger.error(f"Invalid order status '{order_status}' in 'order_status'. Defaulting to 'filled'.") + order_status = 'filled' + + expr = f"is_order_status(order_name='{order_name}', status='{order_status}')" + logger.debug(f"Generated order_status condition: {expr}") + return expr + + # Add other Order Metrics handlers here... + + # ============================== + # Trade Metrics Handlers + # ============================== + + def handle_active_trades(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'active_trades' condition type. + + :param node: The active_trades node. + :param indent_level: Current indentation level. + :return: A string representing the condition. + """ + return "get_active_trades()" + + def handle_total_trades(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'total_trades' condition type. + + :param node: The total_trades node. + :param indent_level: Current indentation level. + :return: A string representing the condition. + """ + return "get_total_trades()" + + def handle_last_trade_details(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'last_trade_details' condition type. + + :param node: The last_trade_details node. + :param indent_level: Current indentation level. + :return: A string representing the condition. + """ + details = node.get('inputs', {}).get('details', 'price').lower() + valid_details = ['price', 'volume', 'direction'] + if details not in valid_details: + logger.error(f"Invalid details '{details}' in 'last_trade_details'. Defaulting to 'price'.") + details = 'price' + + expr = f"get_last_trade_details(detail='{details}')" + logger.debug(f"Generated last_trade_details condition: {expr}") + return expr + + def handle_average_entry_price(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'average_entry_price' condition type. + + :param node: The average_entry_price node. + :param indent_level: Current indentation level. + :return: A string representing the condition. + """ + return "get_average_entry_price()" + + def handle_unrealized_profit_loss(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'unrealized_profit_loss' condition type. + + :param node: The unrealized_profit_loss node. + :param indent_level: Current indentation level. + :return: A string representing the condition. + """ + return "get_unrealized_profit_loss()" + + def handle_user_active_trades(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'user_active_trades' condition type. + + :param node: The user_active_trades node. + :param indent_level: Current indentation level. + :return: A string representing the condition. + """ + return "get_user_active_trades()" + + # Add other Trade Metrics handlers here... + + # ============================== + # Time Metrics Handlers + # ============================== + + def handle_time_since_start(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'time_since_start' condition type. + + :param node: The time_since_start node. + :param indent_level: Current indentation level. + :return: A string representing the condition. + """ + # Extract the 'unit' from inputs + unit = node.get('inputs', {}).get('unit', 'seconds').lower() + valid_units = ['seconds', 'minutes', 'hours'] + + if unit not in valid_units: + logger.error(f"Invalid time unit '{unit}' in 'time_since_start'. Defaulting to 'seconds'.") + unit = 'seconds' + + expr = f"get_time_since_start(unit='{unit}')" + logger.debug(f"Generated time_since_start condition: {expr}") + return expr + + # Add other Time Metrics handlers here... + + # ============================== + # Market Data Handlers + # ============================== + + def handle_current_price(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'current_price' condition type. + + :param node: The current_price node. + :param indent_level: Current indentation level. + :return: A string representing the condition. + """ + symbol = node.get('symbol', self.default_source.get('market', 'BTCUSD')) + return f"get_current_price(symbol='{symbol}')" + + def handle_bid_price(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'bid_price' condition type. + + :param node: The bid_price node. + :param indent_level: Current indentation level. + :return: A string representing the condition. + """ + symbol = node.get('symbol', self.default_source.get('market', 'BTCUSD')) + return f"get_bid_price(symbol='{symbol}')" + + def handle_ask_price(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'ask_price' condition type. + + :param node: The ask_price node. + :param indent_level: Current indentation level. + :return: A string representing the condition. + """ + symbol = node.get('symbol', self.default_source.get('market', 'BTCUSD')) + return f"get_ask_price(symbol='{symbol}')" + + def handle_last_candle_value(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'last_candle_value' condition type. + + :param node: The last_candle_value node. + :param indent_level: Current indentation level. + :return: A string representing the condition. + """ + candle_part = node.get('candle_part', 'close').lower() + valid_candle_parts = ['open', 'high', 'low', 'close'] + if candle_part not in valid_candle_parts: + logger.error(f"Invalid candle_part '{candle_part}' in 'last_candle_value'. Defaulting to 'close'.") + candle_part = 'close' + + # Process source input + source_node = node.get('source', {}) + timeframe = source_node.get('timeframe', self.default_source.get('timeframe', '1m')) + exchange = source_node.get('exchange', self.default_source.get('exchange', 'Binance')) + symbol = source_node.get('symbol', self.default_source.get('market', 'BTCUSD')) + + # Track data sources + self.data_sources_used.add((exchange, symbol, timeframe)) + + expr = f"get_last_candle(candle_part='{candle_part}', timeframe='{timeframe}', exchange='{exchange}', symbol='{symbol}')" + logger.debug(f"Generated last_candle_value condition: {expr}") + return expr + + def handle_source(self, node: Dict[str, Any], indent_level: int) -> Dict[str, Any]: + """ + Handles the 'source' condition type. + + :param node: The source node. + :param indent_level: Current indentation level. + :return: A dictionary representing the source configuration. + """ + timeframe = node.get('time_frame', '5m') + exchange = node.get('exchange', 'Binance') + symbol = node.get('symbol', 'BTCUSD') + + # Track data sources + self.data_sources_used.add((exchange, symbol, timeframe)) + + expr = { + 'timeframe': timeframe, + 'exchange': exchange, + 'symbol': symbol + } + logger.debug(f"Generated source configuration: {expr}") + return expr + + # Add other Market Data handlers here... + + # ============================== + # Logical Handlers + # ============================== + + def handle_comparison(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'comparison' condition type. + + :param node: The comparison node. + :param indent_level: Current indentation level. + :return: A string representing the condition. + """ + operator = node.get('operator') + inputs = node.get('inputs', {}) + left_node = inputs.get('LEFT') + right_node = inputs.get('RIGHT') + + if not operator or not left_node or not right_node: + logger.error("comparison node missing 'operator', 'LEFT', or 'RIGHT'.") + return 'False' + + operator_map = { + '>': '>', + '<': '<', + '>=': '>=', + '<=': '<=', + '==': '==', + '!=': '!=' + } + python_operator = operator_map.get(operator) + if not python_operator: + logger.error(f"Unsupported operator '{operator}'. Defaulting to '=='.") + python_operator = '==' + + left_expr = self.generate_condition_code(left_node, indent_level) + right_expr = self.generate_condition_code(right_node, indent_level) + condition = f"({left_expr} {python_operator} {right_expr})" + logger.debug(f"Generated comparison condition: {condition}") + return condition + + def handle_logical_and(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'logical_and' condition type. + + :param node: The logical_and node. + :param indent_level: Current indentation level. + :return: A string representing the condition. + """ + inputs = node.get('inputs', {}) + left_node = inputs.get('LEFT') + right_node = inputs.get('RIGHT') + + if not left_node or not right_node: + logger.warning("logical_and node missing 'LEFT' or 'RIGHT'. Defaulting to 'False'.") + return 'False' + + left_expr = self.generate_condition_code(left_node, indent_level) + right_expr = self.generate_condition_code(right_node, indent_level) + condition = f"({left_expr} and {right_expr})" + logger.debug(f"Generated logical AND condition: {condition}") + return condition + + def handle_logical_or(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'logical_or' condition type. + + :param node: The logical_or node. + :param indent_level: Current indentation level. + :return: A string representing the condition. + """ + inputs = node.get('inputs', {}) + left_node = inputs.get('LEFT') + right_node = inputs.get('RIGHT') + + if not left_node or not right_node: + logger.warning("logical_or node missing 'LEFT' or 'RIGHT'. Defaulting to 'False'.") + return 'False' + + left_expr = self.generate_condition_code(left_node, indent_level) + right_expr = self.generate_condition_code(right_node, indent_level) + condition = f"({left_expr} or {right_expr})" + logger.debug(f"Generated logical OR condition: {condition}") + return condition + + def handle_is_false(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'is_false' condition type. + + :param node: The is_false node. + :param indent_level: Current indentation level. + :return: A string representing the condition. + """ + condition_node = node.get('condition') + if not condition_node: + logger.error("is_false node missing 'condition'.") + return 'True' # Negation of 'False' is 'True' + + condition_expr = self.generate_condition_code(condition_node, indent_level) + condition = f"not ({condition_expr})" + logger.debug(f"Generated is_false condition: {condition}") + return condition + + # Add other Logical handlers here... + + # ============================== + # Trade Order Handlers + # ============================== + + def handle_trade_action(self, node: Dict[str, Any], indent_level: int) -> List[str]: + """ + Handles the 'trade_action' node type by passing trade options as arguments to a single trade_order function. + + :param node: The trade_action node. + :param indent_level: Current indentation level. + :return: List of generated code lines. + """ + code_lines = [] + indent = ' ' * indent_level + + # Accessing fields and inputs + trade_type = node.get('trade_type', 'buy').lower() # 'buy' or 'sell' + inputs = node.get('inputs', {}) + + # Initialize defaults + tif = node.get('tif', 'GTC') # Time in Force + + # Set initial default values for optional parameters + price_type = node.get('price_type', 'market') + # price_value is not used in current context; can be removed or utilized as needed + # price_value = node.get('price_value', None) + + size = inputs.get('size', 1) + if isinstance(size, dict): + size = self.generate_condition_code(size, indent_level) # Handle nested json + + # Handle Trade Options if present + trade_options = {} + trade_options_nodes = node.get('trade_options', []) + for option in trade_options_nodes: + option_type = option.get('type') + option_inputs = option.get('inputs', {}) + handler_method = getattr(self, f'handle_{option_type}', self.handle_default_trade_option) + option_value = handler_method(option_inputs, indent_level) + if option_value is not None: + trade_options[option_type] = option_value + + # Collect data sources + source = trade_options.get('source', self.default_source) + exchange = source.get('exchange', 'binance') + symbol = source.get('symbol', 'BTCUSDT') + timeframe = source.get('timeframe', '5m') + self.data_sources_used.add((exchange, symbol, timeframe)) + + # Prepare arguments for trade_order function + trade_order_args = { + 'trade_type': f"'{trade_options.get('trade_type', trade_type)}'", + 'size': size, + 'order_type': f"'{price_type}'", + 'source': self.format_trade_option(source), + 'tif': f"'{trade_options.get('tif', tif)}'", + 'stop_loss': self.format_trade_option(trade_options.get('stop_loss', {})), + 'trailing_stop': self.format_trade_option(trade_options.get('trailing_stop', {})), + 'take_profit': self.format_trade_option(trade_options.get('take_profit', {})), + 'limit': self.format_trade_option(trade_options.get('limit', {})), + 'trailing_limit': self.format_trade_option(trade_options.get('trailing_limit', {})), + 'target_market': self.format_trade_option(trade_options.get('target_market', {})), + 'name_order': self.format_trade_option(trade_options.get('name_order', {})), + } + + # Remove keys with empty dictionaries or None + trade_order_args = {k: v for k, v in trade_order_args.items() if v not in [{}, None]} + + # Convert trade_order_args dictionary to a comma-separated string + args_str = ', '.join([f"{key}={value}" for key, value in trade_order_args.items()]) + + # Generate the trade_order function call + code_lines.append(f"{indent}trade_order({args_str})") + + return code_lines + + def format_trade_option(self, option: Dict[str, Any]) -> str: + """ + Formats the trade option dictionary into a Python dictionary string. + + :param option: The trade option dictionary. + :return: A string representing the trade option or 'None' if empty. + """ + if not option: + return 'None' + + def format_value(value: Any) -> str: + if isinstance(value, str): + return f"'{value}'" + elif isinstance(value, dict): + return self.format_trade_option(value) + elif isinstance(value, (int, float)): + return str(value) + else: + logger.error(f"Unsupported value type: {type(value)}. Using 'None'.") + return 'None' + + items = [f"'{key}': {format_value(value)}" for key, value in option.items()] + return f"{{{', '.join(items)}}}" + + def handle_default_trade_option(self, inputs: Dict[str, Any], indent_level: int) -> Any: + """ + Default handler for unimplemented trade options. + + :param inputs: The inputs for the trade option. + :param indent_level: Current indentation level. + :return: None + """ + logger.warning(f"Unhandled trade option with inputs: {inputs}") + return None + + def handle_tpsl_options(self, option_name: str, inputs: Dict[str, Any], indent_level: int) -> Any: + """ + Generalized handler for trade options like 'stop_loss' and 'take_profit'. + Supports: + - Single value: Standard Stop Loss / Take Profit + - Two values: Stop Loss / Take Profit with a Limit + The values can be int, float, string, dict (node), or a mixed list. + + :param option_name: The name of the trade option ('stop_loss' or 'take_profit'). + :param inputs: The inputs for the trade option. + :param indent_level: Current indentation level. + :return: A dictionary representing the trade option configuration. + """ + option_input = inputs.get(option_name) + if not option_input: + logger.error(f"{option_name} option missing '{option_name}' input.") + return None + + option_config = {} + + def process_value(value): + if isinstance(value, dict): + return self.generate_condition_code(value, indent_level) + elif isinstance(value, (int, float, str)): + return value + else: + logger.error(f"Unsupported type for {option_name}: {type(value)}") + return 'None' + + if isinstance(option_input, list): + if len(option_input) >= 1: + option_config['value'] = process_value(option_input[0]) + if len(option_input) >= 2: + option_config['limit'] = process_value(option_input[1]) + if len(option_input) > 2: + logger.warning(f"{option_name} input list has more than two elements; extras will be ignored.") + else: + option_config['value'] = process_value(option_input) + + logger.debug(f"Generated {option_name} configuration: {option_config}") + return option_config + + def handle_take_profit(self, inputs: Dict[str, Any], indent_level: int) -> Any: + """Handles the 'take_profit' trade option by delegating to the generalized handler.""" + return self.handle_tpsl_options('take_profit', inputs, indent_level) + + def handle_stop_loss(self, inputs: Dict[str, Any], indent_level: int) -> Any: + """Handles the 'stop_loss' trade option by delegating to the generalized handler.""" + return self.handle_tpsl_options('stop_loss', inputs, indent_level) + + def handle_time_in_force(self, inputs: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'time_in_force' trade option by generating the corresponding value. + + :param inputs: The inputs for time_in_force. + :param indent_level: Current indentation level. + :return: A string representing the time_in_force value. + """ + tif = inputs.get('time_in_force', 'gtc').lower() + valid_tif = {'gtc', 'fok', 'ioc'} + if tif not in valid_tif: + logger.warning(f"Invalid Time in Force '{tif}', defaulting to 'gtc'") + tif = 'gtc' + return f"'{tif}'" + + def handle_limit(self, inputs: Dict[str, Any], indent_level: int) -> Any: + """ + Handles the 'limit' trade option by processing the limit input. + + :param inputs: The inputs for limit. + :param indent_level: Current indentation level. + :return: The limit value or None if invalid. + """ + limit = inputs.get('limit') + processed_limit = self.process_numeric_input(limit, 'limit') + if processed_limit is not None: + return {'limit': processed_limit} + return None + + def handle_trailing_stop(self, inputs: Dict[str, Any], indent_level: int) -> Any: + """ + Handles the 'trailing_stop' trade option by processing the trailing distance. + + :param inputs: The inputs for trailing_stop. + :param indent_level: Current indentation level. + :return: A dictionary representing the trailing_stop configuration or None if invalid. + """ + trail_distance = inputs.get('trail_distance') + processed_trail = self.process_numeric_input(trail_distance, 'trailing_stop') + if processed_trail is not None: + return {'trail_distance': processed_trail} + return None + + def handle_trailing_limit(self, inputs: Dict[str, Any], indent_level: int) -> Any: + """ + Handles the 'trailing_limit' trade option by processing the trail limit distance. + + :param inputs: The inputs for trailing_limit. + :param indent_level: Current indentation level. + :return: A dictionary representing the trailing_limit configuration or None if invalid. + """ + trail_limit_distance = inputs.get('trail_limit_distance') + processed_trail_limit = self.process_numeric_input(trail_limit_distance, 'trailing_limit') + if processed_trail_limit is not None: + return {'trail_limit_distance': processed_trail_limit} + return None + + def handle_target_market(self, inputs: Dict[str, Any], indent_level: int) -> Dict[str, Any]: + """ + Handles the 'target_market' trade option by retrieving time frame, exchange, and symbol. + + :param inputs: The inputs for target_market. + :param indent_level: Current indentation level. + :return: A dictionary representing the target market configuration. + """ + time_frame = inputs.get('time_frame', '1m') + exchange = inputs.get('exchange', 'Binance') + symbol = inputs.get('symbol', 'BTCUSDT') + + target_market = { + 'time_frame': time_frame, + 'exchange': exchange, + 'symbol': symbol + } + return target_market + + def handle_name_order(self, inputs: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'name_order' trade option by processing the order name. + + :param inputs: The inputs for name_order. + :param indent_level: Current indentation level. + :return: A string representing the name_order value. + """ + order_name = inputs.get('order_name', 'Unnamed Order').strip() + return f"'{order_name}'" if order_name else "'Unnamed Order'" + + # Add other Trade Order handlers here... + + # ============================== + # Control Handlers + # ============================== + + def handle_pause_strategy(self, node: Dict[str, Any], indent_level: int) -> List[str]: + """ + Handles the 'pause_strategy' node type. + + :param node: The pause_strategy node. + :param indent_level: Current indentation level. + :return: List of generated code lines. + """ + code_lines = [] + indent = ' ' * indent_level + code_lines.append(f"{indent}pause_strategy()") + code_lines.append(f"{indent}notify_user('Strategy paused.')") + + return code_lines + + def handle_strategy_resume(self, node: Dict[str, Any], indent_level: int) -> List[str]: + """ + Handles the 'strategy_resume' node type. + + :param node: The strategy_resume node. + :param indent_level: Current indentation level. + :return: List of generated code lines. + """ + code_lines = [] + indent = ' ' * indent_level + code_lines.append(f"{indent}self.resume_strategy()") + code_lines.append(f"{indent}notify_user('Strategy resumed.')") + + return code_lines + + def handle_strategy_exit(self, node: Dict[str, Any], indent_level: int) -> List[str]: + """ + Handles the 'strategy_exit' node type. + + :param node: The strategy_exit node. + :param indent_level: Current indentation level. + :return: List of generated code lines. + """ + code_lines = [] + indent = ' ' * indent_level + exit_option = node.get('condition', 'all') # 'all', 'in_profit', 'in_loss' + code_lines.append(f"{indent}self.set_exit(True, '{exit_option}') # Initiate exit") + code_lines.append(f"{indent}self.set_paused(True) # Pause the strategy while exiting") + + return code_lines + + def handle_max_drawdown(self, node: Dict[str, Any], indent_level: int) -> List[str]: + """ + Handles the 'max_drawdown' condition type by generating the corresponding Python code. + + :param node: The max_drawdown node. + :param indent_level: Current indentation level. + :return: List of generated code lines. + """ + code_lines = [] + indent = ' ' * indent_level + + inputs = node.get('inputs', {}) + if not inputs: + logger.error("max_drawdown node missing 'inputs'. Skipping.") + return code_lines # Return empty list + + drawdown = inputs.get('drawdown') + action = inputs.get('action', 'PAUSE') # 'PAUSE' or 'EXIT' + + processed_drawdown = self.process_numeric_input(drawdown, 'drawdown') + if processed_drawdown is None: + logger.error("Invalid or missing 'drawdown' value. Skipping 'max_drawdown' configuration.") + return code_lines # Return empty list + + # Assuming 'set_max_drawdown' is a method in the strategy class + expr = f"set_max_drawdown({processed_drawdown}, '{action}')" + code_lines.append(f"{indent}{expr}") + logger.debug(f"Generated max_drawdown condition: {expr}") + + return code_lines + + def handle_schedule_action(self, node: Dict[str, Any], indent_level: int) -> List[str]: + """ + Handles the 'schedule_action' node type by generating the corresponding Python code. + + :param node: The schedule_action node. + :param indent_level: Current indentation level. + :return: List of generated code lines. + """ + code_lines = [] + indent = ' ' * indent_level + + # Extract and validate inputs + inputs = node.get('inputs', {}) + time_interval = inputs.get('time_interval', '5m') + repeat = inputs.get('repeat', False) + action = inputs.get('action', []) + + if not isinstance(time_interval, str): + logger.warning(f"Invalid type for time_interval: {type(time_interval)}. Defaulting to '1m'.") + time_interval = '1m' + if not isinstance(repeat, bool): + logger.warning(f"Invalid type for repeat: {type(repeat)}. Defaulting to False.") + repeat = False + + # Generate unique function name and a timestamp variable name to track last execution + self.scheduled_action_count += 1 + function_name = f"scheduled_action_{self.scheduled_action_count}" + last_execution_var = f"last_execution_{self.scheduled_action_count}" + + # Set up the time parsing and interval conversion based on standard time suffixes + time_multipliers = {'m': 60, 'h': 3600, 'd': 86400} + try: + interval_value = int(time_interval[:-1]) + interval_unit = time_interval[-1] + interval_seconds = interval_value * time_multipliers.get(interval_unit, 60) + except ValueError: + logger.error(f"Invalid time interval format: '{time_interval}'. Using default 5 minutes.") + interval_seconds = 5 * 60 + + # Define the action function with time-checking logic + code_lines.append(f"{indent}def {function_name}(current_time):") + code_lines.append(f"{indent} from datetime import timedelta") + code_lines.append(f"{indent} global {last_execution_var}") # Track last execution time globally + code_lines.append(f"{indent} if {last_execution_var} is None:") + code_lines.append( + f"{indent} {last_execution_var} = current_time - timedelta(seconds={interval_seconds})") + + # Check if enough time has passed since the last execution + code_lines.append(f"{indent} time_since_last = (current_time - {last_execution_var}).total_seconds()") + code_lines.append(f"{indent} if time_since_last >= {interval_seconds}:") + code_lines.append(f"{indent} {last_execution_var} = current_time # Update last execution time") + + # Generate action code within the time condition + if not action: + code_lines.append(f"{indent} pass # No actions defined") + else: + action_code = self.generate_code_from_json(action, indent_level + 2) + if not action_code: + code_lines.append(f"{indent} pass # No valid actions defined") + else: + code_lines.extend(action_code) + + # If repeat is False, remove the function from the schedule after first execution + if not repeat: + code_lines.append(f"{indent} self.scheduled_actions.remove({function_name})") + + # Register this function in the schedule + code_lines.append(f"{indent}self.schedule_action({function_name})") + + logger.debug( + f"Generated schedule_action with function '{function_name}', time_interval='{time_interval}', repeat={repeat}") + return code_lines + + def handle_execute_if(self, node: Dict[str, Any], indent_level: int) -> List[str]: + """ + Handles the 'execute_if' node type by generating an if statement with the given condition and do actions. + + :param node: The execute_if node. + :param indent_level: Current indentation level. + :return: List of generated code lines. + """ + code_lines = [] + indent = ' ' * indent_level + + inputs = node.get('inputs', {}) + condition = inputs.get('CONDITION', {}) + statements = node.get('statements', {}).get('DO', []) + + condition_code = self.generate_condition_code(condition, indent_level) + + code_lines.append(f"{indent}if {condition_code}:") + if not statements: + code_lines.append(f"{indent} pass # No actions defined") + else: + action_code = self.generate_code_from_json(statements, indent_level + 1) + if not action_code: + code_lines.append(f"{indent} pass # No valid actions defined") + else: + code_lines.extend(action_code) + + logger.debug(f"Generated execute_if with condition '{condition_code}'") + return code_lines + + # Add other Control handlers here... + + # ============================== + # Values and Flags Handlers + # ============================== + + def handle_notify_user(self, node: Dict[str, Any], indent_level: int) -> List[str]: + """ + Handles the 'notify_user' node type. + + :param node: The notify_user node. + :param indent_level: Current indentation level. + :return: List of generated code lines. + """ + indent = ' ' * indent_level + message = node.get('message', 'No message provided.').replace("'", "\\'") + return [f"{indent}notify_user('{message}')"] + + def handle_value_input(self, node: Dict[str, Any], indent_level: int) -> Union[str, List[str]]: + """ + Handles the 'value_input' node type, managing sequential or nested values. + + :param node: The value_input node. + :param indent_level: Current indentation level. + :return: A single string if there is only one value, or a list of strings if multiple values. + """ + values = node.get('values', []) + if not values: + return '' # Return an empty string if no values are present + + # If there's only one value, handle it directly + if len(values) == 1: + value = values[0] + return self.generate_code_from_json(value, indent_level) if isinstance(value, dict) else str(value) + + # For multiple values, create a list of processed values + return [ + self.generate_code_from_json(v, indent_level) if isinstance(v, dict) else str(v) + for v in values + ] + + def handle_get_variable(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'get_variable' node type, managing variable retrieval with chaining. + + :param node: The get_variable node. + :param indent_level: Current indentation level. + :return: A string representing the variable retrieval. + """ + variable_name = node.get('variable_name') + if not variable_name: + logger.error("get_variable node missing 'variable_name'.") + return 'None' + return f"variables.get('{variable_name}', None)" + + def handle_set_variable(self, node: Dict[str, Any], indent_level: int) -> List[str]: + """ + Handles the 'set_variable' node type, supporting dynamic value assignment. + + :param node: The set_variable node. + :param indent_level: Current indentation level. + :return: List of generated code lines. + """ + code_lines = [] + indent = ' ' * indent_level + variable_name = node.get('variable_name') + value_node = node.get('values') + + if not variable_name: + logger.error("set_variable node missing 'variable_name'. Skipping.") + return [] + + # Process 'values' list as either a single or aggregated expression + if isinstance(value_node, list): + processed_values = [ + self.generate_condition_code(v, indent_level) if isinstance(v, dict) else str(v) + for v in value_node + ] + value_code = processed_values[0] if len(processed_values) == 1 else f"[{', '.join(processed_values)}]" + else: + value_code = self.generate_condition_code(value_node, indent_level) if value_node else '0' + + code_lines.append(f"{indent}variables['{variable_name}'] = {value_code}") + return code_lines + + def handle_set_flag(self, node: Dict[str, Any], indent_level: int) -> List[str]: + """ + Handles the 'set_flag' node type, setting a flag conditionally. + + :param node: The set_flag node. + :param indent_level: Current indentation level. + :return: List of generated code lines. + """ + code_lines = [] + indent = ' ' * indent_level + flag_name = node.get('flag_name') + + if not flag_name: + logger.error("set_flag node missing 'flag_name'. Skipping.") + return [] + + flag_value_input = node.get('flag_value', 'True') + flag_value = 'True' if str(flag_value_input).strip().lower() == 'true' else 'False' + code_lines.append(f"{indent}flags['{flag_name}'] = {flag_value}") + self.flags_used.add(flag_name) + return code_lines + + def handle_flag_is_set(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'flag_is_set' node type, checking if a flag is set. + + :param node: The flag_is_set node. + :param indent_level: Current indentation level. + :return: A string representing the flag condition. + """ + flag_name = node.get('flag_name') + if not flag_name: + logger.error("flag_is_set node missing 'flag_name'.") + return 'False' + return f"flags.get('{flag_name}', False)" + + # Add other Values and Flags handlers here... + + # ============================== + # Risk Management Handlers + # ============================== + + def handle_set_leverage(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'set_leverage' node type. + + :param node: The set_leverage node. + :param indent_level: Current indentation level. + :return: A string representing the leverage setting action. + """ + leverage = node.get('inputs', {}).get('LEVERAGE', 1) + leverage = self.process_numeric_input(leverage, 'leverage', 1, min_value=1, max_value=100) + return f"set_leverage({leverage})" + + def handle_current_margin(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'current_margin' node type. + + :param node: The current_margin node. + :param indent_level: Current indentation level. + :return: A string representing the condition to check current margin. + """ + return "get_current_margin()" + + def handle_risk_ratio(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'risk_ratio' node type. + + :param node: The risk_ratio node. + :param indent_level: Current indentation level. + :return: A string representing the risk ratio condition. + """ + operator = node.get('operator') + value = node.get('value') + if not operator or value is None: + logger.error("risk_ratio node missing 'operator' or 'value'.") + return 'False' + + operator_map = { + '>': '>', + '<': '<', + '>=': '>=', + '<=': '<=', + '==': '==', + '!=': '!=' + } + python_operator = operator_map.get(operator) + if not python_operator: + logger.error(f"Invalid operator for risk_ratio: {operator}.") + return 'False' + + return f"calculate_risk_ratio() {python_operator} {value}" + + def handle_max_position_size(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'max_position_size' node type. + + :param node: The max_position_size node. + :param indent_level: Current indentation level. + :return: A string representing the condition to check the max position size. + """ + max_size = node.get('inputs', {}).get('MAX_SIZE', 1) + max_size = self.process_numeric_input(max_size, 'max_position_size', 1, min_value=1) + return f"get_open_positions_count() < {max_size}" + + # ============================== + # Math Handlers + # ============================== + + import math + import statistics + import random + + def handle_math_operation(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'math_operation' node type. + + :param node: The math_operation node. + :param indent_level: Current indentation level. + :return: A string representing the math operation. + """ + operator = node.get('operator', 'ADD') + left_operand = node.get('left_operand') + right_operand = node.get('right_operand') + + operator_map = { + 'ADD': '+', + 'SUBTRACT': '-', + 'MULTIPLY': '*', + 'DIVIDE': '/' + } + python_operator = operator_map.get(operator, '+') + if operator not in operator_map: + logger.error(f"Invalid operator for math_operation: {operator}. Defaulting to 'ADD'.") + + left_expr = self.process_numeric_list(left_operand, indent_level) + right_expr = self.process_numeric_list(right_operand, indent_level) + expr = f"({left_expr} {python_operator} {right_expr})" + logger.debug(f"Generated math_operation expression: {expr}") + return expr + + def handle_power(self, node: Dict[str, Any], indent_level: int) -> str: + base = self.process_numeric_list(node.get('base', 2), indent_level) + exponent = self.process_numeric_list(node.get('exponent', 3), indent_level) + expr = f"({base} ** {exponent})" + logger.debug(f"Generated power expression: {expr}") + return expr + + def handle_modulo(self, node: Dict[str, Any], indent_level: int) -> str: + dividend = self.process_numeric_list(node.get('dividend', 10), indent_level) + divisor = self.process_numeric_list(node.get('divisor', 3), indent_level) + expr = f"({dividend} % {divisor})" + logger.debug(f"Generated modulo expression: {expr}") + return expr + + def handle_sqrt(self, node: Dict[str, Any], indent_level: int) -> str: + number = self.process_numeric_list(node.get('number', 16), indent_level) + expr = f"math.sqrt({number})" + logger.debug(f"Generated sqrt expression: {expr}") + return expr + + def handle_abs(self, node: Dict[str, Any], indent_level: int) -> str: + number = self.process_numeric_list(node.get('number', -5), indent_level) + expr = f"abs({number})" + logger.debug(f"Generated abs expression: {expr}") + return expr + + def handle_max(self, node: Dict[str, Any], indent_level: int) -> str: + numbers = self.process_numeric_list(node.get('numbers', [1, 2, 3]), indent_level) + expr = f"max({numbers})" + logger.debug(f"Generated max expression: {expr}") + return expr + + def handle_min(self, node: Dict[str, Any], indent_level: int) -> str: + numbers = self.process_numeric_list(node.get('numbers', [1, 2, 3]), indent_level) + expr = f"min({numbers})" + logger.debug(f"Generated min expression: {expr}") + return expr + + def handle_factorial(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'factorial' node type. + + :param node: The factorial node. + :param indent_level: Current indentation level. + :return: A string representing the factorial operation. + """ + number = self.process_numeric_list(node.get('number', 1), indent_level) + expr = f"math.factorial({number})" + logger.debug(f"Generated factorial expression: {expr}") + return expr + + def handle_log(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'log' node type. + + :param node: The log node. + :param indent_level: Current indentation level. + :return: A string representing the logarithm operation. + """ + number = self.process_numeric_list(node.get('number', 1), indent_level) + base = self.process_numeric_list(node.get('base', 10), indent_level) + expr = f"math.log({number}, {base})" + logger.debug(f"Generated log expression: {expr}") + return expr + + def handle_ln(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'ln' node type. + + :param node: The ln node. + :param indent_level: Current indentation level. + :return: A string representing the natural logarithm operation. + """ + number = self.process_numeric_list(node.get('number', 1), indent_level) + expr = f"math.log({number})" + logger.debug(f"Generated ln expression: {expr}") + return expr + + def handle_trig(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'trig' node type. + + :param node: The trig node. + :param indent_level: Current indentation level. + :return: A string representing the trigonometric function. + """ + operator = node.get('operator', 'sin').lower() # 'sin', 'cos', 'tan' + angle = self.process_numeric_list(node.get('angle', 0), indent_level) + + valid_operators = {'sin', 'cos', 'tan'} + if operator not in valid_operators: + logger.error(f"Invalid trig operator '{operator}'. Defaulting to 'sin'.") + operator = 'sin' + + expr = f"math.{operator}({angle})" + logger.debug(f"Generated trig expression: {expr}") + return expr + + def handle_mean(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'mean' node type. + + :param node: The mean node. + :param indent_level: Current indentation level. + :return: A string representing the mean calculation. + """ + numbers = self.process_numeric_list(node.get('numbers', []), indent_level) + if not numbers.strip('[]'): + logger.error("Mean operation received an empty list.") + return "0" # or another default value or error handling + expr = f"statistics.mean({numbers})" + logger.debug(f"Generated mean expression: {expr}") + return expr + + def handle_median(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'median' node type. + + :param node: The median node. + :param indent_level: Current indentation level. + :return: A string representing the median calculation. + """ + numbers = self.process_numeric_list(node.get('numbers', []), indent_level) + if not numbers.strip('[]'): + logger.error("Median operation received an empty list.") + return "0" # or another default value or error handling + expr = f"statistics.median({numbers})" + logger.debug(f"Generated median expression: {expr}") + return expr + + def handle_std_dev(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'std_dev' node type. + + :param node: The std_dev node. + :param indent_level: Current indentation level. + :return: A string representing the standard deviation calculation. + """ + numbers = self.process_numeric_list(node.get('numbers', []), indent_level) + if not numbers.strip('[]'): + logger.error("Standard deviation operation received an empty list.") + return "0" # or another default value or error handling + expr = f"statistics.stdev({numbers})" + logger.debug(f"Generated std_dev expression: {expr}") + return expr + + def handle_round(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'round' node type. + + :param node: The round node. + :param indent_level: Current indentation level. + :return: A string representing the rounding operation. + """ + number = self.process_numeric_list(node.get('number', 0), indent_level) + decimals = self.process_numeric_list(node.get('decimals', 0), indent_level) + expr = f"round({number}, {decimals})" + logger.debug(f"Generated round expression: {expr}") + return expr + + def handle_floor(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'floor' node type. + + :param node: The floor node. + :param indent_level: Current indentation level. + :return: A string representing the floor operation. + """ + number = self.process_numeric_list(node.get('number', 0), indent_level) + expr = f"math.floor({number})" + logger.debug(f"Generated floor expression: {expr}") + return expr + + def handle_ceil(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'ceil' node type. + + :param node: The ceil node. + :param indent_level: Current indentation level. + :return: A string representing the ceiling operation. + """ + number = self.process_numeric_list(node.get('number', 0), indent_level) + expr = f"math.ceil({number})" + logger.debug(f"Generated ceil expression: {expr}") + return expr + + def handle_random(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'random' node type. + + :param node: The random node. + :param indent_level: Current indentation level. + :return: A string representing the random integer generation. + """ + min_val = self.process_numeric_list(node.get('min', 0), indent_level) + max_val = self.process_numeric_list(node.get('max', 1), indent_level) + expr = f"random.randint({min_val}, {max_val})" + logger.debug(f"Generated random expression: {expr}") + return expr + + def handle_clamp(self, node: Dict[str, Any], indent_level: int) -> str: + """ + Handles the 'clamp' node type. + + :param node: The clamp node. + :param indent_level: Current indentation level. + :return: A string representing the clamp operation. + """ + number = self.process_numeric_list(node.get('number', 0), indent_level) + min_val = node.get('min') # Optional + max_val = node.get('max') # Optional + + if min_val is not None: + min_val = self.process_numeric_input(min_val, 'min', default_value=number, min_value=-math.inf) + else: + min_val = 'None' + + if max_val is not None: + max_val = self.process_numeric_input(max_val, 'max', default_value=number, max_value=math.inf) + else: + max_val = 'None' + + # Generate the clamp expression + if min_val != 'None' and max_val != 'None': + expr = f"max({min_val}, min({number}, {max_val}))" + elif min_val != 'None': + expr = f"max({min_val}, {number})" + elif max_val != 'None': + expr = f"min({number}, {max_val})" + else: + expr = str(number) # No clamping needed + + logger.debug(f"Generated clamp expression: {expr}") + return expr + + # add other math handlers here... + + diff --git a/src/Strategies.py b/src/Strategies.py index a680ac0..598fec1 100644 --- a/src/Strategies.py +++ b/src/Strategies.py @@ -8,388 +8,12 @@ import json import uuid import traceback from typing import Any +from PythonGenerator import PythonGenerator +from StrategyInstance import StrategyInstance # Configure logging logger = logging.getLogger(__name__) - -class StrategyInstance: - def __init__(self, strategy_instance_id: str, strategy_id: str, strategy_name: str, - user_id: int, generated_code: str, data_cache: DataCache, indicators: Indicators, trades: Trades): - """ - Initializes a StrategyInstance. - - :param strategy_instance_id: Unique identifier for this strategy execution instance. - :param strategy_id: Identifier of the strategy definition. - :param strategy_name: Name of the strategy. - :param user_id: ID of the user who owns the strategy. - :param generated_code: The generated 'next()' method code. - :param data_cache: Reference to the DataCache instance. - :param indicators: Reference to the Indicators manager. - :param trades: Reference to the Trades manager. - """ - self.strategy_instance_id = strategy_instance_id - self.strategy_id = strategy_id - self.strategy_name = strategy_name - self.user_id = user_id - self.generated_code = generated_code - self.data_cache = data_cache - self.indicators = indicators - self.trades = trades - - # Initialize context variables - self.flags: dict[str, Any] = {} - self.starting_balance = self.trades.get_current_balance(self.user_id) - self.profit_loss: float = 0.0 - self.active: bool = True - self.paused: bool = False - self.exit: bool = False - self.exit_method: str = 'all' - self.start_time = dt.datetime.now() - - def load_context(self): - """ - Loads the strategy execution context from the database. - """ - try: - context_data = self.data_cache.get_rows_from_datacache( - cache_name='strategy_contexts', - filter_vals=[('strategy_instance_id', self.strategy_instance_id)] - ) - if context_data.empty: - logger.warning(f"No context found for StrategyInstance ID: {self.strategy_instance_id}") - return - - context = context_data.iloc[0].to_dict() - self.flags = json.loads(context.get('flags', '{}')) - self.starting_balance = context.get('starting_balance', 0.0) - self.profit_loss = context.get('profit_loss', 0.0) - self.active = context.get('active', True) - self.paused = context.get('paused', False) - self.exit = context.get('exit', False) - self.exit_method = context.get('exit_method', 'all') - - context_start_time = context.get('start_time', None) - if context_start_time: - self.start_time = dt.datetime.fromisoformat(context_start_time) - - except Exception as e: - logger.error(f"Error loading context for StrategyInstance '{self.strategy_instance_id}': {e}", - exc_info=True) - traceback.print_exc() - - def save_context(self): - """ - Saves the current strategy execution context to the database. - """ - try: - self.data_cache.modify_datacache_item( - cache_name='strategy_contexts', - filter_vals=[('strategy_instance_id', self.strategy_instance_id)], - field_names=('flags', 'profit_loss', 'active', 'paused', 'exit', 'exit_method', 'start_time'), - new_values=( - json.dumps(self.flags), - self.profit_loss, - self.active, - self.paused, - self.exit, - self.exit_method, - self.start_time.isoformat() - ) - ) - except Exception as e: - logger.error(f"Error saving context for StrategyInstance '{self.strategy_instance_id}': {e}") - traceback.print_exc() - - def execute(self) -> dict[str, Any]: - """ - Executes the strategy's 'next()' method. - :return: Result of the execution. - """ - try: - # Define the local execution environment - exec_context = { - 'flags': self.flags, - 'strategy_id': self.strategy_id, - 'user_id': self.user_id, - 'get_last_candle': self.get_last_candle, - 'buy': self.buy_order, - 'sell': self.sell_order, - 'exit_strategy': self.exit_strategy, - 'notify_user': self.notify_user, - 'process_indicator': self.process_indicator, - 'get_strategy_profit_loss': self.get_strategy_profit_loss, - 'is_in_profit': self.is_in_profit, - 'is_in_loss': self.is_in_loss, - 'get_active_trades': self.get_active_trades, - 'get_starting_balance': self.get_starting_balance, - 'set_paused': self.set_paused, - 'set_exit': self.set_exit - } - - # Execute the generated 'next()' method - exec(self.generated_code, {}, exec_context) - - # Call the 'next()' method - if 'next' in exec_context and callable(exec_context['next']): - exec_context['next']() - else: - logger.error( - f"'next' method not defined in generated_code for StrategyInstance '{self.strategy_instance_id}'.") - - # Retrieve and update profit/loss - self.profit_loss = exec_context.get('profit_loss', self.profit_loss) - self.save_context() - - return {"success": True, "profit_loss": self.profit_loss} - - except Exception as e: - logger.error(f"Error executing 'next()' for StrategyInstance '{self.strategy_instance_id}': {e}") - traceback.print_exc() - return {"success": False, "message": str(e)} - - def set_paused(self, value: bool): - """ - Sets the paused state of the strategy. - :param value: True to pause, False to resume. - """ - self.paused = value - self.save_context() - logger.debug(f"Strategy '{self.strategy_id}' paused: {self.paused}") - - def set_exit(self, exit_flag: bool, exit_method: str = 'all'): - """ - Sets the exit state and method of the strategy. - :param exit_flag: True to initiate exit. - :param exit_method: Method to use for exiting ('all', 'in_profit', 'in_loss'). - """ - self.exit = exit_flag - self.exit_method = exit_method - self.save_context() - logger.debug(f"Strategy '{self.strategy_id}' exit set: {self.exit} with method '{self.exit_method}'") - - def get_total_filled_order_volume(self) -> float: - """ - Retrieves the total filled order volume for the strategy. - """ - return self.trades.get_total_filled_order_volume(self.strategy_id) - - def get_total_unfilled_order_volume(self) -> float: - """ - Retrieves the total unfilled order volume for the strategy. - """ - return self.trades.get_total_unfilled_order_volume(self.strategy_id) - - def get_last_candle(self, candle_part: str, timeframe: str, exchange: str, symbol: str): - """ - Retrieves the last candle data based on provided parameters. - - :param candle_part: Part of the candle data (e.g., 'close', 'open'). - :param timeframe: Timeframe of the candle. - :param exchange: Exchange name. - :param symbol: Trading symbol. - :return: Last candle value. - """ - try: - sdt = dt.datetime.now() - dt.timedelta(minutes=int(timeframe[:-1])) - data = self.data_cache.get_records_since(start_datetime=sdt, ex_details=[exchange, symbol, timeframe]) - if not data.empty: - return data.iloc[-1][candle_part] - else: - logger.warning(f"No candle data found for {exchange} {symbol} {timeframe}.") - return None - except Exception as e: - logger.error(f"Error retrieving last candle: {e}", exc_info=True) - traceback.print_exc() - return None - - # Define helper methods - def buy_order(self, size: float, symbol: str, order_type: str = 'market', price: float | None = None, **kwargs): - """ - Executes a buy order. - - :param size: Quantity to buy. - :param symbol: Trading symbol. - :param order_type: Type of order ('market' or 'limit'). - :param price: Price for limit orders. - """ - try: - order_data = { - 'size': size, - 'symbol': symbol, - 'order_type': order_type.lower(), - 'price': price, - **kwargs - } - status, msg = self.trades.buy(order_data, self.user_id) - if status != 'success': - logger.error(f"Buy order failed: {msg}") - self.notify_user(f"Buy order failed: {msg}") - else: - logger.info(f"Buy order executed successfully: {msg}") - except Exception as e: - logger.error(f"Error executing buy order in StrategyInstance '{self.strategy_instance_id}': {e}", - exc_info=True) - traceback.print_exc() - - def sell_order(self, size: float, symbol: str, order_type: str = 'market', price: float | None = None, **kwargs): - """ - Executes a sell order. - - :param size: Quantity to sell. - :param symbol: Trading symbol. - :param order_type: Type of order ('market' or 'limit'). - :param price: Price for limit orders. - """ - try: - order_data = { - 'size': size, - 'symbol': symbol, - 'order_type': order_type.lower(), - 'price': price, - **kwargs - } - status, msg = self.trades.sell(order_data, self.user_id) - if status != 'success': - logger.error(f"Sell order failed: {msg}") - self.notify_user(f"Sell order failed: {msg}") - else: - logger.info(f"Sell order executed successfully: {msg}") - except Exception as e: - logger.error(f"Error executing sell order in StrategyInstance '{self.strategy_instance_id}': {e}", - exc_info=True) - traceback.print_exc() - - def exit_strategy(self): - """ - Exits the strategy based on the exit_method. - """ - try: - if self.exit_method == 'all': - self.trades.exit_strategy_all(self.strategy_id) - logger.info(f"Exiting all positions for strategy '{self.strategy_id}'.") - elif self.exit_method == 'in_profit': - self.trades.exit_strategy_in_profit(self.strategy_id) - logger.info(f"Exiting profitable positions for strategy '{self.strategy_id}'.") - elif self.exit_method == 'in_loss': - self.trades.exit_strategy_in_loss(self.strategy_id) - logger.info(f"Exiting losing positions for strategy '{self.strategy_id}'.") - else: - logger.warning( - f"Unknown exit method '{self.exit_method}' for StrategyInstance '{self.strategy_instance_id}'.") - except Exception as e: - logger.error( - f"Error exiting strategy '{self.strategy_id}' in StrategyInstance '{self.strategy_instance_id}': {e}", - exc_info=True) - traceback.print_exc() - - def notify_user(self, message: str): - """ - Sends a notification to the user. - - :param message: Notification message. - """ - try: - self.trades.notify_user(self.user_id, message) - logger.debug(f"Notification sent to user '{self.user_id}': {message}") - except Exception as e: - logger.error( - f"Error notifying user '{self.user_id}' in StrategyInstance '{self.strategy_instance_id}': {e}", - exc_info=True) - traceback.print_exc() - - def process_indicator(self, indicator_name: str, output_field: str) -> Any: - """ - Retrieves the latest value of an indicator. - - :param indicator_name: Name of the indicator. - :param output_field: Specific field of the indicator. - :return: Indicator value. - """ - try: - user_indicators = self.indicators.get_indicator_list(user_id=self.user_id) - indicator = user_indicators.get(indicator_name) - if not indicator: - logger.error(f"Indicator '{indicator_name}' not found for user '{self.user_id}'.") - return None - indicator_value = self.indicators.process_indicator(indicator) - value = indicator_value.get(output_field, None) - logger.debug(f"Processed indicator '{indicator_name}' with output field '{output_field}': {value}") - return value - except Exception as e: - logger.error( - f"Error processing indicator '{indicator_name}' in StrategyInstance '{self.strategy_instance_id}': {e}", - exc_info=True) - traceback.print_exc() - return None - - def get_strategy_profit_loss(self, strategy_id: str) -> float: - """ - Retrieves the current profit or loss of the strategy. - - :param strategy_id: Unique identifier of the strategy. - :return: Profit or loss amount. - """ - try: - profit_loss = self.trades.get_profit(strategy_id) - logger.debug(f"Retrieved profit/loss for strategy '{strategy_id}': {profit_loss}") - return profit_loss - except Exception as e: - logger.error(f"Error retrieving profit/loss for StrategyInstance '{self.strategy_instance_id}': {e}", - exc_info=True) - traceback.print_exc() - return 0.0 - - def is_in_profit(self) -> bool: - """ - Determines if the strategy is currently in profit. - """ - profit = self.profit_loss - logger.debug(f"Checking if in profit: {profit} > 0") - return self.profit_loss > 0 - - def is_in_loss(self) -> bool: - """ - Determines if the strategy is currently in loss. - """ - loss = self.profit_loss - logger.debug(f"Checking if in loss: {loss} < 0") - return self.profit_loss < 0 - - def get_active_trades(self) -> int: - """ - Returns the number of active trades. - """ - active_trades_count = len(self.trades.active_trades) - logger.debug(f"Number of active trades: {active_trades_count}") - return active_trades_count - - def get_starting_balance(self) -> float: - """ - Returns the starting balance. - """ - logger.debug(f"Starting balance: {self.starting_balance}") - return self.starting_balance - - def get_filled_orders(self) -> int: - """ - Retrieves the number of filled orders for the strategy. - """ - return self.trades.get_filled_orders_count(self.strategy_id) - - def get_unfilled_orders(self) -> int: - """ - Retrieves the number of unfilled orders for the strategy. - """ - return self.trades.get_unfilled_orders_count(self.strategy_id) - - def get_available_balance(self) -> float: - """ - Retrieves the available balance for the strategy. - """ - return self.trades.get_available_balance(self.strategy_id) - - class Strategies: def __init__(self, data_cache: DataCache, trades: Trades, indicators: Indicators): """ @@ -397,6 +21,7 @@ class Strategies: :param data_cache: Instance of DataCache to manage cache and database interactions. :param trades: Reference to the trades object that maintains trading actions and data. + :param indicators: Reference to the Indicators manager. """ self.data_cache = data_cache # Database interaction instance self.trades = trades @@ -416,32 +41,47 @@ class Strategies: self.default_exchange = 'Binance' self.default_symbol = 'BTCUSD' - self.flags_used = None - self.data_sources_used = None - self.indicators_used = None - self.active_instances: dict[tuple[int, str], StrategyInstance] = {} # Key: (user_id, strategy_id) - def new_strategy(self, strategy_data: dict, default_source: dict) -> dict: - """ - Add a new strategy to the cache and database. + def _save_strategy(self, strategy_data: dict, default_source: dict) -> dict: + """ + Saves a strategy to the cache and database. Handles both creation and editing. + + :param strategy_data: A dictionary containing strategy data such as name, code, workspace, etc. :param default_source: The default source for undefined sources in the strategy. - :param strategy_data: A dictionary containing strategy data such as name, code, and workspace. :return: A dictionary containing success or failure information. """ + is_edit = 'tbl_key' in strategy_data try: - # Check if a strategy with the same name already exists for this user - filter_conditions = [('creator', strategy_data.get('creator')), ('name', strategy_data['name'])] - existing_strategy = self.data_cache.get_rows_from_datacache( - cache_name='strategies', - filter_vals=filter_conditions - ) - if not existing_strategy.empty: - return {"success": False, "message": "A strategy with this name already exists"} + if is_edit: + # Editing an existing strategy + tbl_key = strategy_data['tbl_key'] + existing_strategy = self.data_cache.get_rows_from_datacache( + cache_name='strategies', + filter_vals=[('tbl_key', tbl_key)] + ) + if existing_strategy.empty: + return {"success": False, "message": "Strategy not found."} + else: + # Creating a new strategy + # Generate a unique identifier first + tbl_key = str(uuid.uuid4()) + + # Check if a strategy with the same name already exists for this user + filter_conditions = [ + ('creator', strategy_data.get('creator')), + ('name', strategy_data['name']) + ] + existing_strategy = self.data_cache.get_rows_from_datacache( + cache_name='strategies', + filter_vals=filter_conditions + ) + if not existing_strategy.empty: + return {"success": False, "message": "A strategy with this name already exists"} # Validate and serialize 'workspace' (XML string) - workspace_data = strategy_data['workspace'] + workspace_data = strategy_data.get('workspace') if not isinstance(workspace_data, str) or not workspace_data.strip(): return {"success": False, "message": "Invalid or empty workspace data"} @@ -452,56 +92,132 @@ class Strategies: except (TypeError, ValueError): return {"success": False, "message": "Invalid stats data format"} + default_source = default_source.copy() + strategy_id = tbl_key + + # Extract and validate 'code' as a dictionary + code = strategy_data.get('code') + if isinstance(code, str): + try: + strategy_json = json.loads(code) + if not isinstance(strategy_json, dict): + return {"success": False, "message": "'code' must be a JSON object."} + except json.JSONDecodeError: + return {"success": False, "message": "Invalid JSON format for 'code'."} + elif isinstance(code, dict): + strategy_json = code + else: + return {"success": False, "message": "'code' must be a JSON string or dictionary."} + + # Initialize PythonGenerator + python_generator = PythonGenerator(default_source, strategy_id) + # Generate strategy components (code, indicators, data_sources, flags) - strategy_components = self.generate_strategy_code(strategy_data['code'], default_source) + strategy_components = python_generator.generate(strategy_json) # Add the combined strategy components to the data to be stored strategy_data['strategy_components'] = json.dumps(strategy_components) - # Generate a unique identifier - tbl_key = str(uuid.uuid4()) - - # Insert the strategy into the database and cache - self.data_cache.insert_row_into_datacache( - cache_name='strategies', - columns=("creator", "name", "workspace", "code", "stats", - "public", "fee", 'tbl_key', 'strategy_components'), - values=( + if is_edit: + # Editing existing strategy + tbl_key = strategy_data['tbl_key'] + # Prepare the columns and values for the update + columns = ( + "creator", "name", "workspace", "code", "stats", "public", "fee", "tbl_key", "strategy_components" + ) + values = ( strategy_data.get('creator'), strategy_data['name'], - strategy_data['workspace'], + workspace_data, # Use the validated workspace data strategy_data['code'], - stats_serialized, + stats_serialized, # Serialized stats bool(strategy_data.get('public', 0)), float(strategy_data.get('fee', 0.0)), tbl_key, - strategy_data['strategy_components'] + strategy_data['strategy_components'] # Serialized strategy components ) - ) - # Construct the saved strategy data to return - saved_strategy = { - "id": tbl_key, # Assuming tbl_key is used as a unique identifier - "creator": strategy_data.get('creator'), - "name": strategy_data['name'], - "workspace": workspace_data, # Original workspace data - "code": strategy_data['code'], - "stats": stats_data, - "public": bool(strategy_data.get('public', 0)), - "fee": float(strategy_data.get('fee', 0.0)) - } - # If everything is successful, return a success message along with the saved strategy data - return { - "success": True, - "message": "Strategy created and saved successfully", - "strategy": saved_strategy # Include the strategy data - } + # Update the strategy in the database and cache + self.data_cache.modify_datacache_item( + cache_name='strategies', + filter_vals=[('tbl_key', tbl_key)], + field_names=columns, + new_values=values, + key=tbl_key, + overwrite='tbl_key' # Use 'tbl_key' to identify the entry to overwrite + ) + + # Return success message + return {"success": True, "message": "Strategy updated successfully"} + + else: + # Creating new strategy + # Insert the strategy into the database and cache + self.data_cache.insert_row_into_datacache( + cache_name='strategies', + columns=( + "creator", "name", "workspace", "code", "stats", + "public", "fee", 'tbl_key', 'strategy_components' + ), + values=( + strategy_data.get('creator'), + strategy_data['name'], + strategy_data['workspace'], + strategy_data['code'], + stats_serialized, + bool(strategy_data.get('public', 0)), + float(strategy_data.get('fee', 0.0)), + tbl_key, + strategy_data['strategy_components'] + ) + ) + + # Construct the saved strategy data to return + saved_strategy = { + "id": tbl_key, # Assuming tbl_key is used as a unique identifier + "creator": strategy_data.get('creator'), + "name": strategy_data['name'], + "workspace": workspace_data, # Original workspace data + "code": strategy_data['code'], + "stats": stats_data, + "public": bool(strategy_data.get('public', 0)), + "fee": float(strategy_data.get('fee', 0.0)) + } + # If everything is successful, return a success message along with the saved strategy data + return { + "success": True, + "message": "Strategy created and saved successfully", + "strategy": saved_strategy # Include the strategy data + } except Exception as e: # Catch any exceptions and return a failure message - # Consider logging the exception with traceback for debugging + # Log the exception with traceback for debugging + logger.error(f"Failed to save strategy: {e}", exc_info=True) traceback.print_exc() - return {"success": False, "message": f"Failed to create strategy: {str(e)}"} + operation = "update" if is_edit else "create" + return {"success": False, "message": f"Failed to {operation} strategy: {str(e)}"} + + + def new_strategy(self, strategy_data: dict, default_source: dict) -> dict: + """ + Add a new strategy to the cache and database. + + :param default_source: The default source for undefined sources in the strategy. + :param strategy_data: A dictionary containing strategy data such as name, code, and workspace. + :return: A dictionary containing success or failure information. + """ + return self._save_strategy(strategy_data, default_source) + + def edit_strategy(self, strategy_data: dict, default_source: dict) -> dict: + """ + Updates an existing strategy in the cache and database. + + :param default_source: The default source for undefined sources in the strategy. + :param strategy_data: A dictionary containing the updated strategy data. + :return: A dictionary containing success or failure information. + """ + return self._save_strategy(strategy_data, default_source) def delete_strategy(self, user_id: int, name: str) -> dict: try: @@ -514,78 +230,6 @@ class Strategies: logger.error(f"Failed to delete strategy '{name}' for user '{user_id}': {e}", exc_info=True) return {"success": False, "message": f"Failed to delete strategy: {str(e)}"} - def edit_strategy(self, strategy_data: dict, default_source: dict) -> dict: - """ - Updates an existing strategy in the cache and database. - - :param default_source: The default source for undefined sources in the strategy. - :param strategy_data: A dictionary containing the updated strategy data. - :return: A dictionary containing success or failure information. - """ - if 'tbl_key' not in strategy_data: - return {"success": False, "message": "Missing 'tbl_key' in strategy data."} - - try: - tbl_key = strategy_data['tbl_key'] # The unique identifier for the strategy - - existing_strategy = self.data_cache.get_rows_from_datacache( - cache_name='strategies', - filter_vals=[('tbl_key', tbl_key)] - ) - if existing_strategy.empty: - return {"success": False, "message": "Strategy not found."} - - # Validate and serialize 'workspace' (XML string) - workspace_data = strategy_data['workspace'] - if not isinstance(workspace_data, str) or not workspace_data.strip(): - return {"success": False, "message": "Invalid or empty workspace data"} - - # Serialize 'stats' field - try: - stats_data = strategy_data.get('stats', {}) - stats_serialized = json.dumps(stats_data) - except (TypeError, ValueError): - return {"success": False, "message": "Invalid stats data format"} - - # Generate updated strategy components (code, indicators, data_sources, flags) - strategy_components = self.generate_strategy_code(strategy_data['code'], default_source) - - # Add the combined strategy components to the data to be stored - strategy_data['strategy_components'] = json.dumps(strategy_components) - - # Prepare the columns and values for the update - columns = ( - "creator", "name", "workspace", "code", "stats", "public", "fee", "tbl_key", "strategy_components") - values = ( - strategy_data.get('creator'), - strategy_data['name'], - workspace_data, # Use the validated workspace data - strategy_data['code'], - stats_serialized, # Serialized stats - bool(strategy_data.get('public', 0)), - float(strategy_data.get('fee', 0.0)), - tbl_key, - strategy_data['strategy_components'] # Serialized strategy components - ) - - # Update the strategy in the database and cache - self.data_cache.modify_datacache_item( - cache_name='strategies', - filter_vals=[('tbl_key', tbl_key)], - field_names=columns, - new_values=values, - key=tbl_key, - overwrite='tbl_key' # Use 'tbl_key' to identify the entry to overwrite - ) - - # Return success message - return {"success": True, "message": "Strategy updated successfully"} - - except Exception as e: - # Handle exceptions and return failure message - traceback.print_exc() - return {"success": False, "message": f"Failed to update strategy: {str(e)}"} - def get_all_strategy_names(self, user_id: int) -> list | None: """ Return a list of all strategy names stored in the cache or database. @@ -668,733 +312,6 @@ class Strategies: return strategy_row - def generate_strategy_code(self, json_code: Any, default_source: dict[str, Any]) -> dict[str, Any]: - """ - Generates the code for the 'next' method and collects indicators, data sources, and flags. - - :param default_source: The default source for undefined sources in the strategy. - :param json_code: JSON representation of the strategy logic. - :return: A dictionary containing 'generated_code', 'indicators', 'data_sources', and 'flags_used'. - """ - if isinstance(json_code, str): - json_code = json.loads(json_code) - - # Initialize code components - code_lines = [] - indent_level = 1 # For 'next' method code indentation - indent = ' ' * indent_level - - # Initialize lists to collect indicators, data sources, and flags - self.indicators_used: list[dict[str, Any]] = [] - self.data_sources_used: set = set() - self.flags_used: set = set() - - # Generate code based on the JSON structure - code_lines.append(f"def next():") - indent_level += 1 # Increase indent level inside the 'next' method - indent = ' ' * indent_level - - # Generate code from JSON nodes - code_lines.extend(self.generate_code_from_json(json_code, default_source, indent_level)) - - # Handle unpause logic - # Extract and append resume conditions after action blocks - resume_conditions = self.extract_resume_conditions(json_code) - for condition in resume_conditions: - condition_code = self.generate_condition_code(condition) - code_lines.append(f" if {condition_code}:") - action_indent = ' ' * (indent_level + 1) - code_lines.append(f"{action_indent}self.set_paused(False) # Resume the strategy") - code_lines.append(f"{action_indent}self.notify_user('Strategy resumed.')") - - # Handle exit logic at the end of 'next()' - code_lines.append(f" if self.exit:") - indent_level += 1 - exit_indent = ' ' * indent_level - code_lines.append(f"{exit_indent}self.exit_strategy()") - code_lines.append(f"{exit_indent}self.paused = True # Pause the strategy while exiting.") - indent_level -= 1 - - # Join the code lines into a single string - next_method_code = '\n'.join(code_lines) - - # Prepare the combined dictionary - strategy_components = { - 'generated_code': next_method_code, - 'indicators': self.indicators_used, - 'data_sources': list(self.data_sources_used), - 'flags_used': list(self.flags_used) - } - - logger.debug(f"Generated strategy code for strategy.") - return strategy_components - - def generate_code_from_json(self, json_nodes: Any, default_source: dict, indent_level: int): - """ - Recursively generates code lines from JSON nodes. - """ - code_lines = [] - indent = ' ' * indent_level - - if isinstance(json_nodes, dict): - json_nodes = [json_nodes] - - for node in json_nodes: - node_type = node.get('type') - if not node_type: - continue # Skip nodes without a type - - if node_type == 'strategy': - # Process the 'strategy' node - statements = node.get('statements', []) - if statements: - code_lines.extend(self.generate_code_from_json(statements, default_source, indent_level)) - elif node_type == 'execute_if': - # Handle 'execute_if' node - code_lines.extend(self.handle_execute_if(node, default_source, indent_level)) - elif node_type == 'trade_action': - code_lines.extend(self.handle_trade_action(node, default_source, indent_level)) - elif node_type == 'set_flag': - code_lines.extend(self.handle_set_flag(node, indent_level)) - elif node_type == 'notify_user': - code_lines.extend(self.handle_notify_user(node, indent_level)) - elif node_type == 'set_variable': - code_lines.extend(self.handle_set_variable(node, indent_level)) - elif node_type in ['comparison', 'logical_and', 'logical_or', 'is_false', 'strategy_profit_loss', - 'active_trades', 'filled_orders', 'unfilled_orders', - 'total_filled_order_volume', 'total_unfilled_order_volume', - 'available_balance']: - # Handle conditions - condition_code = self.generate_condition_code(node) - code_lines.append(f"{indent}if {condition_code}:") - # Assuming 'DO' is the key for actions under the condition - actions = node.get('statements', {}).get('DO', []) - code_lines.extend(self.generate_code_from_json(actions, default_source, indent_level + 1)) - elif node_type == 'strategy_pause': - code_lines.extend(self.handle_strategy_pause(node, indent_level)) - elif node_type == 'strategy_exit': - code_lines.extend(self.handle_strategy_exit(node, indent_level)) - elif node_type == 'strategy_resume': - # Do not handle here; handled separately - pass - else: - # Handle other node types as needed - logger.warning(f"Unhandled node type: {node_type}") - pass - - return code_lines - - def generate_condition_code(self, condition_node: dict) -> str: - """ - Generates the condition code string based on the condition node. - - :param condition_node: The condition node. - :return: A string representing the condition. - """ - if not isinstance(condition_node, dict): - # If condition_node is not a dict, return its string representation - return str(condition_node) - node_type = condition_node.get('type') - if not node_type: - return 'False' # Default to False if node type is missing - - # Handling different condition types - if node_type == 'comparison': - operator = condition_node.get('operator') - inputs = condition_node.get('inputs', {}) - left_node = inputs.get('LEFT') - right_node = inputs.get('RIGHT') - left_expr = self.generate_condition_code(left_node) - right_expr = self.generate_condition_code(right_node) - operator_map = { - '>': '>', - '<': '<', - '>=': '>=', - '<=': '<=', - '==': '==', - '!=': '!=' - } - python_operator = operator_map.get(operator, operator) - return f"({left_expr} {python_operator} {right_expr})" - - elif node_type == 'dynamic_value': - values = condition_node.get('values', []) - if len(values) == 1: - value = values[0] - if isinstance(value, dict): - return self.generate_condition_code(value) - else: - # If value is not a dict, return its string representation - return str(value) - else: - # Handle lists of values if necessary - return '0' - - elif node_type == 'logical_and': - conditions = condition_node.get('conditions', []) - condition_exprs = [self.generate_condition_code(cond) for cond in conditions] - return ' and '.join(condition_exprs) - - elif node_type == 'logical_or': - conditions = condition_node.get('conditions', []) - condition_exprs = [self.generate_condition_code(cond) for cond in conditions] - return ' or '.join(condition_exprs) - - # Return True if a condition is False - elif node_type == 'is_false': - condition = condition_node.get('condition') - condition_expr = self.generate_condition_code(condition) - return f"not ({condition_expr})" - - elif node_type == 'time_since_start': - operator = condition_node.get('operator') # '>', '<', etc. - value = condition_node.get('value') # Numeric value - unit = condition_node.get('unit') # 'minutes', 'hours', 'days' - - operator_map = { - '>': '>', - '<': '<', - '>=': '>=', - '<=': '<=', - '==': '==', - '!=': '!=' - } - python_operator = operator_map.get(operator, operator) - - # Convert value to timedelta - if unit == 'minutes': - delta = f"dt.timedelta(minutes={value})" - elif unit == 'hours': - delta = f"dt.timedelta(hours={value})" - elif unit == 'days': - delta = f"dt.timedelta(days={value})" - else: - logger.warning(f"Unknown unit '{unit}' in time_since_start condition.") - delta = "dt.timedelta(minutes=0)" - - expr = f"(dt.datetime.now() - self.start_time) {python_operator} {delta}" - - return f"({expr})" - elif node_type == 'order_volume': - operator = condition_node.get('operator') # '>', '<', etc. - value = condition_node.get('value') # Numeric value - order_type = condition_node.get('order_type') # 'filled' or 'unfilled' - - operator_map = { - '>': '>', - '<': '<', - '>=': '>=', - '<=': '<=', - '==': '==', - '!=': '!=' - } - python_operator = operator_map.get(operator, operator) - - if order_type == 'filled': - expr = f"get_total_filled_order_volume() {python_operator} {value}" - elif order_type == 'unfilled': - expr = f"get_total_unfilled_order_volume() {python_operator} {value}" - else: - logger.warning(f"Unknown order_type '{order_type}' in order_volume condition.") - expr = 'False' - - return f"({expr})" - - # Return True if a flag is set. - elif node_type == 'flag_is_set': - flag_name = condition_node.get('flag_name') - self.flags_used.add(flag_name) - return f"flags.get('{flag_name}', False)" - - elif node_type == 'strategy_profit_loss': - metric = condition_node.get('metric') # 'profit' or 'loss' - operator = condition_node.get('operator') # '>', '<', '>=', '<=', '==', '!=' - value = condition_node.get('value') # Numeric value to compare against - - # Map operator to Python operator - operator_map = { - '>': '>', - '<': '<', - '>=': '>=', - '<=': '<=', - '==': '==', - '!=': '!=' - } - python_operator = operator_map.get(operator, operator) - - # Determine the method to call based on the metric - if metric == 'profit': - expr = f"get_strategy_profit_loss('{self.strategy_id}') {python_operator} {value}" - elif metric == 'loss': - expr = f"get_strategy_profit_loss('{self.strategy_id}') {python_operator} -{value}" - else: - logger.warning(f"Unknown metric '{metric}' in strategy_profit_loss condition.") - expr = 'False' - - return f"({expr})" - - # Return the number of active trades in the strategy. - elif node_type == 'active_trades': - operator = condition_node.get('operator') # '>', '<', '>=', '<=', '==', '!=' - value = condition_node.get('value') # Numeric value to compare against - - # Map operator to Python operator - operator_map = { - '>': '>', - '<': '<', - '>=': '>=', - '<=': '<=', - '==': '==', - '!=': '!=' - } - python_operator = operator_map.get(operator, operator) - - expr = f"get_active_trades() {python_operator} {value}" - - return f"({expr})" - - elif node_type == 'total_filled_order_volume': - operator = condition_node.get('operator') # '>', '<', etc. - value = condition_node.get('value') - - operator_map = { - '>': '>', - '<': '<', - '>=': '>=', - '<=': '<=', - '==': '==', - '!=': '!=' - } - python_operator = operator_map.get(operator, operator) - - expr = f"get_total_filled_order_volume() {python_operator} {value}" - - return f"({expr})" - - elif node_type == 'max_drawdown': - value = condition_node.get('value') # Maximum drawdown value - action = condition_node.get('action') # 'pause' or 'exit' - - if action == 'pause': - expr = f"get_strategy_drawdown('{self.strategy_id}') >= {value}" - # Action: Pause strategy - code = f"if ({expr}):\n self.set_paused(True)\n self.notify_user('Strategy paused due to max drawdown.')" - elif action == 'exit': - expr = f"get_strategy_drawdown('{self.strategy_id}') >= {value}" - # Action: Exit strategy - code = f"if ({expr}):\n self.set_exit(True, 'all')\n self.set_paused(True)\n self.notify_user('Strategy exited due to max drawdown.')" - else: - logger.warning(f"Unknown action '{action}' in max_drawdown condition.") - code = "" - - return code - - elif node_type == 'total_unfilled_order_volume': - operator = condition_node.get('operator') # '>', '<', etc. - value = condition_node.get('value') - - operator_map = { - '>': '>', - '<': '<', - '>=': '>=', - '<=': '<=', - '==': '==', - '!=': '!=' - } - python_operator = operator_map.get(operator, operator) - - expr = f"get_total_unfilled_order_volume() {python_operator} {value}" - - return f"({expr})" - - elif node_type == 'filled_orders': - operator = condition_node.get('operator') # '>', '<', etc. - value = condition_node.get('value') - - operator_map = { - '>': '>', - '<': '<', - '>=': '>=', - '<=': '<=', - '==': '==', - '!=': '!=' - } - python_operator = operator_map.get(operator, operator) - - expr = f"get_filled_orders() {python_operator} {value}" - - return f"({expr})" - - elif node_type == 'unfilled_orders': - operator = condition_node.get('operator') # '>', '<', etc. - value = condition_node.get('value') - - operator_map = { - '>': '>', - '<': '<', - '>=': '>=', - '<=': '<=', - '==': '==', - '!=': '!=' - } - python_operator = operator_map.get(operator, operator) - - expr = f"get_unfilled_orders() {python_operator} {value}" - - return f"({expr})" - - # Return the current balance. - elif node_type == 'current_balance': - return 'get_current_balance()' - - # Return the starting balance. - elif node_type == 'starting_balance': - return 'get_starting_balance()' - - elif node_type == 'available_balance': - operator = condition_node.get('operator') # '>', '<', etc. - value = condition_node.get('value') - - operator_map = { - '>': '>', - '<': '<', - '>=': '>=', - '<=': '<=', - '==': '==', - '!=': '!=' - } - python_operator = operator_map.get(operator, operator) - - expr = f"get_available_balance() {python_operator} {value}" - - return f"({expr})" - - # Return the result of a math operation. - elif node_type == 'math_operation': - operator = condition_node.get('operator') - left_operand = condition_node.get('left_operand') - right_operand = condition_node.get('right_operand') - left_expr = self.generate_condition_code(left_operand) - right_expr = self.generate_condition_code(right_operand) - operator_map = { - 'ADD': '+', - 'SUBTRACT': '-', - 'MULTIPLY': '*', - 'DIVIDE': '/' - } - python_operator = operator_map.get(operator, operator) - return f"({left_expr} {python_operator} {right_expr})" - - # Return the requested value from the last received candle. - elif node_type == 'last_candle_value': - candle_part = condition_node.get('candle_part') - source_node = condition_node.get('source', {}) - timeframe = source_node.get('timeframe', 'default') - exchange = source_node.get('exchange', 'default') - symbol = source_node.get('symbol', 'default') - self.data_sources_used.add((exchange, symbol, timeframe)) - return f"get_last_candle(candle_part='{candle_part}', timeframe='{timeframe}', exchange='{exchange}', symbol='{symbol}')" - - # Return the requested value from an indicator. - elif node_type == 'indicator': - indicator_name = condition_node.get('name') - output_field = condition_node.get('output') - # Collect the indicator information - self.indicators_used.append({ - 'name': indicator_name, - 'output': output_field - }) - # Generate code that calls process_indicator - return f"process_indicator('{indicator_name}', '{output_field}')" - - # Return the inputted value. - elif node_type == 'value_input': - values = condition_node.get('values', []) - if len(values) == 1: - return str(values[0]) - else: - return '0' - - # Return a number - elif node_type == 'number': - # Handle numeric values - return str(condition_node.get('value', 0)) - - # Return a string - elif node_type == 'string': - # Handle string values - return f"'{condition_node.get('value', '')}'" - - else: - logger.warning(f"Unhandled condition node type: {node_type}") - return 'False' # Default to False for unhandled types - - def handle_execute_if(self, node: dict, default_source: dict, indent_level: int) -> list[str]: - code_lines = [] - indent = ' ' * indent_level - - condition_node = node.get('inputs', {}).get('CONDITION') - if not condition_node: - logger.error("Condition missing in 'execute_if' node.") - return code_lines - - condition_code = self.generate_condition_code(condition_node) - code_lines.append(f"{indent}if {condition_code}:") - # Process the 'DO' statements under this condition - do_statements = node.get('statements', {}).get('DO', []) - code_lines.extend(self.generate_code_from_json(do_statements, default_source, indent_level + 1)) - - return code_lines - - def handle_strategy_pause(self, node: dict, indent_level: int) -> list[str]: - """ - Handles the 'strategy_pause' node type. - - :param node: The strategy node. - :param indent_level: Current indentation level. - :return: List of generated code lines. - """ - code_lines = [] - indent = ' ' * indent_level - - condition_node = node.get('condition') - if not condition_node: - raise ValueError("Condition is missing in 'strategy_pause' node.") - - condition_code = self.generate_condition_code(condition_node) - code_lines.append(f"{indent}if {condition_code}:") - action_indent = ' ' * (indent_level + 1) - code_lines.append(f"{action_indent}self.set_paused(True) # Pause the strategy") - code_lines.append(f"{action_indent}notify_user('Strategy paused.')") - - return code_lines - - def extract_resume_conditions(self, json_nodes: Any) -> list[dict[str, Any]]: - """ - Extracts conditions intended to resume the strategy from the JSON nodes. - - :param json_nodes: The current JSON node(s) to process. - :return: A list of condition nodes. - """ - resume_conditions = [] - if isinstance(json_nodes, dict): - json_nodes = [json_nodes] - - for node in json_nodes: - node_type = node.get('type') - if node_type == 'strategy_resume': - condition_node = node.get('condition') - if condition_node: - resume_conditions.append(condition_node) - elif node_type == 'strategy_exit' or node_type == 'strategy_pause': - # These nodes might contain nested statements - nested_actions = node.get('statements', {}).get('DO', []) - resume_conditions.extend(self.extract_resume_conditions(nested_actions)) - elif 'statements' in node: - statements = node['statements'] - if isinstance(statements, dict): - do_statements = statements.get('DO', []) - resume_conditions.extend(self.extract_resume_conditions(do_statements)) - elif isinstance(statements, list): - for sub_node in statements: - resume_conditions.extend(self.extract_resume_conditions(sub_node)) - else: - # Handle other possible types if necessary - pass - return resume_conditions - - def handle_strategy_exit(self, node: dict, indent_level: int) -> list[str]: - """ - Handles the 'strategy_exit' node type. - - :param node: The strategy node. - :param indent_level: Current indentation level. - :return: List of generated code lines. - """ - code_lines = [] - indent = ' ' * indent_level - - condition_node = node.get('condition') - exit_option = node.get('exit_option', 'all') # 'all', 'in_profit', 'in_loss' - - if not condition_node: - raise ValueError("Condition is missing in 'strategy_exit' node.") - - condition_code = self.generate_condition_code(condition_node) - code_lines.append(f"{indent}if {condition_code}:") - action_indent = ' ' * (indent_level + 1) - code_lines.append(f"{action_indent}self.set_exit(True, '{exit_option}') # Initiate exit") - code_lines.append(f"{action_indent}self.set_paused(True) # Pause the strategy while exiting") - - return code_lines - - def handle_trade_action(self, node: dict, default_source: dict[str, Any], indent_level: int) -> list[str]: - """ - Handles the 'trade_action' node type. - - :param node: The trade action node. - :param default_source: The default source for undefined sources in the strategy. - :param indent_level: Current indentation level. - :return: List of generated code lines. - """ - code_lines = [] - indent = ' ' * indent_level - - # Accessing fields and inputs - trade_type = node.get('trade_type', 'buy').lower() # 'buy' or 'sell' - condition_node = node.get('condition') - size = node.get('size', 1) - - # Initialize defaults - tif = node.get('tif', 'GTC') # Time in Force - - default_timeframe = default_source.get('timeframe', self.default_timeframe) - default_exchange = default_source.get('exchange', self.default_exchange) - default_symbol = default_source.get('market', self.default_symbol) - - timeframe = node.get('timeframe', default_timeframe) - exchange = node.get('exchange', default_exchange) - symbol = node.get('symbol', default_symbol) - - # Set initial default values for optional parameters - price_type = node.get('price_type', 'market') - price_value = node.get('price_value', None) - - stop_loss_type = node.get('stop_loss_type', None) - stop_loss_value = node.get('stop_loss', None) - stop_loss_trigger_price = node.get('stop_loss_trigger_price', None) - stop_loss_limit_price = node.get('stop_loss_limit_price', None) - - take_profit_type = node.get('take_profit_type', None) - take_profit_value = node.get('take_profit', None) - take_profit_trigger_price = node.get('take_profit_trigger_price', None) - take_profit_limit_price = node.get('take_profit_limit_price', None) - - # Collect data sources - self.data_sources_used.add((exchange, symbol, timeframe)) - - # Generate condition code - if condition_node: - condition_code = self.generate_condition_code(condition_node) - code_lines.append(f"{indent}if {condition_code}:") - action_indent = ' ' * (indent_level + 1) - else: - action_indent = indent - - # Handle 'size' being a single value or a list - sizes = size if isinstance(size, list) else [size] - - for s in sizes: - order_params = [ - f"size={s}", - f"symbol='{symbol}'", - f"order_type='{price_type}'", - f"tif='{tif}'", - f"timeframe='{timeframe}'", - f"exchange='{exchange}'" - ] - - # Handle Price - if price_type == 'limit' and price_value is not None: - order_params.append(f"price={price_value}") - else: - order_params.append(f"price=None") - - # Handle Stop Loss - if stop_loss_type == 'market' and stop_loss_value is not None: - order_params.append(f"stop_loss_type='{stop_loss_type}'") - order_params.append(f"stop_loss={stop_loss_value}") - elif stop_loss_type == 'limit': - if stop_loss_trigger_price and stop_loss_limit_price: - order_params.append(f"stop_loss_type='{stop_loss_type}'") - order_params.append(f"stop_loss_trigger_price={stop_loss_trigger_price}") - order_params.append(f"stop_loss_limit_price={stop_loss_limit_price}") - - # Handle Take Profit - if take_profit_type == 'market' and take_profit_value is not None: - order_params.append(f"take_profit_type='{take_profit_type}'") - order_params.append(f"take_profit={take_profit_value}") - elif take_profit_type == 'limit': - if take_profit_trigger_price and take_profit_limit_price: - order_params.append(f"take_profit_type='{take_profit_type}'") - order_params.append(f"take_profit_trigger_price={take_profit_trigger_price}") - order_params.append(f"take_profit_limit_price={take_profit_limit_price}") - - # Handle Price parameters if applicable - if price_type and price_value is not None: - order_params.append(f"price_type='{price_type}'") - order_params.append(f"price_value={price_value}") - - # Convert parameters list to a comma-separated string - params_str = ', '.join(order_params) - - # Generate the trade execution line - code_lines.append(f"{action_indent}{trade_type}({params_str})") - - return code_lines - - def handle_set_flag(self, node: dict, indent_level: int) -> list[str]: - """ - Handles the 'set_flag' node type. - - :param node: The set flag node. - :param indent_level: Current indentation level. - :return: List of generated code lines. - """ - code_lines = [] - indent = ' ' * indent_level - - # Retrieve the condition directly from the node - condition_node = node.get('condition') - if condition_node is None: - raise ValueError("Condition is missing in 'set_flag' node.") - - flag_name = node.get('flag_name') - flag_value = node.get('flag_value', 'True') # Default to 'True' if not provided - - condition_code = self.generate_condition_code(condition_node) - code_lines.append(f"{indent}if {condition_code}:") - action_indent = ' ' * (indent_level + 1) - code_lines.append(f"{action_indent}flags['{flag_name}'] = {flag_value}") - - return code_lines - - def handle_notify_user(self, node: dict, indent_level: int) -> list[str]: - """ - Handles the 'notify_user' node type. - - :param node: The notify user node. - :param indent_level: Current indentation level. - :return: List of generated code lines. - """ - code_lines = [] - indent = ' ' * indent_level - - message = node.get('message', 'No message provided.') - # Ensure that the message is properly escaped if necessary - code_lines.append(f"{indent}notify_user('{message}')") - - return code_lines - - def handle_set_variable(self, node: dict, indent_level: int) -> list[str]: - """ - Handles the 'set_variable' node type. - - :param node: The set variable node. - :param indent_level: Current indentation level. - :return: List of generated code lines. - """ - code_lines = [] - indent = ' ' * indent_level - - variable_name = node.get('variable_name') - value_node = node.get('value') - value_code = self.generate_condition_code(value_node) if value_node else '0' - - code_lines.append(f"{indent}{variable_name} = {value_code}") - - return code_lines - def update_strategy_stats(self, strategy_id: str, profit_loss: float) -> None: """ Updates the strategy's statistics based on the latest profit or loss. diff --git a/src/StrategyInstance.py b/src/StrategyInstance.py new file mode 100644 index 0000000..cbe022c --- /dev/null +++ b/src/StrategyInstance.py @@ -0,0 +1,411 @@ +import logging +from DataCache_v3 import DataCache +from indicators import Indicators +from trade import Trades +import datetime as dt +import json +import traceback +from typing import Any + +# Configure logging +logger = logging.getLogger(__name__) + +class StrategyInstance: + def __init__(self, strategy_instance_id: str, strategy_id: str, strategy_name: str, + user_id: int, generated_code: str, data_cache: DataCache, indicators: Indicators, trades: Trades): + """ + Initializes a StrategyInstance. + + :param strategy_instance_id: Unique identifier for this strategy execution instance. + :param strategy_id: Identifier of the strategy definition. + :param strategy_name: Name of the strategy. + :param user_id: ID of the user who owns the strategy. + :param generated_code: The generated 'next()' method code. + :param data_cache: Reference to the DataCache instance. + :param indicators: Reference to the Indicators manager. + :param trades: Reference to the Trades manager. + """ + self.strategy_instance_id = strategy_instance_id + self.strategy_id = strategy_id + self.strategy_name = strategy_name + self.user_id = user_id + self.generated_code = generated_code + self.data_cache = data_cache + self.indicators = indicators + self.trades = trades + + # Initialize context variables + self.flags: dict[str, Any] = {} + self.starting_balance = self.trades.get_current_balance(self.user_id) + self.profit_loss: float = 0.0 + self.active: bool = True + self.paused: bool = False + self.exit: bool = False + self.exit_method: str = 'all' + self.start_time = dt.datetime.now() + + def load_context(self): + """ + Loads the strategy execution context from the database. + """ + try: + context_data = self.data_cache.get_rows_from_datacache( + cache_name='strategy_contexts', + filter_vals=[('strategy_instance_id', self.strategy_instance_id)] + ) + if context_data.empty: + logger.warning(f"No context found for StrategyInstance ID: {self.strategy_instance_id}") + return + + context = context_data.iloc[0].to_dict() + self.flags = json.loads(context.get('flags', '{}')) + self.starting_balance = context.get('starting_balance', 0.0) + self.profit_loss = context.get('profit_loss', 0.0) + self.active = context.get('active', True) + self.paused = context.get('paused', False) + self.exit = context.get('exit', False) + self.exit_method = context.get('exit_method', 'all') + + context_start_time = context.get('start_time', None) + if context_start_time: + self.start_time = dt.datetime.fromisoformat(context_start_time) + + except Exception as e: + logger.error(f"Error loading context for StrategyInstance '{self.strategy_instance_id}': {e}", + exc_info=True) + traceback.print_exc() + + def save_context(self): + """ + Saves the current strategy execution context to the database. + """ + try: + self.data_cache.modify_datacache_item( + cache_name='strategy_contexts', + filter_vals=[('strategy_instance_id', self.strategy_instance_id)], + field_names=('flags', 'profit_loss', 'active', 'paused', 'exit', 'exit_method', 'start_time'), + new_values=( + json.dumps(self.flags), + self.profit_loss, + self.active, + self.paused, + self.exit, + self.exit_method, + self.start_time.isoformat() + ) + ) + except Exception as e: + logger.error(f"Error saving context for StrategyInstance '{self.strategy_instance_id}': {e}") + traceback.print_exc() + + def execute(self) -> dict[str, Any]: + """ + Executes the strategy's 'next()' method. + :return: Result of the execution. + """ + try: + # Define the local execution environment + exec_context = { + 'flags': self.flags, + 'strategy_id': self.strategy_id, + 'user_id': self.user_id, + 'get_last_candle': self.get_last_candle, + 'get_current_price': self.get_current_price, # Added method + 'buy': self.buy_order, + 'sell': self.sell_order, + 'exit_strategy': self.exit_strategy, + 'notify_user': self.notify_user, + 'process_indicator': self.process_indicator, + 'get_strategy_profit_loss': self.get_strategy_profit_loss, + 'is_in_profit': self.is_in_profit, + 'is_in_loss': self.is_in_loss, + 'get_active_trades': self.get_active_trades, + 'get_starting_balance': self.get_starting_balance, + 'set_paused': self.set_paused, + 'set_exit': self.set_exit + } + + # Execute the generated 'next()' method + exec(self.generated_code, {}, exec_context) + + # Call the 'next()' method + if 'next' in exec_context and callable(exec_context['next']): + exec_context['next']() + else: + logger.error( + f"'next' method not defined in generated_code for StrategyInstance '{self.strategy_instance_id}'.") + + # Retrieve and update profit/loss + self.profit_loss = exec_context.get('profit_loss', self.profit_loss) + self.save_context() + + return {"success": True, "profit_loss": self.profit_loss} + + except Exception as e: + logger.error(f"Error executing 'next()' for StrategyInstance '{self.strategy_instance_id}': {e}") + traceback.print_exc() + return {"success": False, "message": str(e)} + + def set_paused(self, value: bool): + """ + Sets the paused state of the strategy. + :param value: True to pause, False to resume. + """ + self.paused = value + self.save_context() + logger.debug(f"Strategy '{self.strategy_id}' paused: {self.paused}") + + def set_exit(self, exit_flag: bool, exit_method: str = 'all'): + """ + Sets the exit state and method of the strategy. + :param exit_flag: True to initiate exit. + :param exit_method: Method to use for exiting ('all', 'in_profit', 'in_loss'). + """ + self.exit = exit_flag + self.exit_method = exit_method + self.save_context() + logger.debug(f"Strategy '{self.strategy_id}' exit set: {self.exit} with method '{self.exit_method}'") + + def get_total_filled_order_volume(self) -> float: + """ + Retrieves the total filled order volume for the strategy. + """ + return self.trades.get_total_filled_order_volume(self.strategy_id) + + def get_total_unfilled_order_volume(self) -> float: + """ + Retrieves the total unfilled order volume for the strategy. + """ + return self.trades.get_total_unfilled_order_volume(self.strategy_id) + + def get_last_candle(self, candle_part: str, timeframe: str, exchange: str, symbol: str): + """ + Retrieves the last candle data based on provided parameters. + + :param candle_part: Part of the candle data (e.g., 'close', 'open'). + :param timeframe: Timeframe of the candle. + :param exchange: Exchange name. + :param symbol: Trading symbol. + :return: Last candle value. + """ + try: + sdt = dt.datetime.now() - dt.timedelta(minutes=int(timeframe[:-1])) + data = self.data_cache.get_records_since(start_datetime=sdt, ex_details=[exchange, symbol, timeframe]) + if not data.empty: + return data.iloc[-1][candle_part] + else: + logger.warning(f"No candle data found for {exchange} {symbol} {timeframe}.") + return None + except Exception as e: + logger.error(f"Error retrieving last candle: {e}", exc_info=True) + traceback.print_exc() + return None + + def get_current_price(self, timeframe: str = '1h', exchange: str = 'binance', + symbol: str = 'BTC/USD') -> float | None: + """ + Retrieves the current market price for the specified symbol. + + :param timeframe: The timeframe of the data. + :param exchange: The exchange name. + :param symbol: The trading symbol. + :return: The current price or None if unavailable. + """ + try: + # Assuming get_last_candle returns the last candle data as a dictionary + last_candle = self.get_last_candle('close', timeframe, exchange, symbol) + if last_candle is not None: + logger.debug(f"Retrieved current price for {symbol} on {exchange} ({timeframe}): {last_candle}") + return last_candle + else: + logger.warning(f"No last candle data available for {symbol} on {exchange} ({timeframe}).") + return None + except Exception as e: + logger.error(f"Error retrieving current price for {symbol} on {exchange} ({timeframe}): {e}", exc_info=True) + return None + + # Define helper methods + def buy_order(self, size: float, symbol: str, order_type: str = 'market', price: float | None = None, **kwargs): + """ + Executes a buy order. + + :param size: Quantity to buy. + :param symbol: Trading symbol. + :param order_type: Type of order ('market' or 'limit'). + :param price: Price for limit orders. + """ + try: + order_data = { + 'size': size, + 'symbol': symbol, + 'order_type': order_type.lower(), + 'price': price, + **kwargs + } + status, msg = self.trades.buy(order_data, self.user_id) + if status != 'success': + logger.error(f"Buy order failed: {msg}") + self.notify_user(f"Buy order failed: {msg}") + else: + logger.info(f"Buy order executed successfully: {msg}") + except Exception as e: + logger.error(f"Error executing buy order in StrategyInstance '{self.strategy_instance_id}': {e}", + exc_info=True) + traceback.print_exc() + + def sell_order(self, size: float, symbol: str, order_type: str = 'market', price: float | None = None, **kwargs): + """ + Executes a sell order. + + :param size: Quantity to sell. + :param symbol: Trading symbol. + :param order_type: Type of order ('market' or 'limit'). + :param price: Price for limit orders. + """ + try: + order_data = { + 'size': size, + 'symbol': symbol, + 'order_type': order_type.lower(), + 'price': price, + **kwargs + } + status, msg = self.trades.sell(order_data, self.user_id) + if status != 'success': + logger.error(f"Sell order failed: {msg}") + self.notify_user(f"Sell order failed: {msg}") + else: + logger.info(f"Sell order executed successfully: {msg}") + except Exception as e: + logger.error(f"Error executing sell order in StrategyInstance '{self.strategy_instance_id}': {e}", + exc_info=True) + traceback.print_exc() + + def exit_strategy(self): + """ + Exits the strategy based on the exit_method. + """ + try: + if self.exit_method == 'all': + self.trades.exit_strategy_all(self.strategy_id) + logger.info(f"Exiting all positions for strategy '{self.strategy_id}'.") + elif self.exit_method == 'in_profit': + self.trades.exit_strategy_in_profit(self.strategy_id) + logger.info(f"Exiting profitable positions for strategy '{self.strategy_id}'.") + elif self.exit_method == 'in_loss': + self.trades.exit_strategy_in_loss(self.strategy_id) + logger.info(f"Exiting losing positions for strategy '{self.strategy_id}'.") + else: + logger.warning( + f"Unknown exit method '{self.exit_method}' for StrategyInstance '{self.strategy_instance_id}'.") + except Exception as e: + logger.error( + f"Error exiting strategy '{self.strategy_id}' in StrategyInstance '{self.strategy_instance_id}': {e}", + exc_info=True) + traceback.print_exc() + + def notify_user(self, message: str): + """ + Sends a notification to the user. + + :param message: Notification message. + """ + try: + self.trades.notify_user(self.user_id, message) + logger.debug(f"Notification sent to user '{self.user_id}': {message}") + except Exception as e: + logger.error( + f"Error notifying user '{self.user_id}' in StrategyInstance '{self.strategy_instance_id}': {e}", + exc_info=True) + traceback.print_exc() + + def process_indicator(self, indicator_name: str, output_field: str) -> Any: + """ + Retrieves the latest value of an indicator. + + :param indicator_name: Name of the indicator. + :param output_field: Specific field of the indicator. + :return: Indicator value. + """ + try: + user_indicators = self.indicators.get_indicator_list(user_id=self.user_id) + indicator = user_indicators.get(indicator_name) + if not indicator: + logger.error(f"Indicator '{indicator_name}' not found for user '{self.user_id}'.") + return None + indicator_value = self.indicators.process_indicator(indicator) + value = indicator_value.get(output_field, None) + logger.debug(f"Processed indicator '{indicator_name}' with output field '{output_field}': {value}") + return value + except Exception as e: + logger.error( + f"Error processing indicator '{indicator_name}' in StrategyInstance '{self.strategy_instance_id}': {e}", + exc_info=True) + traceback.print_exc() + return None + + def get_strategy_profit_loss(self, strategy_id: str) -> float: + """ + Retrieves the current profit or loss of the strategy. + + :param strategy_id: Unique identifier of the strategy. + :return: Profit or loss amount. + """ + try: + profit_loss = self.trades.get_profit(strategy_id) + logger.debug(f"Retrieved profit/loss for strategy '{strategy_id}': {profit_loss}") + return profit_loss + except Exception as e: + logger.error(f"Error retrieving profit/loss for StrategyInstance '{self.strategy_instance_id}': {e}", + exc_info=True) + traceback.print_exc() + return 0.0 + + def is_in_profit(self) -> bool: + """ + Determines if the strategy is currently in profit. + """ + profit = self.profit_loss + logger.debug(f"Checking if in profit: {profit} > 0") + return self.profit_loss > 0 + + def is_in_loss(self) -> bool: + """ + Determines if the strategy is currently in loss. + """ + loss = self.profit_loss + logger.debug(f"Checking if in loss: {loss} < 0") + return self.profit_loss < 0 + + def get_active_trades(self) -> int: + """ + Returns the number of active trades. + """ + active_trades_count = len(self.trades.active_trades) + logger.debug(f"Number of active trades: {active_trades_count}") + return active_trades_count + + def get_starting_balance(self) -> float: + """ + Returns the starting balance. + """ + logger.debug(f"Starting balance: {self.starting_balance}") + return self.starting_balance + + def get_filled_orders(self) -> int: + """ + Retrieves the number of filled orders for the strategy. + """ + return self.trades.get_filled_orders_count(self.strategy_id) + + def get_unfilled_orders(self) -> int: + """ + Retrieves the number of unfilled orders for the strategy. + """ + return self.trades.get_unfilled_orders_count(self.strategy_id) + + def get_available_balance(self) -> float: + """ + Retrieves the available balance for the strategy. + """ + return self.trades.get_available_balance(self.strategy_id) diff --git a/src/static/blocks/blocks/trade_order_blocks.js b/src/static/blocks/blocks/trade_order_blocks.js index 99ab786..772b1f3 100644 --- a/src/static/blocks/blocks/trade_order_blocks.js +++ b/src/static/blocks/blocks/trade_order_blocks.js @@ -31,7 +31,7 @@ export function defineTradeOrderBlocks() { "args0": [ { "type": "field_dropdown", - "name": "tradeType", + "name": "trade_type", "options": [ ["Buy", "buy"], ["Sell", "sell"] diff --git a/src/static/blocks/generators/trade_order_generators.js b/src/static/blocks/generators/trade_order_generators.js index a30fe3c..110d846 100644 --- a/src/static/blocks/generators/trade_order_generators.js +++ b/src/static/blocks/generators/trade_order_generators.js @@ -54,7 +54,7 @@ export function defineTradeOrderGenerators() { } // Retrieve and validate the trade type - const tradeType = block.getFieldValue('tradeType') || 'buy'; + const tradeType = block.getFieldValue('trade_type') || 'buy'; const validatedTradeType = validateTradeType(tradeType); // Retrieve and process the trade size diff --git a/src/static/blocks/indicator_blocks.js b/src/static/blocks/indicator_blocks.js index 85d3d44..4c9b45b 100644 --- a/src/static/blocks/indicator_blocks.js +++ b/src/static/blocks/indicator_blocks.js @@ -21,8 +21,9 @@ export function defineIndicatorBlocks() { for (let indicatorName in indicatorOutputs) { const outputs = indicatorOutputs[indicatorName]; - // Create a unique block type for each indicator - const blockType = 'indicator_' + indicatorName; + // Create a unique block type by replacing spaces with underscores + const sanitizedIndicatorName = indicatorName.replace(/\s+/g, '_'); // Replace spaces with underscores + const blockType = 'indicator_' + sanitizedIndicatorName; // Define the block for this indicator Blockly.defineBlocksWithJsonArray([{