brighter-trading/src/PythonGenerator.py

1638 lines
66 KiB
Python

# PythonGenerator.py
import logging
import math
import re
from typing import Any, Dict, List, Set, Tuple, Union
logger = logging.getLogger(__name__)
class PythonGenerator:
def __init__(self, default_source: Dict[str, Any], strategy_id: str):
"""
Initializes the PythonGenerator.
:param default_source: The default source for undefined sources in the strategy.
:param strategy_id: The ID of the strategy.
"""
self.indicators_used: List[Dict[str, Any]] = []
self.data_sources_used: Set[Tuple[str, str, str]] = set()
self.flags_used: Set[str] = set()
self.default_source: Dict[str, Any] = default_source
self.strategy_id: str = strategy_id
# Initialize scheduled action tracking
self.scheduled_action_count: int = 0
self.scheduled_actions: List[str] = []
def generate(self, strategy_json: dict) -> dict:
    """
    Generates the source of a ``next()`` function from a strategy definition.
    :param strategy_json: The strategy dictionary; its 'statements' entry should be a list of nodes.
    :return: A dictionary with the keys 'generated_code', 'indicators', 'data_sources' and 'flags_used'.
    """
    code_lines = ["def next():"]
    indent_level = 1  # Starting indentation inside the function
    indent = ' ' * indent_level
    exit_indent = ' ' * (indent_level + 1)

    def build_result() -> dict:
        # Single place that assembles the result so both exit paths stay in sync.
        return {
            "generated_code": '\n'.join(code_lines),
            "indicators": list(self.indicators_used),
            "data_sources": list(self.data_sources_used),
            "flags_used": list(self.flags_used)
        }

    # Extract statements from the strategy
    statements = strategy_json.get('statements', [])
    if not isinstance(statements, list):
        logger.error("'statements' should be a list in strategy_json.")
        # A bare 'def next():' header is not valid Python; emit a no-op body instead.
        code_lines.append(f"{indent}pass")
        return build_result()

    # Process each statement
    code_lines.extend(self.generate_code_from_json(statements, indent_level))

    # Handle exit logic at the end of 'next()'
    code_lines.append(f"{indent}exit = flags.get('exit', False)")
    code_lines.append(f"{indent}if exit:")
    code_lines.append(f"{exit_indent}exit_strategy()")
    code_lines.append(f"{exit_indent}set_paused(True) # Pause the strategy while exiting.")
    return build_result()
# ==============================
# Helper Methods
# ==============================
def process_numeric_input(self, value, option_name, default_value=1, min_value=None, max_value=None):
    """
    Processes a numeric input, applying limits if necessary.
    :param value: The value to validate; a number, or a node dictionary describing a dynamic value.
    :param option_name: The name of the option for logging.
    :param default_value: Default value if input is invalid.
    :param min_value: Minimum allowable value.
    :param max_value: Maximum allowable value.
    :return: The validated number, the expression string generated for a node dictionary,
             or ``default_value`` when the input is neither.
    """
    if isinstance(value, dict):
        # Dynamic value: render it as a single expression string. The list of code
        # lines returned by generate_code_from_json is not a usable value here
        # (format_trade_option rejects lists and renders them as 'None').
        return self.generate_condition_code(value, 0)
    if not isinstance(value, (int, float)):
        logger.warning(f"Invalid {option_name} value. Using default: {default_value}.")
        return default_value
    if min_value is not None and value < min_value:
        logger.warning(f"{option_name} value too low. Clamping to {min_value}.")
        return min_value
    if max_value is not None and value > max_value:
        logger.warning(f"{option_name} value too high. Clamping to {max_value}.")
        return max_value
    return value
def process_numeric_list(self, value, indent_level: int, allow_single_value: bool = True) -> str:
    """
    Renders a numeric input, a list of inputs, or a nested expression node as a string.
    :param value: A single value, a list of values, or a node dictionary.
    :param indent_level: Current indentation level, forwarded to nested expression generation.
    :param allow_single_value: When False, a single value is rendered as a one-element list.
    :return: The rendered expression string.
    """
    # A node dictionary is rendered as one nested expression
    if isinstance(value, dict):
        return self.generate_condition_code(value, indent_level)
    # A list is rendered element by element; only dict elements are generated
    if isinstance(value, list):
        rendered = []
        for item in value:
            if isinstance(item, dict):
                rendered.append(self.generate_condition_code(item, indent_level))
            else:
                rendered.append(str(item))
        return f"[{', '.join(rendered)}]"
    # Anything else is a single value
    if allow_single_value:
        return str(value)
    return f"[{str(value)}]"
def generate_code_from_json(self, json_nodes: Any, indent_level: int) -> List[str]:
    """
    Recursively generates code lines from JSON nodes.
    Each node is dispatched to the ``handle_<type>`` method named after its 'type'
    (falling back to ``handle_default``); a node's 'next' entry is processed right
    after the node itself.
    :param json_nodes: A node dictionary, an iterable of node dictionaries, or None.
    :param indent_level: Current indentation level.
    :return: A list of generated code lines.
    """
    code_lines: List[str] = []
    if json_nodes is None:
        return code_lines
    if isinstance(json_nodes, dict):
        json_nodes = [json_nodes]
    elif isinstance(json_nodes, (str, bytes)):
        # Iterating a string would yield characters, not nodes.
        logger.warning(f"Expected node(s) but got {type(json_nodes).__name__}. Skipping.")
        return code_lines
    for node in json_nodes:
        if not isinstance(node, dict):
            logger.warning(f"Skipping non-dictionary node: {node!r}")
            continue
        node_type = node.get('type')
        if not node_type:
            logger.warning("Node missing 'type'. Skipping.")
            continue  # Skip nodes without a type
        logger.debug(f"Handling node of type: {node_type}")
        handler_method = getattr(self, f'handle_{node_type}', self.handle_default)
        handler_code = handler_method(node, indent_level)
        # Handlers return either a list of lines or a single expression string;
        # any other return type is ignored.
        if isinstance(handler_code, list):
            code_lines.extend(handler_code)
        elif isinstance(handler_code, str):
            code_lines.append(handler_code)
        # Process 'next' recursively if present
        next_node = node.get('next')
        if next_node:
            code_lines.extend(self.generate_code_from_json(next_node, indent_level))
    return code_lines
def handle_default(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Fallback handler used when no ``handle_<type>`` method exists for a node.
    :param node: The node whose type has no dedicated handler.
    :param indent_level: Current indentation level (unused).
    :return: The literal string 'False'.
    """
    unknown_type = node.get('type')
    logger.warning(f"Unhandled condition node type: {unknown_type}. Defaulting to 'False'.")
    fallback_expr = 'False'
    return fallback_expr
def generate_condition_code(self, condition_node: Dict[str, Any], indent_level: int) -> str:
    """
    Renders a condition node (or a plain value) as a Python expression string.
    :param condition_node: A node dictionary, or a direct value.
    :param indent_level: Current indentation level, forwarded to the handler.
    :return: The handler's result for typed nodes, 'False' for untyped dictionaries,
             or ``str(value)`` for anything that is not a dictionary.
    """
    if isinstance(condition_node, dict):
        kind = condition_node.get('type')
        if kind:
            # Dispatch on the node type; unknown types go to handle_default
            handler = getattr(self, f'handle_{kind}', self.handle_default)
            return handler(condition_node, indent_level=indent_level)
        # A dictionary without a type cannot be rendered
        return 'False'
    # Direct values are rendered through their string representation
    return str(condition_node)
# Add more helper methods here
# ==============================
# Indicators Handlers
# ==============================
def handle_indicator(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'indicator' node type by generating a ``process_indicator`` call.
    The indicator name and output are read from the node's 'fields' entry
    ('NAME' and 'OUTPUT') and recorded in ``self.indicators_used``.
    :param node: The indicator node.
    :param indent_level: Current indentation level (unused).
    :return: A string representing the indicator value retrieval, or 'None' when
             'NAME' or 'OUTPUT' is missing.
    """
    fields = node.get('fields', {})
    indicator_name = fields.get('NAME')
    output_field = fields.get('OUTPUT')
    if not indicator_name or not output_field:
        logger.error("indicator node missing 'NAME' or 'OUTPUT'.")
        return 'None'
    # Collect the indicator information
    self.indicators_used.append({
        'name': indicator_name,
        'output': output_field
    })
    # repr() yields the same quoted literal for ordinary names but escapes quotes
    # and backslashes, so a name cannot break out of the generated string literal.
    expr = f"process_indicator({str(indicator_name)!r}, {str(output_field)!r})"
    logger.debug(f"Generated indicator condition: {expr}")
    return expr
# ==============================
# Balances Handlers
# ==============================
def handle_starting_balance(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'starting_balance' node type.
    :param node: The starting_balance node (unused).
    :param indent_level: Current indentation level (unused).
    :return: The call expression that retrieves the starting balance.
    """
    call_expr = "get_starting_balance()"
    return call_expr
def handle_current_balance(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'current_balance' node type.
    :param node: The current_balance node (unused).
    :param indent_level: Current indentation level (unused).
    :return: The call expression that retrieves the current balance.
    """
    call_expr = "get_current_balance()"
    return call_expr
def handle_available_balance(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'available_balance' node type.
    :param node: The available_balance node (unused).
    :param indent_level: Current indentation level (unused).
    :return: The call expression that retrieves the available balance.
    """
    call_expr = "get_available_balance()"
    return call_expr
def handle_available_strategy_balance(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'available_strategy_balance' node type.
    :param node: The available_strategy_balance node (unused).
    :param indent_level: Current indentation level (unused).
    :return: The call expression that retrieves the available strategy balance.
    """
    call_expr = "get_available_strategy_balance()"
    return call_expr
def handle_set_available_strategy_balance(self, node: Dict[str, Any], indent_level: int) -> List[str]:
    """
    Handles the 'set_available_strategy_balance' node type.
    :param node: The node; its 'inputs' entry may carry a 'BALANCE' value or nested node.
    :param indent_level: Current indentation level.
    :return: A one-element list holding the generated statement.
    """
    prefix = ' ' * indent_level
    inputs = node.get('inputs', {})
    balance_expr = inputs.get('BALANCE', 0)
    if isinstance(balance_expr, dict):
        # A nested node is rendered as an expression first
        balance_expr = self.generate_condition_code(balance_expr, indent_level)
    statement = f"{prefix}set_available_strategy_balance({balance_expr})"
    logger.debug(f"Generated set_available_strategy_balance action with balance: {balance_expr}")
    return [statement]
# ==============================
# Order Metrics Handlers
# ==============================
def handle_order_volume(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'order_volume' node type.
    :param node: The order_volume node; 'inputs.order_type' selects 'filled' or 'unfilled'.
    :param indent_level: Current indentation level (unused).
    :return: The call expression that retrieves the order volume.
    """
    inputs = node.get('inputs', {})
    order_type = inputs.get('order_type', 'filled').lower()
    if order_type != 'filled' and order_type != 'unfilled':
        logger.error(f"Invalid order type '{order_type}' in 'order_volume'. Defaulting to 'filled'.")
        order_type = 'filled'
    expr = f"get_order_volume(order_type='{order_type}')"
    logger.debug(f"Generated order_volume condition: {expr}")
    return expr
def handle_filled_orders(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'filled_orders' node type.
    :param node: The filled_orders node (unused).
    :param indent_level: Current indentation level (unused).
    :return: The call expression that retrieves the filled orders.
    """
    call_expr = "get_filled_orders()"
    return call_expr
def handle_unfilled_orders(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'unfilled_orders' node type.
    :param node: The unfilled_orders node (unused).
    :param indent_level: Current indentation level (unused).
    :return: The call expression that retrieves the unfilled orders.
    """
    call_expr = "get_unfilled_orders()"
    return call_expr
def handle_order_status(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'order_status' node type.
    :param node: The order_status node; 'inputs.order_name' names the order and
                 'inputs.status' is one of 'filled', 'unfilled' or 'partial'.
    :param indent_level: Current indentation level (unused).
    :return: The call expression that checks the order's status.
    """
    inputs = node.get('inputs', {})
    raw_name = inputs.get('order_name', 'last_order')
    raw_status = inputs.get('status', 'filled')
    # Explicit nulls and non-string values are tolerated instead of raising on
    # .strip() / .lower().
    order_name = '' if raw_name is None else str(raw_name).strip()
    order_status = '' if raw_status is None else str(raw_status).lower()
    valid_statuses = ['filled', 'unfilled', 'partial']
    if not order_name:
        logger.warning("Empty 'order_name' in 'order_status'. Using 'last_order'.")
        order_name = 'last_order'
    if order_status not in valid_statuses:
        logger.error(f"Invalid order status '{order_status}' in 'order_status'. Defaulting to 'filled'.")
        order_status = 'filled'
    # repr() quotes ordinary names exactly as before but escapes embedded quotes,
    # keeping the generated call syntactically valid for any order name.
    expr = f"is_order_status(order_name={order_name!r}, status='{order_status}')"
    logger.debug(f"Generated order_status condition: {expr}")
    return expr
# Add other Order Metrics handlers here...
# ==============================
# Trade Metrics Handlers
# ==============================
def handle_active_trades(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'active_trades' node type.
    :param node: The active_trades node (unused).
    :param indent_level: Current indentation level (unused).
    :return: The call expression that retrieves the active trades.
    """
    call_expr = "get_active_trades()"
    return call_expr
def handle_total_trades(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'total_trades' node type.
    :param node: The total_trades node (unused).
    :param indent_level: Current indentation level (unused).
    :return: The call expression that retrieves the total number of trades.
    """
    call_expr = "get_total_trades()"
    return call_expr
def handle_last_trade_details(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'last_trade_details' node type.
    :param node: The node; 'inputs.details' is one of 'price', 'volume' or 'direction'.
    :param indent_level: Current indentation level (unused).
    :return: The call expression that retrieves the requested detail of the last trade.
    """
    inputs = node.get('inputs', {})
    details = inputs.get('details', 'price').lower()
    allowed = ('price', 'volume', 'direction')
    if details not in allowed:
        logger.error(f"Invalid details '{details}' in 'last_trade_details'. Defaulting to 'price'.")
        details = 'price'
    expr = f"get_last_trade_details(detail='{details}')"
    logger.debug(f"Generated last_trade_details condition: {expr}")
    return expr
def handle_average_entry_price(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'average_entry_price' node type.
    :param node: The average_entry_price node (unused).
    :param indent_level: Current indentation level (unused).
    :return: The call expression that retrieves the average entry price.
    """
    call_expr = "get_average_entry_price()"
    return call_expr
def handle_unrealized_profit_loss(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'unrealized_profit_loss' node type.
    :param node: The unrealized_profit_loss node (unused).
    :param indent_level: Current indentation level (unused).
    :return: The call expression that retrieves the unrealized profit or loss.
    """
    call_expr = "get_unrealized_profit_loss()"
    return call_expr
def handle_user_active_trades(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'user_active_trades' node type.
    :param node: The user_active_trades node (unused).
    :param indent_level: Current indentation level (unused).
    :return: The call expression that retrieves the user's active trades.
    """
    call_expr = "get_user_active_trades()"
    return call_expr
# Add other Trade Metrics handlers here...
# ==============================
# Time Metrics Handlers
# ==============================
def handle_time_since_start(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'time_since_start' node type.
    :param node: The node; 'inputs.unit' is one of 'seconds', 'minutes' or 'hours'.
    :param indent_level: Current indentation level (unused).
    :return: The call expression that retrieves the elapsed time in the requested unit.
    """
    inputs = node.get('inputs', {})
    unit = inputs.get('unit', 'seconds').lower()
    allowed_units = ('seconds', 'minutes', 'hours')
    if unit not in allowed_units:
        logger.error(f"Invalid time unit '{unit}' in 'time_since_start'. Defaulting to 'seconds'.")
        unit = 'seconds'
    expr = f"get_time_since_start(unit='{unit}')"
    logger.debug(f"Generated time_since_start condition: {expr}")
    return expr
# Add other Time Metrics handlers here...
# ==============================
# Market Data Handlers
# ==============================
def handle_current_price(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'current_price' node type.
    Source values come from 'inputs.source', falling back to ``self.default_source``
    and then to built-in defaults; the resolved source is recorded in
    ``self.data_sources_used``.
    :param node: The current_price node.
    :param indent_level: Current indentation level (unused).
    :return: The call expression that retrieves the current price.
    """
    source_node = node.get('inputs', {}).get('source', {})
    fallback = self.default_source
    exchange = source_node.get('exchange', fallback.get('exchange', 'Binance'))
    symbol = source_node.get('symbol', fallback.get('market', 'BTC/USD'))
    timeframe = source_node.get('timeframe', fallback.get('timeframe', '1m'))
    # Remember which market data this strategy depends on
    self.data_sources_used.add((exchange, symbol, timeframe))
    expr = f"get_current_price(timeframe='{timeframe}', exchange='{exchange}', symbol='{symbol}')"
    return expr
def handle_bid_price(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'bid_price' node type.
    Source values come from 'inputs.source', falling back to ``self.default_source``
    and then to built-in defaults; the resolved source is recorded in
    ``self.data_sources_used``.
    :param node: The bid_price node.
    :param indent_level: Current indentation level (unused).
    :return: The call expression that retrieves the bid price.
    """
    source_node = node.get('inputs', {}).get('source', {})
    fallback = self.default_source
    exchange = source_node.get('exchange', fallback.get('exchange', 'Binance'))
    symbol = source_node.get('symbol', fallback.get('market', 'BTC/USD'))
    timeframe = source_node.get('timeframe', fallback.get('timeframe', '1m'))
    # Remember which market data this strategy depends on
    self.data_sources_used.add((exchange, symbol, timeframe))
    expr = f"get_bid_price(timeframe='{timeframe}', exchange='{exchange}', symbol='{symbol}')"
    return expr
def handle_ask_price(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'ask_price' node type.
    Source values come from 'inputs.source', falling back to ``self.default_source``
    and then to built-in defaults; the resolved source is recorded in
    ``self.data_sources_used``.
    :param node: The ask_price node.
    :param indent_level: Current indentation level (unused).
    :return: The call expression that retrieves the ask price.
    """
    source_node = node.get('inputs', {}).get('source', {})
    fallback = self.default_source
    exchange = source_node.get('exchange', fallback.get('exchange', 'Binance'))
    symbol = source_node.get('symbol', fallback.get('market', 'BTC/USD'))
    timeframe = source_node.get('timeframe', fallback.get('timeframe', '1m'))
    # Remember which market data this strategy depends on
    self.data_sources_used.add((exchange, symbol, timeframe))
    expr = f"get_ask_price(timeframe='{timeframe}', exchange='{exchange}', symbol='{symbol}')"
    return expr
def handle_last_candle_value(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'last_candle_value' node type by generating a function call to get candle data.
    The resolved source is recorded in ``self.data_sources_used``.
    :param node: The last_candle_value node; 'inputs.candle_part' is one of
                 'open', 'high', 'low' or 'close'.
    :param indent_level: Current indentation level (unused).
    :return: A string representing the candle data retrieval.
    """
    inputs = node.get('inputs', {})
    candle_part = inputs.get('candle_part', 'close').lower()
    valid_candle_parts = ['open', 'high', 'low', 'close']
    if candle_part not in valid_candle_parts:
        logger.error(f"Invalid candle_part '{candle_part}' in 'last_candle_value'. Defaulting to 'close'.")
        candle_part = 'close'
    # A node-level 'source' keeps priority (previous behaviour); a source placed
    # under 'inputs' -- where the other price handlers read it -- is now honoured
    # too instead of being silently ignored.
    source_node = node.get('source') or inputs.get('source') or {}
    timeframe = source_node.get('timeframe', self.default_source.get('timeframe', '1m'))
    exchange = source_node.get('exchange', self.default_source.get('exchange', 'Binance'))
    symbol = source_node.get('symbol', self.default_source.get('market', 'BTC/USD'))
    # Track data sources
    self.data_sources_used.add((exchange, symbol, timeframe))
    expr = f"get_last_candle(candle_part='{candle_part}', timeframe='{timeframe}', exchange='{exchange}', symbol='{symbol}')"
    logger.debug(f"Generated last_candle_value condition: {expr}")
    return expr
def handle_source(self, node: Dict[str, Any], indent_level: int) -> Dict[str, Any]:
    """
    Handles the 'source' node type.
    Reads 'time_frame', 'exchange' and 'symbol' from the node and records the
    resulting source in ``self.data_sources_used``.
    :param node: The source node.
    :param indent_level: Current indentation level (unused).
    :return: A dictionary with the keys 'timeframe', 'exchange' and 'symbol'.
    """
    source_config = {
        'timeframe': node.get('time_frame', '5m'),
        'exchange': node.get('exchange', 'Binance'),
        'symbol': node.get('symbol', 'BTC/USD')
    }
    # Remember which market data this strategy depends on
    self.data_sources_used.add(
        (source_config['exchange'], source_config['symbol'], source_config['timeframe'])
    )
    logger.debug(f"Generated source configuration: {source_config}")
    return source_config
# Add other Market Data handlers here...
# ==============================
# Logical Handlers
# ==============================
def handle_comparison(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'comparison' condition type.
    :param node: The comparison node; 'operator' is one of >, <, >=, <=, ==, != and
                 'inputs' carries the 'LEFT' and 'RIGHT' operands (nodes or plain values).
    :param indent_level: Current indentation level.
    :return: A string representing the condition, or 'False' when the operator or
             an operand is missing.
    """
    def is_missing(operand: Any) -> bool:
        # Only absent or empty operands count as missing. Numeric 0 and False are
        # legitimate literals (e.g. "profit > 0") and must not be rejected.
        return operand is None or (isinstance(operand, (dict, list, str)) and not operand)

    operator = node.get('operator')
    inputs = node.get('inputs', {})
    left_node = inputs.get('LEFT')
    right_node = inputs.get('RIGHT')
    if not operator or is_missing(left_node) or is_missing(right_node):
        logger.error("comparison node missing 'operator', 'LEFT', or 'RIGHT'.")
        return 'False'
    supported_operators = ('>', '<', '>=', '<=', '==', '!=')
    if operator in supported_operators:
        python_operator = operator
    else:
        logger.error(f"Unsupported operator '{operator}'. Defaulting to '=='.")
        python_operator = '=='
    left_expr = self.generate_condition_code(left_node, indent_level)
    right_expr = self.generate_condition_code(right_node, indent_level)
    condition = f"{left_expr} {python_operator} {right_expr}"
    logger.debug(f"Generated comparison condition: {condition}")
    return condition
def handle_logical_and(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'logical_and' condition type.
    :param node: The logical_and node; 'inputs' carries the 'LEFT' and 'RIGHT' operands.
    :param indent_level: Current indentation level.
    :return: A string representing the condition, or 'False' when an operand is missing.
    """
    inputs = node.get('inputs', {})
    left_node = inputs.get('LEFT')
    right_node = inputs.get('RIGHT')
    if not left_node or not right_node:
        logger.warning("logical_and node missing 'LEFT' or 'RIGHT'. Defaulting to 'False'.")
        return 'False'
    left_expr = self.generate_condition_code(left_node, indent_level)
    right_expr = self.generate_condition_code(right_node, indent_level)
    # Parenthesise both operands: 'and' binds tighter than 'or', so an unwrapped
    # "a or b" operand would be regrouped as "a or (b and c)".
    condition = f"({left_expr}) and ({right_expr})"
    logger.debug(f"Generated logical AND condition: {condition}")
    return condition
def handle_logical_or(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'logical_or' condition type.
    :param node: The logical_or node; 'inputs' carries the 'LEFT' and 'RIGHT' operands.
    :param indent_level: Current indentation level.
    :return: A string representing the condition, or 'False' when an operand is missing.
    """
    operands = node.get('inputs', {})
    lhs = operands.get('LEFT')
    rhs = operands.get('RIGHT')
    if lhs and rhs:
        lhs_code = self.generate_condition_code(lhs, indent_level)
        rhs_code = self.generate_condition_code(rhs, indent_level)
        condition = f"{lhs_code} or {rhs_code}"
        logger.debug(f"Generated logical OR condition: {condition}")
        return condition
    # At least one operand is absent or falsy
    logger.warning("logical_or node missing 'LEFT' or 'RIGHT'. Defaulting to 'False'.")
    return 'False'
def handle_is_false(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'is_false' condition type by negating the node's 'condition'.
    :param node: The is_false node; its 'condition' entry is the expression to negate.
    :param indent_level: Current indentation level.
    :return: A string of the form ``not (<condition>)``, or 'True' when no condition is given.
    """
    inner = node.get('condition')
    if inner:
        inner_code = self.generate_condition_code(inner, indent_level)
        negated = f"not ({inner_code})"
        logger.debug(f"Generated is_false condition: {negated}")
        return negated
    logger.error("is_false node missing 'condition'.")
    # Negation of 'False' is 'True'
    return 'True'
# Add other Logical handlers here...
# ==============================
# Trade Order Handlers
# ==============================
def handle_trade_action(self, node: Dict[str, Any], indent_level: int) -> List[str]:
    """
    Handles the 'trade_action' node type by passing trade options as arguments to a single trade_order function.
    Each entry of the node's 'trade_options' list is dispatched to the
    ``handle_<type>`` method named after its 'type'; the resolved data source is
    recorded in ``self.data_sources_used``.
    :param node: The trade_action node.
    :param indent_level: Current indentation level.
    :return: List of generated code lines (a single ``trade_order(...)`` call).
    """
    code_lines = []
    indent = ' ' * indent_level
    # Accessing fields and inputs
    trade_type = node.get('trade_type', 'buy').lower()  # 'buy' or 'sell'
    inputs = node.get('inputs', {})
    tif = node.get('tif', 'GTC')  # Time in Force
    price_type = node.get('price_type', 'market')
    size = inputs.get('size', 1)
    if isinstance(size, dict):
        size = self.generate_condition_code(size, indent_level)  # Handle nested json

    # Handle Trade Options if present
    trade_options: Dict[str, Any] = {}
    for option in node.get('trade_options', []):
        option_type = option.get('type')
        option_inputs = option.get('inputs', {})
        handler_method = getattr(self, f'handle_{option_type}', self.handle_default_trade_option)
        option_value = handler_method(option_inputs, indent_level)
        if option_value is not None:
            trade_options[option_type] = option_value

    # Collect data sources
    source = trade_options.get('source', self.default_source)
    exchange = source.get('exchange', 'binance')
    # The default source stores its symbol under 'market' (the price handlers read
    # it that way), so fall back to that key before the hard-coded default.
    symbol = source.get('symbol', source.get('market', 'BTC/USD'))
    timeframe = source.get('timeframe', '5m')
    self.data_sources_used.add((exchange, symbol, timeframe))

    def render_option(option_name: str) -> str:
        # Handlers such as handle_name_order return an already-quoted string;
        # format_trade_option only accepts dictionaries, so pass strings through.
        option_value = trade_options.get(option_name, {})
        if isinstance(option_value, str):
            return option_value
        return self.format_trade_option(option_value)

    # Time in force: a 'tif' option keeps its previous handling; a 'time_in_force'
    # option (already quoted by handle_time_in_force) is now applied as well;
    # otherwise the node-level value is used.
    if 'tif' in trade_options:
        tif_arg = f"'{trade_options['tif']}'"
    elif isinstance(trade_options.get('time_in_force'), str):
        tif_arg = trade_options['time_in_force']
    else:
        tif_arg = f"'{tif}'"

    # Prepare arguments for trade_order function
    trade_order_args = {
        'trade_type': f"'{trade_options.get('trade_type', trade_type)}'",
        'size': size,
        'order_type': f"'{price_type}'",
        'source': self.format_trade_option(source),
        'tif': tif_arg,
        'stop_loss': render_option('stop_loss'),
        'trailing_stop': render_option('trailing_stop'),
        'take_profit': render_option('take_profit'),
        'limit': render_option('limit'),
        'trailing_limit': render_option('trailing_limit'),
        'target_market': render_option('target_market'),
        'name_order': render_option('name_order'),
    }
    # Drop arguments whose value is an empty dict or None. Absent options are
    # rendered as the string 'None' above, so they stay in the generated call.
    trade_order_args = {k: v for k, v in trade_order_args.items() if v not in [{}, None]}
    # Convert trade_order_args dictionary to a comma-separated string
    args_str = ', '.join([f"{key}={value}" for key, value in trade_order_args.items()])
    # Generate the trade_order function call
    code_lines.append(f"{indent}trade_order({args_str})")
    return code_lines
def format_trade_option(self, option: Dict[str, Any]) -> str:
    """
    Formats the trade option dictionary into a Python dictionary string.
    String values are quoted unless they look like an expression (they contain
    one of ``( ) + - * / .``); market symbols such as 'BTC/USD' or 'BTC/USDT'
    are always quoted even though they contain '/'.
    :param option: The trade option dictionary. A string is returned unchanged
                   (treated as already-rendered code) and any other non-dict
                   value is rendered like a single dictionary value.
    :return: A string representing the trade option or 'None' if empty.
    """
    if not option:
        return 'None'
    # Market symbols: BASE/QUOTE with an optional ':SETTLE' suffix. Tickers are not
    # limited to three letters (e.g. 'BTC/USDT', 'DOGE/USD'); left unquoted they
    # would be emitted as a division of undefined names.
    market_symbol_pattern = re.compile(r'^[A-Z0-9]{2,}/[A-Z0-9]{2,}(?::[A-Z0-9]+)?$')

    def is_market_symbol(value: str) -> bool:
        """
        Determines if a string is a market symbol following the pattern 'BASE/QUOTE'.
        :param value: The string to check.
        :return: True if it matches the market symbol pattern, False otherwise.
        """
        return bool(market_symbol_pattern.match(value))

    def format_value(value: Any) -> str:
        if isinstance(value, str):
            if is_market_symbol(value):
                return f"'{value}'"  # Quote market symbols like 'BTC/USD'
            # Check if the string represents an expression (contains operators or function calls)
            elif any(op in value for op in ['(', ')', '+', '-', '*', '/', '.']):
                return value  # Assume it's an expression and return as-is
            else:
                return f"'{value}'"
        elif isinstance(value, dict):
            # Recursively format nested dictionaries
            nested_items = [f"'{k}': {format_value(v)}" for k, v in value.items()]
            return f"{{{', '.join(nested_items)}}}"
        elif isinstance(value, (int, float)):
            return str(value)
        else:
            logger.error(f"Unsupported value type: {type(value)}. Using 'None'.")
            return 'None'

    if isinstance(option, str):
        # Already-rendered code (e.g. a pre-quoted order name); nothing to format.
        return option
    if not isinstance(option, dict):
        return format_value(option)
    items = [f"'{key}': {format_value(value)}" for key, value in option.items()]
    return f"{{{', '.join(items)}}}"
def handle_default_trade_option(self, inputs: Dict[str, Any], indent_level: int) -> Any:
    """
    Fallback used when a trade option has no dedicated handler.
    :param inputs: The inputs of the unhandled trade option (only logged).
    :param indent_level: Current indentation level (unused).
    :return: Always None.
    """
    message = f"Unhandled trade option with inputs: {inputs}"
    logger.warning(message)
    return None
def handle_tpsl_options(self, option_name: str, inputs: Dict[str, Any], indent_level: int) -> Any:
    """
    Shared handler for trade options such as 'stop_loss' and 'take_profit'.
    The input stored under ``option_name`` may be:
    - a single value: becomes the option's 'value'
    - a list: the first element becomes 'value', the second (if any) 'limit';
      further elements are ignored with a warning
    Each value may be an int, float, string, or a node dictionary.
    :param option_name: The name of the trade option ('stop_loss' or 'take_profit').
    :param inputs: The inputs for the trade option.
    :param indent_level: Current indentation level.
    :return: A dictionary with 'value' and optionally 'limit', or None when the input is missing.
    """
    raw = inputs.get(option_name)
    if not raw:
        logger.error(f"{option_name} option missing '{option_name}' input.")
        return None

    def convert(item):
        # Node dictionaries are rendered as expressions; plain scalars pass through
        if isinstance(item, dict):
            return self.generate_condition_code(item, indent_level)
        if isinstance(item, (int, float, str)):
            return item
        logger.error(f"Unsupported type for {option_name}: {type(item)}")
        return 'None'

    if isinstance(raw, list):
        # 'raw' is non-empty here, so the first element always exists
        option_config = {'value': convert(raw[0])}
        extra_count = len(raw) - 2
        if extra_count >= 0:
            option_config['limit'] = convert(raw[1])
        if extra_count > 0:
            logger.warning(f"{option_name} input list has more than two elements; extras will be ignored.")
    else:
        option_config = {'value': convert(raw)}
    logger.debug(f"Generated {option_name} configuration: {option_config}")
    return option_config
def handle_take_profit(self, inputs: Dict[str, Any], indent_level: int) -> Any:
    """Handles the 'take_profit' trade option through the shared stop-loss/take-profit handler."""
    option_config = self.handle_tpsl_options('take_profit', inputs, indent_level)
    return option_config
def handle_stop_loss(self, inputs: Dict[str, Any], indent_level: int) -> Any:
    """Handles the 'stop_loss' trade option through the shared stop-loss/take-profit handler."""
    option_config = self.handle_tpsl_options('stop_loss', inputs, indent_level)
    return option_config
def handle_time_in_force(self, inputs: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'time_in_force' trade option.
    :param inputs: The inputs for time_in_force; 'time_in_force' is one of 'gtc', 'fok' or 'ioc' (any case).
    :param indent_level: Current indentation level (unused).
    :return: The lower-cased value wrapped in single quotes; invalid values become 'gtc'.
    """
    requested = inputs.get('time_in_force', 'gtc').lower()
    if requested in {'gtc', 'fok', 'ioc'}:
        return f"'{requested}'"
    logger.warning(f"Invalid Time in Force '{requested}', defaulting to 'gtc'")
    fallback = 'gtc'
    return f"'{fallback}'"
def handle_limit(self, inputs: Dict[str, Any], indent_level: int) -> Any:
    """
    Handles the 'limit' trade option by processing the limit input.
    :param inputs: The inputs for limit.
    :param indent_level: Current indentation level (unused).
    :return: A dictionary ``{'limit': value}``, or None if the limit is missing or invalid.
    """
    limit = inputs.get('limit')
    # default_value=None: without it process_numeric_input falls back to 1, which
    # would turn a missing or invalid limit into a limit of 1 and make the None
    # branch below unreachable.
    processed_limit = self.process_numeric_input(limit, 'limit', default_value=None)
    if processed_limit is None:
        return None
    return {'limit': processed_limit}
def handle_trailing_stop(self, inputs: Dict[str, Any], indent_level: int) -> Any:
    """
    Handles the 'trailing_stop' trade option by processing the trailing distance.
    :param inputs: The inputs for trailing_stop.
    :param indent_level: Current indentation level (unused).
    :return: A dictionary ``{'trail_distance': value}``, or None if the distance is missing or invalid.
    """
    trail_distance = inputs.get('trail_distance')
    # default_value=None: without it process_numeric_input falls back to 1, which
    # would turn a missing or invalid distance into a distance of 1 and make the
    # None branch below unreachable.
    processed_trail = self.process_numeric_input(trail_distance, 'trailing_stop', default_value=None)
    if processed_trail is None:
        return None
    return {'trail_distance': processed_trail}
def handle_trailing_limit(self, inputs: Dict[str, Any], indent_level: int) -> Any:
    """
    Handles the 'trailing_limit' trade option by processing the trail limit distance.
    :param inputs: The inputs for trailing_limit.
    :param indent_level: Current indentation level (unused).
    :return: A dictionary ``{'trail_limit_distance': value}``, or None if the distance is missing or invalid.
    """
    trail_limit_distance = inputs.get('trail_limit_distance')
    # default_value=None: without it process_numeric_input falls back to 1, which
    # would turn a missing or invalid distance into a distance of 1 and make the
    # None branch below unreachable.
    processed_trail_limit = self.process_numeric_input(
        trail_limit_distance, 'trailing_limit', default_value=None
    )
    if processed_trail_limit is None:
        return None
    return {'trail_limit_distance': processed_trail_limit}
def handle_target_market(self, inputs: Dict[str, Any], indent_level: int) -> Dict[str, Any]:
    """
    Handles the 'target_market' trade option.
    :param inputs: The inputs for target_market; may carry 'time_frame', 'exchange' and 'symbol'.
    :param indent_level: Current indentation level (unused).
    :return: A dictionary with the keys 'time_frame', 'exchange' and 'symbol',
             using '1m', 'Binance' and 'BTC/USD' for missing entries.
    """
    return {
        'time_frame': inputs.get('time_frame', '1m'),
        'exchange': inputs.get('exchange', 'Binance'),
        'symbol': inputs.get('symbol', 'BTC/USD')
    }
def handle_name_order(self, inputs: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'name_order' trade option by turning the order name into a
    Python string literal for the generated code.

    The name is stripped of surrounding whitespace. A missing, None or blank
    name becomes 'Unnamed Order'. The literal is produced with ``repr`` so that
    quotes and backslashes inside the name cannot terminate the literal early
    or otherwise corrupt the generated source.

    :param inputs: The inputs for name_order; 'order_name' is read from it.
    :param indent_level: Current indentation level (not used by this handler).
    :return: Source text of a string literal holding the order name.
    """
    raw_name = inputs.get('order_name')
    # Non-string names are coerced rather than crashing on .strip().
    order_name = '' if raw_name is None else str(raw_name).strip()
    if not order_name:
        order_name = 'Unnamed Order'
    return repr(order_name)
# Add other Trade Order handlers here...
# ==============================
# Control Handlers
# ==============================
def handle_pause_strategy(self, node: Dict[str, Any], indent_level: int) -> List[str]:
    """
    Emits the statements for a 'pause_strategy' node: a call that pauses the
    strategy followed by a user notification.

    :param node: The pause_strategy node (its contents are not read).
    :param indent_level: Current indentation level.
    :return: List of generated code lines.
    """
    prefix = ' ' * indent_level
    statements = ("set_paused(True)", "notify_user('Strategy paused.')")
    return [f"{prefix}{statement}" for statement in statements]
def handle_strategy_resume(self, node: Dict[str, Any], indent_level: int) -> List[str]:
    """
    Emits the statements for a 'strategy_resume' node: a resume call followed
    by a user notification.

    :param node: The strategy_resume node (its contents are not read).
    :param indent_level: Current indentation level.
    :return: List of generated code lines.
    """
    prefix = ' ' * indent_level
    # NOTE(review): the emitted call goes through 'self', unlike the bare
    # set_paused(...) used by the pause handler - confirm that the execution
    # environment of the generated code exposes 'self'.
    return [
        prefix + "self.resume_strategy()",
        prefix + "notify_user('Strategy resumed.')",
    ]
def handle_strategy_exit(self, node: Dict[str, Any], indent_level: int) -> List[str]:
    """
    Handles the 'strategy_exit' node type by emitting a set_exit(...) call
    followed by a pause of the strategy.

    The exit option is taken from the node's 'condition' key (default 'all')
    and written into the generated code as a properly quoted string literal,
    so unusual characters in the value cannot break or alter the emitted call.

    :param node: The strategy_exit node.
    :param indent_level: Current indentation level.
    :return: List of generated code lines.
    """
    indent = ' ' * indent_level
    exit_option = node.get('condition', 'all')  # 'all', 'in_profit', 'in_loss'
    # repr() yields the same 'value' text for plain names while escaping
    # quotes and backslashes that would otherwise end the literal early.
    option_literal = repr(str(exit_option))
    return [
        f"{indent}set_exit(True, {option_literal}) # Initiate exit",
        f"{indent}set_paused(True) # Pause the strategy while exiting",
    ]
def handle_max_drawdown(self, node: Dict[str, Any], indent_level: int) -> List[str]:
    """
    Generates the set_max_drawdown(...) call for a 'max_drawdown' node.

    :param node: The max_drawdown node; 'drawdown' and 'action' are read from its 'inputs'.
    :param indent_level: Current indentation level.
    :return: A single generated line, or an empty list when 'inputs' is missing/empty
             or the drawdown value is rejected by process_numeric_input.
    """
    prefix = ' ' * indent_level
    inputs = node.get('inputs', {})
    if not inputs:
        logger.error("max_drawdown node missing 'inputs'. Skipping.")
        return []

    action = inputs.get('action', 'PAUSE')  # 'PAUSE' or 'EXIT'
    limit = self.process_numeric_input(inputs.get('drawdown'), 'drawdown')
    if limit is None:
        logger.error("Invalid or missing 'drawdown' value. Skipping 'max_drawdown' configuration.")
        return []

    # Assumes the execution environment provides set_max_drawdown.
    call = f"set_max_drawdown({limit}, '{action}')"
    generated = [f"{prefix}{call}"]
    logger.debug(f"Generated max_drawdown condition: {call}")
    return generated
def handle_schedule_action(self, node: Dict[str, Any], indent_level: int) -> List[str]:
    """
    Handles the 'schedule_action' node type by generating a time-gated function
    definition followed by a call that registers it.

    The emitted code has this shape (N is this generator's running counter,
    each nesting level is one indentation unit deeper than the previous)::

        def scheduled_action_N(current_time):
            from datetime import timedelta
            global last_execution_N
            if last_execution_N is None:
                last_execution_N = current_time - timedelta(seconds=<interval>)
            time_since_last = (current_time - last_execution_N).total_seconds()
            if time_since_last >= <interval>:
                last_execution_N = current_time
                <actions, generated at indent_level + 2>
                self.scheduled_actions.remove(scheduled_action_N)  # repeat is False only
        self.schedule_action(scheduled_action_N)

    Interval strings are '<integer><unit>' with unit 's', 'm', 'h' or 'd'.
    An unknown unit is treated as minutes (with a warning); a malformed or
    non-positive value falls back to 5 minutes (with an error log).

    :param node: The schedule_action node; 'time_interval', 'repeat' and 'action'
                 are read from its 'inputs'.
    :param indent_level: Current indentation level.
    :return: List of generated code lines.
    """
    def pad(depth: int) -> str:
        # Nested levels are derived from the same unit as the top-level indent,
        # so 'if' bodies really sit one level below their 'if' line.
        return ' ' * (indent_level + depth)

    outer, body, inner = pad(0), pad(1), pad(2)

    # Extract and validate inputs
    inputs = node.get('inputs', {})
    time_interval = inputs.get('time_interval', '5m')
    repeat = inputs.get('repeat', False)
    action = inputs.get('action', [])
    if not isinstance(time_interval, str):
        logger.warning(f"Invalid type for time_interval: {type(time_interval)}. Defaulting to '1m'.")
        time_interval = '1m'
    if not isinstance(repeat, bool):
        logger.warning(f"Invalid type for repeat: {type(repeat)}. Defaulting to False.")
        repeat = False

    # Unique function name plus the variable that tracks its last execution
    self.scheduled_action_count += 1
    function_name = f"scheduled_action_{self.scheduled_action_count}"
    last_execution_var = f"last_execution_{self.scheduled_action_count}"

    # Convert the interval to seconds
    time_multipliers = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
    default_seconds = 5 * 60
    spec = time_interval.strip()
    try:
        interval_value = int(spec[:-1])
    except ValueError:
        interval_value = None
    if interval_value is None or interval_value <= 0:
        # A non-positive interval would make the time check always true.
        logger.error(f"Invalid time interval format: '{time_interval}'. Using default 5 minutes.")
        interval_seconds = default_seconds
    else:
        unit = spec[-1]
        if unit not in time_multipliers:
            logger.warning(f"Unknown time unit '{unit}' in '{time_interval}'. Treating it as minutes.")
        interval_seconds = interval_value * time_multipliers.get(unit, 60)

    # NOTE(review): the generated function declares the tracking variable
    # 'global' and compares it with None, but nothing emitted here initialises
    # it - confirm the execution environment defines it before the first call.
    code_lines = [
        f"{outer}def {function_name}(current_time):",
        f"{body}from datetime import timedelta",
        f"{body}global {last_execution_var}",
        f"{body}if {last_execution_var} is None:",
        f"{inner}{last_execution_var} = current_time - timedelta(seconds={interval_seconds})",
        f"{body}time_since_last = (current_time - {last_execution_var}).total_seconds()",
        f"{body}if time_since_last >= {interval_seconds}:",
        f"{inner}{last_execution_var} = current_time # Update last execution time",
    ]

    # Action code lives inside the time condition
    if not action:
        code_lines.append(f"{inner}pass # No actions defined")
    else:
        action_code = self.generate_code_from_json(action, indent_level + 2)
        if action_code:
            code_lines.extend(action_code)
        else:
            code_lines.append(f"{inner}pass # No valid actions defined")

    # One-shot actions unschedule themselves right after they have run
    if not repeat:
        code_lines.append(f"{inner}self.scheduled_actions.remove({function_name})")

    # Register this function in the schedule
    code_lines.append(f"{outer}self.schedule_action({function_name})")
    logger.debug(
        f"Generated schedule_action with function '{function_name}', time_interval='{time_interval}', repeat={repeat}")
    return code_lines
def handle_execute_if(self, node: Dict[str, Any], indent_level: int) -> List[str]:
    """
    Generates an ``if`` statement for an 'execute_if' node.

    The condition comes from the node's inputs['CONDITION'] and the body from
    statements['DO']; a ``pass`` line is emitted when the body is missing or
    produces no code.

    :param node: The execute_if node.
    :param indent_level: Current indentation level.
    :return: List of generated code lines.
    """
    prefix = ' ' * indent_level
    condition_node = node.get('inputs', {}).get('CONDITION', {})
    do_block = node.get('statements', {}).get('DO', [])

    condition_code = self.generate_condition_code(condition_node, indent_level)
    lines = [f"{prefix}if {condition_code}:"]

    if do_block:
        nested = self.generate_code_from_json(do_block, indent_level + 1)
        if nested:
            lines.extend(nested)
        else:
            lines.append(f"{prefix} pass # No valid actions defined")
    else:
        lines.append(f"{prefix} pass # No actions defined")

    logger.debug(f"Generated execute_if with condition '{condition_code}'")
    return lines
# Add other Control handlers here...
# ==============================
# Values and Flags Handlers
# ==============================
def handle_dynamic_value(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Renders a 'dynamic_value' node from the first entry of its 'values'.

    :param node: The dynamic_value node.
    :param indent_level: Current indentation level.
    :return: 'None' when 'values' is missing or empty; the generated condition
             code when the first entry is a dict; otherwise ``str`` of that entry.
    """
    values = node.get('values', [])
    if not values:
        logger.error("dynamic_value node has no 'values'.")
        return 'None'

    # Only the first entry is considered; any further entries are ignored.
    primary = values[0]
    if not isinstance(primary, dict):
        return str(primary)
    return self.generate_condition_code(primary, indent_level)
def handle_notify_user(self, node: Dict[str, Any], indent_level: int) -> List[str]:
    """
    Handles the 'notify_user' node type by emitting a notify_user('<message>') call.

    The message is embedded in a single-quoted literal. Backslashes, single
    quotes, newlines and carriage returns are escaped so the literal evaluates
    back to the original text and cannot be terminated early. A missing or
    None message becomes 'No message provided.'; other non-string values are
    converted with ``str``.

    :param node: The notify_user node; 'message' is read from it.
    :param indent_level: Current indentation level.
    :return: List containing the single generated code line.
    """
    indent = ' ' * indent_level
    message = node.get('message')
    if message is None:
        message = 'No message provided.'
    # Backslashes must be doubled first, otherwise the escapes added by the
    # later replacements would themselves be doubled.
    escaped = (
        str(message)
        .replace('\\', '\\\\')
        .replace("'", "\\'")
        .replace('\n', '\\n')
        .replace('\r', '\\r')
    )
    return [f"{indent}notify_user('{escaped}')"]
def handle_value_input(self, node: Dict[str, Any], indent_level: int) -> Union[str, List[str]]:
    """
    Renders the 'values' of a 'value_input' node.

    Dict entries are passed to generate_code_from_json; every other entry is
    converted with ``str``.

    :param node: The value_input node.
    :param indent_level: Current indentation level.
    :return: '' when there are no values, the single rendered value when there
             is exactly one, otherwise a list with one rendered item per value.
    """
    def render(item: Any) -> Any:
        if isinstance(item, dict):
            return self.generate_code_from_json(item, indent_level)
        return str(item)

    values = node.get('values', [])
    if not values:
        return ''
    if len(values) == 1:
        return render(values[0])
    return [render(item) for item in values]
def handle_get_variable(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Generates the lookup expression for a 'get_variable' node.

    :param node: The get_variable node; 'variable_name' is read and stripped.
    :param indent_level: Current indentation level (not used by this handler).
    :return: A ``variables.get('<name>', None)`` expression, or 'None' when the
             name is missing or blank.
    """
    name = node.get('variable_name', '').strip()
    if name:
        return f"variables.get('{name}', None)"
    logger.error("get_variable node missing 'variable_name'.")
    return 'None'
def handle_set_variable(self, node: Dict[str, Any], indent_level: int) -> List[str]:
    """
    Generates the assignment for a 'set_variable' node.

    When 'values' is a list, dict entries are rendered through
    generate_condition_code and other entries with ``str``; a single entry is
    assigned directly, any other count is assigned as a bracketed list.
    When 'values' is not a list, a truthy value is rendered through
    generate_condition_code and a falsy one becomes '0'.

    :param node: The set_variable node.
    :param indent_level: Current indentation level.
    :return: List with the generated assignment line, or an empty list when
             'variable_name' is missing or blank.
    """
    prefix = ' ' * indent_level
    target = node.get('variable_name', '').strip()
    raw_value = node.get('values')
    if not target:
        logger.error("set_variable node missing 'variable_name'. Skipping.")
        return []

    if not isinstance(raw_value, list):
        rendered = self.generate_condition_code(raw_value, indent_level) if raw_value else '0'
    else:
        parts = []
        for item in raw_value:
            if isinstance(item, dict):
                parts.append(self.generate_condition_code(item, indent_level))
            else:
                parts.append(str(item))
        rendered = parts[0] if len(parts) == 1 else f"[{', '.join(parts)}]"

    return [f"{prefix}variables['{target}'] = {rendered}"]
def handle_set_flag(self, node: Dict[str, Any], indent_level: int) -> List[str]:
    """
    Generates the assignment for a 'set_flag' node and records the flag name
    in ``self.flags_used``.

    The flag becomes True only when 'flag_value' (default 'True'), converted to
    text, stripped and lower-cased, equals 'true'; anything else yields False.

    :param node: The set_flag node.
    :param indent_level: Current indentation level.
    :return: List with the generated assignment line, or an empty list when
             'flag_name' is missing.
    """
    flag_name = node.get('flag_name')
    if not flag_name:
        logger.error("set_flag node missing 'flag_name'. Skipping.")
        return []

    requested = str(node.get('flag_value', 'True')).strip().lower()
    if requested == 'true':
        literal = 'True'
    else:
        literal = 'False'

    assignment = f"{' ' * indent_level}flags['{flag_name}'] = {literal}"
    self.flags_used.add(flag_name)
    # A 'next' field on the node is not followed by this handler.
    return [assignment]
def handle_flag_is_set(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'flag_is_set' condition type, checking if a flag is set to a specific value.

    'flag_value' may be a bool or, matching what the 'set_flag' handler
    accepts, the text 'true'/'false' in any letter case with optional
    surrounding whitespace. Any other value is logged as unsupported and the
    condition degrades to the constant 'False'.

    :param node: The flag_is_set node; 'flag_name' and 'flag_value' are read from it.
    :param indent_level: Current indentation level (not used by this handler).
    :return: A string representing the flag condition, or 'False' when the
             flag name is missing or the value is unsupported.
    """
    flag_name = node.get('flag_name')
    flag_value = node.get('flag_value', True)  # Default to True if not specified
    if not flag_name:
        logger.error("flag_is_set node missing 'flag_name'.")
        return 'False'

    # Textual booleans are normalised so that a value the set_flag handler
    # understands does not silently turn this condition into a constant False.
    if isinstance(flag_value, str):
        normalized = flag_value.strip().lower()
        if normalized in ('true', 'false'):
            flag_value = normalized == 'true'

    if isinstance(flag_value, bool):
        condition = f"flags.get('{flag_name}', False) == {flag_value}"
    else:
        logger.error(f"Unsupported flag_value type: {type(flag_value)}. Defaulting to 'False'.")
        condition = 'False'
    logger.debug(f"Generated flag_is_set condition: {condition}")
    return condition
# Add other Values and Flags handlers here...
# ==============================
# Risk Management Handlers
# ==============================
def handle_set_leverage(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Generates the set_leverage(...) call for a 'set_leverage' node.

    The requested value is read from inputs['LEVERAGE'] (default 1) and passed
    through process_numeric_input with default 1, min_value 1 and max_value 100.

    :param node: The set_leverage node.
    :param indent_level: Current indentation level (not used by this handler).
    :return: A string representing the leverage setting action.
    """
    requested = node.get('inputs', {}).get('LEVERAGE', 1)
    bounded = self.process_numeric_input(requested, 'leverage', 1, min_value=1, max_value=100)
    return f"set_leverage({bounded})"
def handle_current_margin(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Generates the expression for a 'current_margin' node.

    :param node: The current_margin node (its contents are not read).
    :param indent_level: Current indentation level (not used by this handler).
    :return: The constant expression text "get_current_margin()".
    """
    expression = "get_current_margin()"
    return expression
def handle_risk_ratio(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Generates the comparison for a 'risk_ratio' node.

    :param node: The risk_ratio node; 'operator' and 'value' are read from it.
    :param indent_level: Current indentation level (not used by this handler).
    :return: ``calculate_risk_ratio() <operator> <value>``, or 'False' when the
             operator or value is missing or the operator is not one of
             >, <, >=, <=, ==, !=.
    """
    operator = node.get('operator')
    value = node.get('value')
    if value is None or not operator:
        logger.error("risk_ratio node missing 'operator' or 'value'.")
        return 'False'

    # Supported operators are emitted verbatim.
    supported = {'>', '<', '>=', '<=', '==', '!='}
    if operator not in supported:
        logger.error(f"Invalid operator for risk_ratio: {operator}.")
        return 'False'
    return f"calculate_risk_ratio() {operator} {value}"
def handle_max_position_size(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Generates the condition for a 'max_position_size' node.

    The limit is read from inputs['MAX_SIZE'] (default 1) and passed through
    process_numeric_input with default 1 and min_value 1.

    :param node: The max_position_size node.
    :param indent_level: Current indentation level (not used by this handler).
    :return: A string comparing get_open_positions_count() against the limit.
    """
    requested = node.get('inputs', {}).get('MAX_SIZE', 1)
    limit = self.process_numeric_input(requested, 'max_position_size', 1, min_value=1)
    return f"get_open_positions_count() < {limit}"
# ==============================
# Math Handlers
# ==============================
def handle_math_operation(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Generates a parenthesised binary expression for a 'math_operation' node.

    The operator name (ADD, SUBTRACT, MULTIPLY, DIVIDE; case-insensitive,
    default ADD) and both operands are read from the node's 'inputs'. An
    unknown operator is logged and treated as addition.

    :param node: The math_operation node.
    :param indent_level: Current indentation level.
    :return: A string representing the math operation.
    """
    inputs = node.get('inputs', {})
    operator = inputs.get('operator', 'ADD').upper()

    symbols = {'ADD': '+', 'SUBTRACT': '-', 'MULTIPLY': '*', 'DIVIDE': '/'}
    if operator in symbols:
        symbol = symbols[operator]
    else:
        logger.error(f"Invalid operator for math_operation: {operator}. Defaulting to 'ADD'.")
        symbol = '+'

    left = self.generate_condition_code(inputs.get('left_operand'), indent_level)
    right = self.generate_condition_code(inputs.get('right_operand'), indent_level)
    expr = f"({left} {symbol} {right})"
    logger.debug(f"Generated math_operation expression: {expr}")
    return expr
def handle_power(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'power' node type.

    Both operands and the whole expression are parenthesised. ``**`` binds
    tighter than a unary minus on its left, so an unparenthesised negative
    base such as ``-2 ** 2`` would evaluate to -4 instead of 4, and compound
    operands or an enclosing expression could regroup the operation.

    :param node: The power node; 'base' (default 2) and 'exponent' (default 3) are read from it.
    :param indent_level: Current indentation level.
    :return: A string representing the exponentiation.
    """
    base = self.process_numeric_list(node.get('base', 2), indent_level)
    exponent = self.process_numeric_list(node.get('exponent', 3), indent_level)
    expr = f"(({base}) ** ({exponent}))"
    logger.debug(f"Generated power expression: {expr}")
    return expr
def handle_modulo(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'modulo' node type.

    Both operands and the whole expression are parenthesised so that compound
    operands (for example another modulo or an addition) and any enclosing
    expression cannot regroup the operation through operator precedence or
    left-to-right associativity.

    :param node: The modulo node; 'dividend' (default 10) and 'divisor' (default 3) are read from it.
    :param indent_level: Current indentation level.
    :return: A string representing the modulo operation.
    """
    dividend = self.process_numeric_list(node.get('dividend', 10), indent_level)
    divisor = self.process_numeric_list(node.get('divisor', 3), indent_level)
    expr = f"(({dividend}) % ({divisor}))"
    logger.debug(f"Generated modulo expression: {expr}")
    return expr
def handle_sqrt(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'sqrt' node type.

    :param node: The sqrt node; 'number' (default 16) is read from it.
    :param indent_level: Current indentation level.
    :return: A string representing a math.sqrt(...) call.
    """
    operand = self.process_numeric_list(node.get('number', 16), indent_level)
    generated = "math.sqrt(" + f"{operand}" + ")"
    logger.debug(f"Generated sqrt expression: {generated}")
    return generated
def handle_abs(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'abs' node type.

    :param node: The abs node; 'number' (default -5) is read from it.
    :param indent_level: Current indentation level.
    :return: A string representing an abs(...) call.
    """
    operand = self.process_numeric_list(node.get('number', -5), indent_level)
    generated = "abs(" + f"{operand}" + ")"
    logger.debug(f"Generated abs expression: {generated}")
    return generated
def handle_max(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'max' node type.

    :param node: The max node; 'numbers' (default [1, 2, 3]) is read from it.
    :param indent_level: Current indentation level.
    :return: A string representing a max(...) call over the processed numbers.
    """
    candidates = self.process_numeric_list(node.get('numbers', [1, 2, 3]), indent_level)
    generated = "max(" + f"{candidates}" + ")"
    logger.debug(f"Generated max expression: {generated}")
    return generated
def handle_min(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Handles the 'min' node type.

    :param node: The min node; 'numbers' (default [1, 2, 3]) is read from it.
    :param indent_level: Current indentation level.
    :return: A string representing a min(...) call over the processed numbers.
    """
    candidates = self.process_numeric_list(node.get('numbers', [1, 2, 3]), indent_level)
    generated = "min(" + f"{candidates}" + ")"
    logger.debug(f"Generated min expression: {generated}")
    return generated
def handle_factorial(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Generates a math.factorial(...) call for a 'factorial' node.

    :param node: The factorial node; 'number' (default 1) is read from it.
    :param indent_level: Current indentation level.
    :return: A string representing the factorial operation.
    """
    operand = self.process_numeric_list(node.get('number', 1), indent_level)
    generated = "math.factorial(" + f"{operand}" + ")"
    logger.debug(f"Generated factorial expression: {generated}")
    return generated
def handle_log(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Generates a two-argument math.log(...) call for a 'log' node.

    :param node: The log node; 'number' (default 1) and 'base' (default 10) are read from it.
    :param indent_level: Current indentation level.
    :return: A string representing the logarithm operation.
    """
    operand = self.process_numeric_list(node.get('number', 1), indent_level)
    log_base = self.process_numeric_list(node.get('base', 10), indent_level)
    generated = "math.log(" + f"{operand}, {log_base}" + ")"
    logger.debug(f"Generated log expression: {generated}")
    return generated
def handle_ln(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Generates a single-argument math.log(...) call for an 'ln' node.

    :param node: The ln node; 'number' (default 1) is read from it.
    :param indent_level: Current indentation level.
    :return: A string representing the natural logarithm operation.
    """
    operand = self.process_numeric_list(node.get('number', 1), indent_level)
    generated = "math.log(" + f"{operand}" + ")"
    logger.debug(f"Generated ln expression: {generated}")
    return generated
def handle_trig(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Generates a math.sin/cos/tan(...) call for a 'trig' node.

    :param node: The trig node; 'operator' (default 'sin', lower-cased) and
                 'angle' (default 0) are read from it.
    :param indent_level: Current indentation level.
    :return: A string representing the trigonometric function; an operator
             other than sin, cos or tan is logged and replaced by sin.
    """
    function_name = node.get('operator', 'sin').lower()
    angle = self.process_numeric_list(node.get('angle', 0), indent_level)

    if function_name not in ('sin', 'cos', 'tan'):
        logger.error(f"Invalid trig operator '{function_name}'. Defaulting to 'sin'.")
        function_name = 'sin'

    generated = f"math.{function_name}({angle})"
    logger.debug(f"Generated trig expression: {generated}")
    return generated
def handle_mean(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Generates a statistics.mean(...) call for a 'mean' node.

    :param node: The mean node; 'numbers' (default []) is read from it.
    :param indent_level: Current indentation level.
    :return: A string representing the mean calculation, or "0" when the
             processed numbers are empty once square brackets are stripped.
    """
    samples = self.process_numeric_list(node.get('numbers', []), indent_level)
    if samples.strip('[]'):
        generated = f"statistics.mean({samples})"
        logger.debug(f"Generated mean expression: {generated}")
        return generated
    logger.error("Mean operation received an empty list.")
    return "0"
def handle_median(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Generates a statistics.median(...) call for a 'median' node.

    :param node: The median node; 'numbers' (default []) is read from it.
    :param indent_level: Current indentation level.
    :return: A string representing the median calculation, or "0" when the
             processed numbers are empty once square brackets are stripped.
    """
    samples = self.process_numeric_list(node.get('numbers', []), indent_level)
    if samples.strip('[]'):
        generated = f"statistics.median({samples})"
        logger.debug(f"Generated median expression: {generated}")
        return generated
    logger.error("Median operation received an empty list.")
    return "0"
def handle_std_dev(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Generates a statistics.stdev(...) call for a 'std_dev' node.

    :param node: The std_dev node; 'numbers' (default []) is read from it.
    :param indent_level: Current indentation level.
    :return: A string representing the standard deviation calculation, or "0"
             when the processed numbers are empty once square brackets are stripped.
    """
    samples = self.process_numeric_list(node.get('numbers', []), indent_level)
    if samples.strip('[]'):
        generated = f"statistics.stdev({samples})"
        logger.debug(f"Generated std_dev expression: {generated}")
        return generated
    logger.error("Standard deviation operation received an empty list.")
    return "0"
def handle_round(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Generates a round(...) call for a 'round' node.

    :param node: The round node; 'number' (default 0) and 'decimals' (default 0) are read from it.
    :param indent_level: Current indentation level.
    :return: A string representing the rounding operation.
    """
    operand = self.process_numeric_list(node.get('number', 0), indent_level)
    places = self.process_numeric_list(node.get('decimals', 0), indent_level)
    generated = "round(" + f"{operand}, {places}" + ")"
    logger.debug(f"Generated round expression: {generated}")
    return generated
def handle_floor(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Generates a math.floor(...) call for a 'floor' node.

    :param node: The floor node; 'number' (default 0) is read from it.
    :param indent_level: Current indentation level.
    :return: A string representing the floor operation.
    """
    operand = self.process_numeric_list(node.get('number', 0), indent_level)
    generated = "math.floor(" + f"{operand}" + ")"
    logger.debug(f"Generated floor expression: {generated}")
    return generated
def handle_ceil(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Generates a math.ceil(...) call for a 'ceil' node.

    :param node: The ceil node; 'number' (default 0) is read from it.
    :param indent_level: Current indentation level.
    :return: A string representing the ceiling operation.
    """
    operand = self.process_numeric_list(node.get('number', 0), indent_level)
    generated = "math.ceil(" + f"{operand}" + ")"
    logger.debug(f"Generated ceil expression: {generated}")
    return generated
def handle_random(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Generates a random.randint(...) call for a 'random' node.

    :param node: The random node; 'min' (default 0) and 'max' (default 1) are read from it.
    :param indent_level: Current indentation level.
    :return: A string representing the random integer generation.
    """
    lower = self.process_numeric_list(node.get('min', 0), indent_level)
    upper = self.process_numeric_list(node.get('max', 1), indent_level)
    generated = "random.randint(" + f"{lower}, {upper}" + ")"
    logger.debug(f"Generated random expression: {generated}")
    return generated
def handle_clamp(self, node: Dict[str, Any], indent_level: int) -> str:
    """
    Generates a clamping expression for a 'clamp' node.

    'min' and 'max' are both optional. A bound that is absent is represented
    by the text 'None' and leaves that side unclamped; a bound that is present
    is passed through process_numeric_input with the processed number as its
    default value.

    :param node: The clamp node; 'number' (default 0), 'min' and 'max' are read from it.
    :param indent_level: Current indentation level.
    :return: A max/min expression limiting the number to the given bounds, or
             the number itself when neither bound applies.
    """
    number = self.process_numeric_list(node.get('number', 0), indent_level)

    lower = node.get('min')  # Optional
    upper = node.get('max')  # Optional
    lower = 'None' if lower is None else self.process_numeric_input(
        lower, 'min', default_value=number, min_value=-math.inf)
    upper = 'None' if upper is None else self.process_numeric_input(
        upper, 'max', default_value=number, max_value=math.inf)

    has_lower = lower != 'None'
    has_upper = upper != 'None'
    if has_lower and has_upper:
        expr = f"max({lower}, min({number}, {upper}))"
    elif has_lower:
        expr = f"max({lower}, {number})"
    elif has_upper:
        expr = f"min({number}, {upper})"
    else:
        expr = str(number)  # No clamping needed

    logger.debug(f"Generated clamp expression: {expr}")
    return expr
# add other math handlers here...