brighter-trading/src/Strategies.py

513 lines
23 KiB
Python

import logging
import pandas as pd
from DataCache_v3 import DataCache
from indicators import Indicators
from trade import Trades
import datetime as dt
import json
import uuid
import traceback
from typing import Any
from PythonGenerator import PythonGenerator
from StrategyInstance import StrategyInstance
# Configure logging
logger = logging.getLogger(__name__)
class Strategies:
def __init__(self, data_cache: DataCache, trades: Trades, indicators: Indicators):
"""
Initializes the Strategies class.
:param data_cache: Instance of DataCache to manage cache and database interactions.
:param trades: Reference to the trades object that maintains trading actions and data.
:param indicators: Reference to the Indicators manager.
"""
self.data_cache = data_cache # Database interaction instance
self.trades = trades
self.indicators_manager = indicators
# Create a cache for strategies with necessary columns
self.data_cache.create_cache(name='strategies',
cache_type='table',
size_limit=500,
eviction_policy='deny',
default_expiration=dt.timedelta(hours=24),
columns=["id", "creator", "name", "workspace", "code", "stats", "public", "fee",
"tbl_key", "strategy_components"])
# Create a cache for strategy contexts to store strategy states and settings
self.data_cache.create_cache(
name='strategy_contexts',
cache_type='table',
size_limit=1000,
eviction_policy='deny',
default_expiration=dt.timedelta(hours=24),
columns=[
"strategy_instance_id", # Unique identifier for the strategy instance
"flags", # JSON-encoded string to store flags
"profit_loss", # Float value for tracking profit/loss
"active", # Boolean or Integer (1/0) for active status
"paused", # Boolean or Integer (1/0) for paused status
"exit", # Boolean or Integer (1/0) for exit status
"exit_method", # String defining exit method
"start_time" # ISO-formatted datetime string for start time
]
)
# Initialize default settings
self.default_timeframe = '5m'
self.default_exchange = 'Binance'
self.default_symbol = 'BTCUSD'
self.active_instances: dict[tuple[int, str], StrategyInstance] = {} # Key: (user_id, strategy_id)
def _save_strategy(self, strategy_data: dict, default_source: dict) -> dict:
"""
Saves a strategy to the cache and database. Handles both creation and editing.
:param strategy_data: A dictionary containing strategy data such as name, code, workspace, etc.
:param default_source: The default source for undefined sources in the strategy.
:return: A dictionary containing success or failure information.
"""
is_edit = 'tbl_key' in strategy_data
try:
if is_edit:
# Editing an existing strategy
tbl_key = strategy_data['tbl_key']
existing_strategy = self.data_cache.get_rows_from_datacache(
cache_name='strategies',
filter_vals=[('tbl_key', tbl_key)]
)
if existing_strategy.empty:
return {"success": False, "message": "Strategy not found."}
else:
# Creating a new strategy
# Generate a unique identifier first
tbl_key = str(uuid.uuid4())
# Check if a strategy with the same name already exists for this user
filter_conditions = [
('creator', strategy_data.get('creator')),
('name', strategy_data['name'])
]
existing_strategy = self.data_cache.get_rows_from_datacache(
cache_name='strategies',
filter_vals=filter_conditions
)
if not existing_strategy.empty:
return {"success": False, "message": "A strategy with this name already exists"}
# Validate and serialize 'workspace' (XML string)
workspace_data = strategy_data.get('workspace')
if not isinstance(workspace_data, str) or not workspace_data.strip():
return {"success": False, "message": "Invalid or empty workspace data"}
# Serialize 'stats' field
try:
stats_data = strategy_data.get('stats', {})
stats_serialized = json.dumps(stats_data)
except (TypeError, ValueError):
return {"success": False, "message": "Invalid stats data format"}
default_source = default_source.copy()
strategy_id = tbl_key
# Extract and validate 'code' as a dictionary
code = strategy_data.get('code')
if isinstance(code, str):
try:
strategy_json = json.loads(code)
if not isinstance(strategy_json, dict):
return {"success": False, "message": "'code' must be a JSON object."}
except json.JSONDecodeError:
return {"success": False, "message": "Invalid JSON format for 'code'."}
elif isinstance(code, dict):
strategy_json = code
# Serialize 'code' to JSON string
try:
serialized_code = json.dumps(code)
strategy_data['code'] = serialized_code
except (TypeError, ValueError):
return {"success": False, "message": "Unable to serialize 'code' field."}
else:
return {"success": False, "message": "'code' must be a JSON string or dictionary."}
# Initialize PythonGenerator
python_generator = PythonGenerator(default_source, strategy_id)
# Generate strategy components (code, indicators, data_sources, flags)
strategy_components = python_generator.generate(strategy_json)
# Add the combined strategy components to the data to be stored
strategy_data['strategy_components'] = json.dumps(strategy_components)
if is_edit:
# Editing existing strategy
tbl_key = strategy_data['tbl_key']
# Prepare the columns and values for the update
columns = (
"creator", "name", "workspace", "code", "stats", "public", "fee", "tbl_key", "strategy_components"
)
values = (
strategy_data.get('creator'),
strategy_data['name'],
workspace_data, # Use the validated workspace data
strategy_data['code'],
stats_serialized, # Serialized stats
bool(strategy_data.get('public', 0)),
float(strategy_data.get('fee', 0.0)),
tbl_key,
strategy_data['strategy_components'] # Serialized strategy components
)
# Update the strategy in the database and cache
self.data_cache.modify_datacache_item(
cache_name='strategies',
filter_vals=[('tbl_key', tbl_key)],
field_names=columns,
new_values=values,
key=tbl_key,
overwrite='tbl_key' # Use 'tbl_key' to identify the entry to overwrite
)
# Return success message
return {"success": True, "message": "Strategy updated successfully"}
else:
# Creating new strategy
# Insert the strategy into the database and cache
self.data_cache.insert_row_into_datacache(
cache_name='strategies',
columns=(
"creator", "name", "workspace", "code", "stats",
"public", "fee", 'tbl_key', 'strategy_components'
),
values=(
strategy_data.get('creator'),
strategy_data['name'],
strategy_data['workspace'],
strategy_data['code'],
stats_serialized,
bool(strategy_data.get('public', 0)),
float(strategy_data.get('fee', 0.0)),
tbl_key,
strategy_data['strategy_components']
)
)
# Construct the saved strategy data to return
saved_strategy = {
"id": tbl_key, # Assuming tbl_key is used as a unique identifier
"creator": strategy_data.get('creator'),
"name": strategy_data['name'],
"workspace": workspace_data, # Original workspace data
"code": strategy_data['code'],
"stats": stats_data,
"public": bool(strategy_data.get('public', 0)),
"fee": float(strategy_data.get('fee', 0.0))
}
# If everything is successful, return a success message along with the saved strategy data
return {
"success": True,
"message": "Strategy created and saved successfully",
"strategy": saved_strategy # Include the strategy data
}
except Exception as e:
# Catch any exceptions and return a failure message
# Log the exception with traceback for debugging
logger.error(f"Failed to save strategy: {e}", exc_info=True)
traceback.print_exc()
operation = "update" if is_edit else "create"
return {"success": False, "message": f"Failed to {operation} strategy: {str(e)}"}
def new_strategy(self, strategy_data: dict, default_source: dict) -> dict:
"""
Add a new strategy to the cache and database.
:param default_source: The default source for undefined sources in the strategy.
:param strategy_data: A dictionary containing strategy data such as name, code, and workspace.
:return: A dictionary containing success or failure information.
"""
return self._save_strategy(strategy_data, default_source)
def edit_strategy(self, strategy_data: dict, default_source: dict) -> dict:
"""
Updates an existing strategy in the cache and database.
:param default_source: The default source for undefined sources in the strategy.
:param strategy_data: A dictionary containing the updated strategy data.
:return: A dictionary containing success or failure information.
"""
return self._save_strategy(strategy_data, default_source)
def delete_strategy(self, user_id: int, name: str) -> dict:
try:
self.data_cache.remove_row_from_datacache(
cache_name='strategies',
filter_vals=[('creator', user_id), ('name', name)]
)
return {"success": True, "message": "Strategy deleted successfully."}
except Exception as e:
logger.error(f"Failed to delete strategy '{name}' for user '{user_id}': {e}", exc_info=True)
return {"success": False, "message": f"Failed to delete strategy: {str(e)}"}
def get_all_strategy_names(self, user_id: int) -> list | None:
"""
Return a list of all strategy names stored in the cache or database.
"""
# Fetch all strategy names from the cache or database
strategies_df = self.get_all_strategies(user_id, 'df')
if not strategies_df.empty:
return strategies_df['name'].tolist()
return None
def get_all_strategies(self, user_id: int, form: str):
"""
Return stored strategies in various formats.
:param user_id: the id of the user making the request.
:param form: The desired format ('obj', 'json', or 'dict').
:return: A list of strategies in the requested format.
"""
valid_forms = {'df', 'json', 'dict'}
if form not in valid_forms:
raise ValueError(f"Invalid form '{form}'. Expected one of {valid_forms}.")
# Fetch all public strategies and user's strategies from the cache or database
public_df = self.data_cache.get_rows_from_datacache(cache_name='strategies', filter_vals=[('public', 1)])
user_df = self.data_cache.get_rows_from_datacache(cache_name='strategies', filter_vals=[('creator', user_id),
('public', 0)])
# Concatenate the two DataFrames (rows from public and user-created strategies)
strategies_df = pd.concat([public_df, user_df], ignore_index=True)
# Return None if no strategies found
if strategies_df.empty:
return None
# Return the strategies in the requested format
if form == 'df':
return strategies_df
elif form == 'json':
return strategies_df.to_json(orient='records')
elif form == 'dict':
return strategies_df.to_dict('records')
def get_strategy_by_name(self, user_id: int, name: str) -> dict[str, Any] | None:
"""
Retrieve a strategy object by name.
:param user_id: The ID of the user making the request.
:param name: The name of the strategy to retrieve.
:return: The strategy DataFrame row if found, otherwise None.
"""
# Fetch all strategies (public and user-specific) as a DataFrame
strategies_df = self.get_all_strategies(user_id, 'df')
# Ensure that strategies_df is not None
if strategies_df is None:
return None
# Filter the DataFrame to find the strategy by name (exact match)
name = name
filtered_strategies = strategies_df.query('name == @name')
# Return None if no matching strategy is found
if filtered_strategies.empty:
return None
# Get the strategy row as a dictionary
strategy_row = filtered_strategies.iloc[0].to_dict()
# Deserialize the 'strategy_components' field
try:
strategy_components = json.loads(strategy_row.get('strategy_components', '{}'))
except json.JSONDecodeError:
strategy_components = {}
strategy_row['strategy_components'] = strategy_components
# If 'code' is stored as a JSON string, deserialize it
if isinstance(strategy_row.get('code'), str):
strategy_row['code'] = json.loads(strategy_row['code'])
return strategy_row
def update_strategy_stats(self, strategy_id: str, profit_loss: float) -> None:
"""
Updates the strategy's statistics based on the latest profit or loss.
:param strategy_id: Identifier of the strategy.
:param profit_loss: Latest profit or loss from strategy execution.
"""
try:
# Fetch the current stats
strategy = self.data_cache.get_rows_from_datacache(cache_name='strategies',
filter_vals=[('tbl_key', strategy_id)])
if strategy.empty:
logger.warning(f"Strategy ID {strategy_id} not found for stats update.")
return
strategy_row = strategy.iloc[0].to_dict()
current_stats = json.loads(strategy_row.get('stats', '{}'))
# Update stats
current_stats['num_trades'] = current_stats.get('num_trades', 0) + 1
current_stats['total_position'] = current_stats.get('total_position', 0.0) + profit_loss
current_stats['total_position_value'] = current_stats.get('total_position_value', 0.0) + profit_loss
# Serialize and update in datacache
self.data_cache.modify_datacache_item(cache_name='strategies', filter_vals=[('tbl_key', strategy_id)],
field_names=('stats',), new_values=(json.dumps(current_stats),),
key=strategy_id, overwrite='tbl_key')
logger.info(f"Updated stats for strategy {strategy_id}: {current_stats}")
except Exception as e:
logger.error(f"Error updating stats for strategy '{strategy_id}': {e}", exc_info=True)
def execute_strategy(self, strategy_data: dict[str, Any]) -> dict[str, Any]:
"""
Executes a strategy based on the provided strategy data.
:param strategy_data: A dictionary containing strategy details.
:return: A dictionary indicating success or failure with relevant messages.
"""
try:
# Extract identifiers
strategy_id = strategy_data.get('id') or strategy_data.get('tbl_key')
strategy_name = strategy_data.get('name')
user_id = strategy_data.get('creator')
if not strategy_id or not strategy_name or not user_id:
return {"success": False, "message": "Strategy data is incomplete."}
# Generate a deterministic strategy_instance_id
strategy_instance_id = f"{user_id}_{strategy_name}"
instance_key = (user_id, strategy_id) # Unique key for the strategy-user pair
# Retrieve or create StrategyInstance
if instance_key not in self.active_instances:
generated_code = strategy_data.get('strategy_components', {}).get('generated_code', '')
if not generated_code:
return {"success": False, "message": "No 'next()' method defined for the strategy."}
# Instantiate StrategyInstance
strategy_instance = StrategyInstance(
strategy_instance_id=strategy_instance_id,
strategy_id=strategy_id,
strategy_name=strategy_name,
user_id=user_id,
generated_code=generated_code,
data_cache=self.data_cache,
indicators=self.indicators_manager,
trades=self.trades
)
# Store in active_instances
self.active_instances[instance_key] = strategy_instance
logger.debug(f"Created new StrategyInstance '{strategy_instance_id}' for strategy '{strategy_id}'.")
else:
strategy_instance = self.active_instances[instance_key]
logger.debug(
f"Retrieved existing StrategyInstance '{strategy_instance_id}' for strategy '{strategy_id}'.")
# Execute the strategy
execution_result = strategy_instance.execute()
if execution_result.get('success'):
profit_loss = execution_result.get('profit_loss', 0.0)
self.update_strategy_stats(strategy_id, profit_loss)
logger.info(f"Strategy '{strategy_id}' executed successfully with profit/loss: {profit_loss}")
# Check if exit was initiated
if strategy_instance.exit:
# Check if all trades are closed
if self.trades.all_trades_closed(strategy_id):
logger.info(f"Strategy '{strategy_id}' has successfully exited all trades.")
# Remove from active_instances
del self.active_instances[instance_key]
# Optionally, mark the strategy as inactive in the database
self.data_cache.modify_datacache_item(
cache_name='strategies',
filter_vals=[('tbl_key', strategy_id)],
field_names=('active',),
new_values=(False,),
key=strategy_id,
overwrite='tbl_key'
)
else:
logger.info(
f"Strategy '{strategy_id}' is exiting. Remaining trades will be closed in subsequent executions.")
return {"success": True, "strategy_profit_loss": profit_loss}
else:
error_message = execution_result.get('message', 'Unknown error.')
logger.error(f"Strategy '{strategy_id}' execution failed: {error_message}")
return {"success": False, "message": f"Strategy execution failed: {error_message}"}
except Exception as e:
logger.error(f"Unexpected error in execute_strategy: {e}")
traceback.print_exc()
return {"success": False, "message": f"Unexpected error: {str(e)}"}
def update(self):
"""
Loops through and executes all activated strategies.
"""
try:
active_strategies = self.data_cache.get_rows_from_datacache('strategies', [('active', True)])
if active_strategies.empty:
logger.info("No active strategies to execute.")
return # No active strategies to execute
for _, strategy_data in active_strategies.iterrows():
self.execute_strategy(strategy_data)
except Exception as e:
logger.error(f"Error updating strategies: {e}", exc_info=True)
traceback.print_exc()
def update_stats(self, strategy_id: str, stats: dict) -> None:
"""
Updates the strategy's statistics with the provided stats.
:param strategy_id: Identifier of the strategy (tbl_key).
:param stats: Dictionary containing statistics to update.
"""
try:
# Fetch the current strategy data
strategy = self.data_cache.get_rows_from_datacache(
cache_name='strategies',
filter_vals=[('tbl_key', strategy_id)]
)
if strategy.empty:
logger.warning(f"Strategy ID {strategy_id} not found for stats update.")
return
strategy_row = strategy.iloc[0].to_dict()
current_stats = json.loads(strategy_row.get('stats', '{}'))
# Merge the new stats with existing stats
current_stats.update(stats)
# Serialize the updated stats
updated_stats_serialized = json.dumps(current_stats)
# Update the stats in the data cache
self.data_cache.modify_datacache_item(
cache_name='strategies',
filter_vals=[('tbl_key', strategy_id)],
field_names=('stats',),
new_values=(updated_stats_serialized,),
key=strategy_id,
overwrite='tbl_key'
)
logger.info(f"Updated stats for strategy '{strategy_id}': {current_stats}")
except Exception as e:
logger.error(f"Error updating stats for strategy '{strategy_id}': {e}", exc_info=True)