451 lines
20 KiB
Python
451 lines
20 KiB
Python
import logging
|
|
import pandas as pd
|
|
from DataCache_v3 import DataCache
|
|
from indicators import Indicators
|
|
from trade import Trades
|
|
import datetime as dt
|
|
import json
|
|
import uuid
|
|
import traceback
|
|
from typing import Any
|
|
from PythonGenerator import PythonGenerator
|
|
from StrategyInstance import StrategyInstance
|
|
|
|
# Configure logging
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class Strategies:
|
|
def __init__(self, data_cache: DataCache, trades: Trades, indicators: Indicators):
    """
    Set up the strategy manager and register its backing cache table.

    :param data_cache: DataCache instance used for all cache/database access.
    :param trades: Trades object handed to every StrategyInstance created here.
    :param indicators: Indicators manager handed to every StrategyInstance created here.
    """
    # Collaborators supplied by the caller.
    self.data_cache = data_cache
    self.trades = trades
    self.indicators_manager = indicators

    # Register the 'strategies' table cache.
    # NOTE(review): execute_strategy()/update() read and write an 'active' field that is
    # not in this column list - confirm that DataCache tolerates/creates it.
    strategy_columns = [
        "id", "creator", "name", "workspace", "code", "stats", "public", "fee",
        "tbl_key", "strategy_components",
    ]
    self.data_cache.create_cache(
        name='strategies',
        cache_type='table',
        size_limit=500,
        eviction_policy='deny',
        default_expiration=dt.timedelta(hours=24),
        columns=strategy_columns,
    )

    # Defaults for timeframe / exchange / symbol.
    self.default_timeframe = '5m'
    self.default_exchange = 'Binance'
    self.default_symbol = 'BTCUSD'

    # Running strategy instances, keyed by (user_id, strategy_id).
    self.active_instances: dict[tuple[int, str], StrategyInstance] = {}
|
|
|
|
|
|
def _save_strategy(self, strategy_data: dict, default_source: dict) -> dict:
    """
    Create or update a strategy in the 'strategies' cache and database.

    The call is treated as an edit when ``strategy_data`` contains a 'tbl_key' entry;
    otherwise a new strategy is created under a freshly generated UUID.

    :param strategy_data: Strategy fields. 'name', 'workspace' (non-empty string) and 'code'
                          (JSON object string or dict) are required; 'creator', 'stats',
                          'public' and 'fee' are optional. The serialized
                          'strategy_components' are written back into this dict.
    :param default_source: The default source for undefined sources in the strategy. A copy
                           is handed to PythonGenerator.
    :return: {"success": bool, "message": str}; a successful create also carries the saved
             fields under "strategy".
    """
    is_edit = 'tbl_key' in strategy_data
    try:
        if is_edit:
            # Editing: the target row must already exist.
            # NOTE(review): only existence is checked here - neither ownership nor a name
            # clash after a rename is verified. Confirm callers enforce this.
            tbl_key = strategy_data['tbl_key']
            existing = self.data_cache.get_rows_from_datacache(
                cache_name='strategies',
                filter_vals=[('tbl_key', tbl_key)]
            )
            if existing.empty:
                return {"success": False, "message": "Strategy not found."}
        else:
            # Creating: allocate the key, then reject a duplicate name for this creator.
            tbl_key = str(uuid.uuid4())
            duplicates = self.data_cache.get_rows_from_datacache(
                cache_name='strategies',
                filter_vals=[
                    ('creator', strategy_data.get('creator')),
                    ('name', strategy_data['name'])
                ]
            )
            if not duplicates.empty:
                return {"success": False, "message": "A strategy with this name already exists"}

        # 'workspace' must be a non-blank (XML) string.
        workspace_data = strategy_data.get('workspace')
        if not isinstance(workspace_data, str) or not workspace_data.strip():
            return {"success": False, "message": "Invalid or empty workspace data"}

        # 'stats' is stored as JSON text.
        stats_data = strategy_data.get('stats', {})
        try:
            stats_serialized = json.dumps(stats_data)
        except (TypeError, ValueError):
            return {"success": False, "message": "Invalid stats data format"}

        # 'code' may arrive as a JSON string or as a dict. Either way the generator gets
        # a dict and the store gets a JSON string, which is the form
        # get_strategy_by_name() deserializes.
        code = strategy_data.get('code')
        if isinstance(code, str):
            try:
                strategy_json = json.loads(code)
            except json.JSONDecodeError:
                return {"success": False, "message": "Invalid JSON format for 'code'."}
            if not isinstance(strategy_json, dict):
                return {"success": False, "message": "'code' must be a JSON object."}
            code_serialized = code
        elif isinstance(code, dict):
            strategy_json = code
            try:
                code_serialized = json.dumps(code)
            except (TypeError, ValueError):
                return {"success": False, "message": "Invalid JSON format for 'code'."}
        else:
            return {"success": False, "message": "'code' must be a JSON string or dictionary."}

        # Generate the strategy components; the strategy id is the tbl_key.
        python_generator = PythonGenerator(default_source.copy(), tbl_key)
        strategy_components = python_generator.generate(strategy_json)
        components_serialized = json.dumps(strategy_components)
        # Kept from the previous implementation: the caller's dict receives the result.
        strategy_data['strategy_components'] = components_serialized

        is_public = bool(strategy_data.get('public', 0))
        fee = float(strategy_data.get('fee', 0.0))

        # Both branches write the same columns in the same order.
        columns = (
            "creator", "name", "workspace", "code", "stats",
            "public", "fee", "tbl_key", "strategy_components"
        )
        values = (
            strategy_data.get('creator'),
            strategy_data['name'],
            workspace_data,
            code_serialized,
            stats_serialized,
            is_public,
            fee,
            tbl_key,
            components_serialized
        )

        if is_edit:
            self.data_cache.modify_datacache_item(
                cache_name='strategies',
                filter_vals=[('tbl_key', tbl_key)],
                field_names=columns,
                new_values=values,
                key=tbl_key,
                overwrite='tbl_key'  # Use 'tbl_key' to identify the entry to overwrite
            )
            return {"success": True, "message": "Strategy updated successfully"}

        self.data_cache.insert_row_into_datacache(
            cache_name='strategies',
            columns=columns,
            values=values
        )

        # Echo the saved fields back in the caller's own (unserialized) form.
        saved_strategy = {
            "id": tbl_key,
            "creator": strategy_data.get('creator'),
            "name": strategy_data['name'],
            "workspace": workspace_data,
            "code": strategy_data['code'],
            "stats": stats_data,
            "public": is_public,
            "fee": fee
        }
        return {
            "success": True,
            "message": "Strategy created and saved successfully",
            "strategy": saved_strategy
        }

    except Exception as e:
        # Boundary handler: report the failure to the caller instead of raising.
        # exc_info=True already records the traceback in the log.
        logger.error(f"Failed to save strategy: {e}", exc_info=True)
        operation = "update" if is_edit else "create"
        return {"success": False, "message": f"Failed to {operation} strategy: {str(e)}"}
|
|
|
|
|
|
def new_strategy(self, strategy_data: dict, default_source: dict) -> dict:
    """
    Store a new strategy by delegating to ``_save_strategy``.

    :param strategy_data: Strategy fields such as name, code and workspace.
    :param default_source: The default source for undefined sources in the strategy.
    :return: The success/failure dictionary produced by ``_save_strategy``.
    """
    outcome = self._save_strategy(strategy_data, default_source)
    return outcome
|
|
|
|
def edit_strategy(self, strategy_data: dict, default_source: dict) -> dict:
    """
    Update an existing strategy in the cache and database.

    :param strategy_data: The updated strategy fields; must carry the 'tbl_key' of the
                          strategy being edited.
    :param default_source: The default source for undefined sources in the strategy.
    :return: A dictionary containing success or failure information.
    """
    # _save_strategy decides between create and edit by the presence of 'tbl_key', so
    # without this guard an edit request lacking the key would create a new strategy.
    if not strategy_data.get('tbl_key'):
        return {"success": False, "message": "Failed to update strategy: 'tbl_key' is missing."}
    return self._save_strategy(strategy_data, default_source)
|
|
|
|
def delete_strategy(self, user_id: int, name: str) -> dict:
    """
    Remove the strategy called ``name`` that was created by ``user_id``.

    :param user_id: Matched against the 'creator' column.
    :param name: Matched against the 'name' column.
    :return: A dictionary containing success or failure information. Any exception raised
             by the cache is logged and reported as a failure.
    """
    owner_and_name = [('creator', user_id), ('name', name)]
    try:
        self.data_cache.remove_row_from_datacache(
            cache_name='strategies',
            filter_vals=owner_and_name
        )
    except Exception as e:
        logger.error(f"Failed to delete strategy '{name}' for user '{user_id}': {e}", exc_info=True)
        return {"success": False, "message": f"Failed to delete strategy: {str(e)}"}
    else:
        return {"success": True, "message": "Strategy deleted successfully."}
|
|
|
|
def get_all_strategy_names(self, user_id: int) -> list | None:
|
|
"""
|
|
Return a list of all strategy names stored in the cache or database.
|
|
"""
|
|
# Fetch all strategy names from the cache or database
|
|
strategies_df = self.get_all_strategies(user_id, 'df')
|
|
|
|
if not strategies_df.empty:
|
|
return strategies_df['name'].tolist()
|
|
return None
|
|
|
|
def get_all_strategies(self, user_id: int, form: str):
    """
    Return every public strategy plus the user's own non-public strategies.

    :param user_id: The id of the user making the request.
    :param form: Output format: 'df' (DataFrame), 'json' (records-oriented JSON string)
                 or 'dict' (list of row dictionaries).
    :return: The strategies in the requested format, or None when there are none.
    :raises ValueError: If ``form`` is not one of the supported formats.
    """
    valid_forms = {'df', 'json', 'dict'}
    if form not in valid_forms:
        raise ValueError(f"Invalid form '{form}'. Expected one of {valid_forms}.")

    fetch_rows = self.data_cache.get_rows_from_datacache

    # Public rows are fetched without a creator filter; the second query adds the
    # requesting user's non-public rows.
    shared_rows = fetch_rows(cache_name='strategies', filter_vals=[('public', 1)])
    private_rows = fetch_rows(cache_name='strategies',
                              filter_vals=[('creator', user_id), ('public', 0)])

    combined = pd.concat([shared_rows, private_rows], ignore_index=True)
    if combined.empty:
        return None

    if form == 'json':
        return combined.to_json(orient='records')
    if form == 'dict':
        return combined.to_dict('records')
    # Only 'df' remains after the validation above.
    return combined
|
|
|
|
def get_strategy_by_name(self, user_id: int, name: str) -> dict[str, Any] | None:
|
|
"""
|
|
Retrieve a strategy object by name.
|
|
|
|
:param user_id: The ID of the user making the request.
|
|
:param name: The name of the strategy to retrieve.
|
|
:return: The strategy DataFrame row if found, otherwise None.
|
|
"""
|
|
# Fetch all strategies (public and user-specific) as a DataFrame
|
|
strategies_df = self.get_all_strategies(user_id, 'df')
|
|
|
|
# Ensure that strategies_df is not None
|
|
if strategies_df is None:
|
|
return None
|
|
|
|
# Filter the DataFrame to find the strategy by name (exact match)
|
|
name = name
|
|
filtered_strategies = strategies_df.query('name == @name')
|
|
|
|
# Return None if no matching strategy is found
|
|
if filtered_strategies.empty:
|
|
return None
|
|
|
|
# Get the strategy row as a dictionary
|
|
strategy_row = filtered_strategies.iloc[0].to_dict()
|
|
|
|
# Deserialize the 'strategy_components' field
|
|
try:
|
|
strategy_components = json.loads(strategy_row.get('strategy_components', '{}'))
|
|
except json.JSONDecodeError:
|
|
strategy_components = {}
|
|
strategy_row['strategy_components'] = strategy_components
|
|
|
|
# If 'code' is stored as a JSON string, deserialize it
|
|
if isinstance(strategy_row.get('code'), str):
|
|
strategy_row['code'] = json.loads(strategy_row['code'])
|
|
|
|
return strategy_row
|
|
|
|
def update_strategy_stats(self, strategy_id: str, profit_loss: float) -> None:
    """
    Fold the latest profit or loss into the strategy's stored statistics.

    Increments 'num_trades' and adds ``profit_loss`` to both 'total_position' and
    'total_position_value', then writes the stats back as JSON. Failures are logged,
    never raised.

    :param strategy_id: The strategy's 'tbl_key'.
    :param profit_loss: Latest profit or loss from strategy execution.
    """
    try:
        strategy = self.data_cache.get_rows_from_datacache(cache_name='strategies',
                                                           filter_vals=[('tbl_key', strategy_id)])

        if strategy.empty:
            logger.warning(f"Strategy ID {strategy_id} not found for stats update.")
            return

        # Stats are stored as JSON text. A missing value (None/NaN/blank) or a JSON value
        # that is not an object (e.g. "null") starts from empty stats; previously such rows
        # raised here and were never updated. Malformed JSON still raises and is logged
        # below, so an unreadable value is not silently overwritten.
        raw_stats = strategy.iloc[0].to_dict().get('stats')
        if isinstance(raw_stats, str) and raw_stats.strip():
            raw_stats = json.loads(raw_stats)
        current_stats = dict(raw_stats) if isinstance(raw_stats, dict) else {}

        # Update stats
        current_stats['num_trades'] = current_stats.get('num_trades', 0) + 1
        current_stats['total_position'] = current_stats.get('total_position', 0.0) + profit_loss
        current_stats['total_position_value'] = current_stats.get('total_position_value', 0.0) + profit_loss

        # Serialize and update in datacache
        self.data_cache.modify_datacache_item(cache_name='strategies', filter_vals=[('tbl_key', strategy_id)],
                                              field_names=('stats',), new_values=(json.dumps(current_stats),),
                                              key=strategy_id, overwrite='tbl_key')
        logger.info(f"Updated stats for strategy {strategy_id}: {current_stats}")

    except Exception as e:
        logger.error(f"Error updating stats for strategy '{strategy_id}': {e}", exc_info=True)
|
|
|
|
def execute_strategy(self, strategy_data: dict[str, Any]) -> dict[str, Any]:
    """
    Run one execution cycle of a strategy, creating its StrategyInstance on first use.

    :param strategy_data: Strategy details: 'tbl_key' (or 'id'), 'name', 'creator' and
                          'strategy_components'. The components may be a dict or the JSON
                          string written by _save_strategy.
    :return: {"success": True, "strategy_profit_loss": ...} when the instance executed
             successfully, otherwise {"success": False, "message": ...}.
    """
    try:
        # Every cache lookup made with this id filters on 'tbl_key', so prefer that
        # field and only fall back to 'id'.
        strategy_id = strategy_data.get('tbl_key') or strategy_data.get('id')
        strategy_name = strategy_data.get('name')
        user_id = strategy_data.get('creator')

        if not strategy_id or not strategy_name or not user_id:
            return {"success": False, "message": "Strategy data is incomplete."}

        # Unique key for the strategy-user pair
        instance_key = (user_id, strategy_id)

        strategy_instance = self.active_instances.get(instance_key)
        if strategy_instance is None:
            # Rows read straight from the cache carry the components as JSON text;
            # decode it before looking up the generated code.
            components = strategy_data.get('strategy_components') or {}
            if isinstance(components, str):
                try:
                    components = json.loads(components)
                except json.JSONDecodeError:
                    components = {}
            generated_code = components.get('generated_code', '') if isinstance(components, dict) else ''
            if not generated_code:
                return {"success": False, "message": "No 'next()' method defined for the strategy."}

            strategy_instance = StrategyInstance(
                strategy_instance_id=str(uuid.uuid4()),
                strategy_id=strategy_id,
                strategy_name=strategy_name,
                user_id=user_id,
                generated_code=generated_code,
                data_cache=self.data_cache,
                indicators=self.indicators_manager,
                trades=self.trades
            )

            # Load existing context or initialize
            strategy_instance.load_context()

            self.active_instances[instance_key] = strategy_instance
            logger.debug(
                f"Created new StrategyInstance '{strategy_instance.strategy_instance_id}' for strategy '{strategy_id}'.")
        else:
            logger.debug(
                f"Retrieved existing StrategyInstance '{strategy_instance.strategy_instance_id}' for strategy '{strategy_id}'.")

        # Execute the strategy
        execution_result = strategy_instance.execute()

        if not execution_result.get('success'):
            error_message = execution_result.get('message', 'Unknown error.')
            logger.error(f"Strategy '{strategy_id}' execution failed: {error_message}")
            return {"success": False, "message": f"Strategy execution failed: {error_message}"}

        profit_loss = execution_result.get('profit_loss', 0.0)
        self.update_strategy_stats(strategy_id, profit_loss)
        logger.info(f"Strategy '{strategy_id}' executed successfully with profit/loss: {profit_loss}")

        # An exiting strategy is retired only once all of its trades are closed.
        if strategy_instance.exit:
            if self.trades.all_trades_closed(strategy_id):
                logger.info(f"Strategy '{strategy_id}' has successfully exited all trades.")
                del self.active_instances[instance_key]
                # Mark the strategy as inactive in the database.
                # NOTE(review): 'active' is not among the columns the 'strategies' cache
                # is created with in __init__ - confirm DataCache accepts it.
                self.data_cache.modify_datacache_item(
                    cache_name='strategies',
                    filter_vals=[('tbl_key', strategy_id)],
                    field_names=('active',),
                    new_values=(False,),
                    key=strategy_id,
                    overwrite='tbl_key'
                )
            else:
                logger.info(f"Strategy '{strategy_id}' is exiting. Remaining"
                            f" trades will be closed in subsequent executions.")

        return {"success": True, "strategy_profit_loss": profit_loss}

    except Exception as e:
        # Boundary handler: exc_info=True records the traceback in the log.
        logger.error(f"Unexpected error in execute_strategy: {e}", exc_info=True)
        return {"success": False, "message": f"Unexpected error: {str(e)}"}
|
|
|
|
def update(self):
    """
    Execute every strategy row that the cache reports as active.

    Rows are fetched from the 'strategies' cache with the filter ('active', True) and each
    row is passed to ``execute_strategy`` in turn. Any exception is logged, not raised.
    """
    try:
        active_rows = self.data_cache.get_rows_from_datacache('strategies', [('active', True)])
        if not active_rows.empty:
            for _row_index, row in active_rows.iterrows():
                self.execute_strategy(row)
            return
        # Nothing is flagged active.
        logger.info("No active strategies to execute.")
    except Exception as e:
        logger.error(f"Error updating strategies: {e}", exc_info=True)
        traceback.print_exc()
|