1512 lines
67 KiB
Python
1512 lines
67 KiB
Python
import json
|
||
import logging
|
||
from typing import Any
|
||
|
||
from Users import Users
|
||
from DataCache_v3 import DataCache
|
||
from Strategies import Strategies
|
||
from backtesting import Backtester
|
||
from candles import Candles
|
||
from Configuration import Configuration
|
||
from ExchangeInterface import ExchangeInterface
|
||
from indicators import Indicators
|
||
from Signals import Signals
|
||
from trade import Trades
|
||
|
||
# Configure logging
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class BrighterTrades:
|
||
def __init__(self, socketio):
    """
    Build and wire together every subsystem this facade coordinates.

    Construction order matters: later objects receive earlier ones as
    constructor arguments.

    :param socketio: Socket.IO object that is passed through to the backtester.
    """
    # Persistent-data layer. It is created first because the exchange layer
    # is constructed around it, and it is then handed the exchange layer back.
    self.data = DataCache()
    self.exchanges = ExchangeInterface(self.data)
    self.data.set_exchange(self.exchanges)

    # Application-wide configuration.
    self.config = Configuration()

    # User management, backed by the data cache.
    self.users = Users(data_cache=self.data)

    # Signal definitions.
    self.signals = Signals(self.data)

    # Candlestick and price data.
    self.candles = Candles(
        users=self.users,
        exchanges=self.exchanges,
        datacache=self.data,
        config=self.config,
    )

    # Indicators computed on top of the candle data.
    self.indicators = Indicators(self.candles, self.users, self.data)

    # Trades need an exchange interface before they can be used.
    self.trades = Trades(self.users)
    self.trades.connect_exchanges(exchanges=self.exchanges)

    # Strategy definitions and running instances.
    self.strategies = Strategies(self.data, self.trades, self.indicators)

    # Back-testing of strategies; results are pushed through socketio.
    self.backtester = Backtester(
        data_cache=self.data,
        strategies=self.strategies,
        indicators=self.indicators,
        socketio=socketio,
    )
    self.backtests = {}  # In-memory storage for backtests (replace with DB access in production)
|
||
|
||
@staticmethod
|
||
def _coerce_user_id(user_id: Any) -> int | None:
|
||
if user_id is None or user_id == '':
|
||
return None
|
||
try:
|
||
return int(user_id)
|
||
except (TypeError, ValueError):
|
||
return None
|
||
|
||
def resolve_user_name(self, msg_data: dict | None) -> str | None:
|
||
"""
|
||
Resolve a username from payload fields, accepting both legacy and migrated key shapes.
|
||
"""
|
||
if not isinstance(msg_data, dict):
|
||
return None
|
||
|
||
user_name = msg_data.get('user_name') or msg_data.get('user')
|
||
if user_name:
|
||
return user_name
|
||
|
||
user_id = self._coerce_user_id(msg_data.get('user_id') or msg_data.get('userId'))
|
||
if user_id is None:
|
||
return None
|
||
|
||
try:
|
||
return self.users.get_username(user_id=user_id)
|
||
except Exception:
|
||
logger.warning(f"Unable to resolve user_name from user id '{user_id}'.")
|
||
return None
|
||
|
||
def resolve_user_id(self, msg_data: dict | None, user_name: str | None = None) -> int | None:
|
||
"""
|
||
Resolve a user id from payload fields, accepting both legacy and migrated key shapes.
|
||
"""
|
||
if isinstance(msg_data, dict):
|
||
user_id = self._coerce_user_id(msg_data.get('user_id') or msg_data.get('userId'))
|
||
if user_id is not None:
|
||
return user_id
|
||
|
||
if user_name:
|
||
try:
|
||
return self.get_user_info(user_name=user_name, info='User_id')
|
||
except Exception:
|
||
logger.warning(f"Unable to resolve user_id from user_name '{user_name}'.")
|
||
return None
|
||
return None
|
||
|
||
def create_new_user(self, email: str, username: str, password: str) -> bool:
    """
    Register a new user and immediately log that user in.

    :param email: User's email address.
    :param username: User's user_name.
    :param password: User's password.
    :return: bool - the result of the log-in attempt that follows creation.
    :raises ValueError: If any argument is empty, or if creation/log-in fails.
    """
    if not (email and username and password):
        raise ValueError("Missing required arguments for 'create_new_user'")

    try:
        self.users.create_new_user(email=email, username=username, password=password)
        return self.users.log_in_user(username=username, password=password)
    except Exception as err:
        # Every failure is surfaced to the caller as a ValueError.
        raise ValueError("Error creating a new user: " + str(err))
|
||
|
||
def log_user_in_out(self, user_name: str, cmd: str, password: str = None):
    """
    Log a user in or out, depending on ``cmd``.

    :param user_name: The user_name.
    :param cmd: Either 'login' or 'logout'.
    :param password: The password; required when cmd is 'login'.
    :return: Whatever the Users object returns for the requested action.
    :raises ValueError: If cmd is not recognised, or if the action fails.
                        A missing password is reported through the same
                        wrapped "Error during user login/logout" message.
    """
    if cmd not in ('login', 'logout'):
        raise ValueError("Invalid command. Expected 'login' or 'logout'.")

    try:
        if cmd == 'logout':
            return self.users.log_out_user(username=user_name)

        # Only 'login' can reach this point.
        if password is None:
            raise ValueError("Password is required for login.")
        return self.users.log_in_user(username=user_name, password=password)
    except Exception as err:
        # Every failure is surfaced to the caller as a ValueError.
        raise ValueError("Error during user login/logout: " + str(err))
|
||
|
||
def get_user_info(self, user_name: str, info: str) -> Any | None:
|
||
"""
|
||
Returns specified user info.
|
||
|
||
:param user_name: The user_name.
|
||
:param info: The information being requested.('Chart View','Is logged in?', 'User_id')
|
||
:return: The requested info or None.
|
||
:raises ValueError: If the provided info is invalid.
|
||
"""
|
||
|
||
if info == 'Chart View':
|
||
try:
|
||
return self.users.get_chart_view(user_name=user_name)
|
||
except Exception as e:
|
||
# Handle specific exceptions or log the error
|
||
raise ValueError("Error retrieving chart view information: " + str(e))
|
||
elif info == 'Is logged in?':
|
||
try:
|
||
return self.users.is_logged_in(user_name=user_name)
|
||
except Exception as e:
|
||
# Handle specific exceptions or log the error
|
||
raise ValueError("Error checking logged in status: " + str(e))
|
||
elif info == 'User_id':
|
||
try:
|
||
return self.users.get_id(user_name=user_name)
|
||
except Exception as e:
|
||
# Handle specific exceptions or log the error
|
||
raise ValueError("Error fetching id: " + str(e))
|
||
else:
|
||
raise ValueError("Invalid information requested: " + info)
|
||
|
||
def get_market_info(self, info: str, **kwargs) -> Any:
    """
    Request market information from the application.

    For 'Candle History' the expected keyword arguments are:
      * chart_view - dict with the keys 'market', 'timeframe' and 'exchange'
      * user_name - the user making the request
      * num_records - optional number of candles (defaults to 10)

    :param info: str - The information requested.
    :param kwargs: arguments required depending on the info requested.
    :return: The info requested, or None for requests that have no result.
    :raises ValueError: If ``info`` is unknown or required values are missing.
    """
    if info == 'Candle History':
        # Tolerate an explicit chart_view=None as well as an absent one.
        chart_view = kwargs.get('chart_view') or {}
        num_records = kwargs.get('num_records', 10)

        # Values are collected under the names used in the error message.
        # The market/timeframe/exchange come from chart_view, not from kwargs,
        # so "missing" has to be decided from the values themselves.
        required = {
            'symbol': chart_view.get('market'),
            'timeframe': chart_view.get('timeframe'),
            'exchange': chart_view.get('exchange'),
            'user_name': kwargs.get('user_name'),
        }
        missing_args = [name for name, value in required.items() if not value]
        if missing_args:
            raise ValueError(f"Missing required arguments for 'Candle History': {', '.join(missing_args)}")

        return self.candles.get_candle_history(num_records=num_records,
                                               symbol=required['symbol'],
                                               interval=required['timeframe'],
                                               exchange_name=required['exchange'],
                                               user_name=required['user_name'])

    if info == 'Something Else':
        # Placeholder request type: accepted, but produces no data yet.
        return None

    raise ValueError(f"Unknown or missing argument for get_market_info(): {info}")
|
||
|
||
def get_indicator_data(self, user_name: str, source: dict, start_ts: float = None, num_results: int = None) -> dict:
    """
    Fetch indicator data on behalf of a user.

    The request is validated here and then delegated to the Indicators object.

    :param user_name: The name of the user making the request.
    :param source: A dictionary containing values specific to the type of indicator.
    :param start_ts: The optional timestamp to start fetching the data from.
    :param num_results: The optional number of results requested.
    :return: dict - the indicator data returned by the Indicators object.
    :raises ValueError: if user_name or source is empty.
    """
    # Checked in this order so an empty user_name is reported first.
    checks = (
        (user_name, "Invalid user_name provided."),
        (source, "Invalid source provided."),
    )
    for value, complaint in checks:
        if not value:
            raise ValueError(complaint)

    # start_ts and num_results are passed through unvalidated.
    request = {
        'user_name': user_name,
        'source': source,
        'start_ts': start_ts,
        'num_results': num_results,
    }
    return self.indicators.get_indicator_data(**request)
|
||
|
||
def connect_user_to_exchange(self, user_name: str, default_exchange: str, default_keys: dict = None) -> bool:
    """
    Connect the user's active exchanges, falling back to a default exchange.

    Every active exchange is attempted (the loop does not stop at the first
    success). Only when none of them ends up connected is the default
    exchange tried.

    :param user_name: str - The user executing the action.
    :param default_exchange: - The name of the default exchange to connect.
    :param default_keys: default API keys.
    :return: bool - True when at least one exchange is connected.
    """
    # Both statuses mean "the exchange is usable". The fallback branch uses
    # the same rule as the loop, so an already-connected default exchange is
    # no longer reported as a failure.
    connected_statuses = ('success', 'already_connected')

    # A user without any recorded exchanges is treated as having none.
    active_exchanges = self.users.get_exchanges(user_name, category='active_exchanges') or []

    success = False
    for exchange in active_exchanges:
        keys = self.users.get_api_keys(user_name, exchange)
        result = self.connect_or_config_exchange(user_name=user_name,
                                                 exchange_name=exchange,
                                                 api_keys=keys)
        if result['status'] in connected_statuses:
            success = True

    if not success:
        # If no active exchange was successfully connected, connect to the default exchange
        result = self.connect_or_config_exchange(user_name=user_name,
                                                 exchange_name=default_exchange,
                                                 api_keys=default_keys)
        success = result['status'] in connected_statuses

    return success
|
||
|
||
def get_js_init_data(self, user_name: str) -> dict:
    """
    Returns a JSON-serialisable dict of initialization data.

    This is passed into the frontend HTML template for the javascript to access in the rendered HTML.

    :param user_name: str - The name of the user making the query.
    :return: dict with the keys 'i_types', 'indicators', 'timeframe',
             'exchange_name', 'trading_pair', 'user_name', 'public_exchanges',
             'intervals' and 'symbols'.
    """
    chart_view = self.users.get_chart_view(user_name=user_name)
    indicator_types = self.indicators.get_available_indicator_types()
    available_indicators = self.indicators.get_indicator_list(user_name)

    # The chart view may be missing, so only read from it when it exists.
    selected_exchange = chart_view.get('exchange') if chart_view else None
    exchange = self.exchanges.get_exchange(ename=selected_exchange, uname=user_name)

    if not chart_view:
        chart_view = {'timeframe': '', 'exchange': '', 'exchange_name': '', 'market': ''}
    if not indicator_types:
        indicator_types = []
    if not available_indicators:
        available_indicators = []

    js_data = {
        'i_types': indicator_types,
        'indicators': available_indicators,
        'timeframe': chart_view.get('timeframe'),
        # The chart view is keyed by 'exchange' everywhere else in this class;
        # 'exchange_name' is still honoured as a fallback for older data.
        'exchange_name': chart_view.get('exchange') or chart_view.get('exchange_name'),
        'trading_pair': chart_view.get('market'),
        'user_name': user_name,
        'public_exchanges': self.exchanges.get_public_exchanges(),
        'intervals': exchange.intervals if exchange else [],
        'symbols': exchange.get_symbols() if exchange else {}
    }
    return js_data
|
||
|
||
def get_rendered_data(self, user_name: str) -> dict:
    """
    Returns data required to render the HTML template of the application's frontend.

    Every list/dict entry falls back to an empty container when the
    underlying lookup returns nothing, so the template can iterate safely.

    :param user_name: The name of the user executing the request.
    :return: A dictionary containing the requested data.
    """
    # A user without a stored chart view is rendered with empty selections
    # instead of failing on the first .get() call.
    chart_view = self.users.get_chart_view(user_name=user_name) or {}
    exchange = self.exchanges.get_exchange(ename=chart_view.get('exchange'), uname=user_name)

    r_data = {
        'title': self.config.get_setting('application_title'),
        'chart_interval': chart_view.get('timeframe', ''),
        'selected_exchange': chart_view.get('exchange', ''),
        'intervals': exchange.intervals if exchange else [],
        'symbols': exchange.get_symbols() if exchange else {},
        'available_exchanges': self.exchanges.get_available_exchanges() or [],
        'connected_exchanges': self.exchanges.get_connected_exchanges(user_name) or [],
        'configured_exchanges': self.users.get_exchanges(
            user_name, category='configured_exchanges') or [],
        'my_balances': self.exchanges.get_all_balances(user_name) or {},
        'indicator_types': self.indicators.get_available_indicator_types() or [],
        'indicator_list': self.indicators.get_indicator_list(user_name) or [],
        'enabled_indicators': self.indicators.get_indicator_list(user_name, only_enabled=True) or [],
        'ma_vals': self.indicators.MV_AVERAGE_ENUM,
        'active_trades': self.exchanges.get_all_activated(user_name, fetch_type='trades') or {},
        'open_orders': self.exchanges.get_all_activated(user_name, fetch_type='orders') or {},
    }
    return r_data
|
||
|
||
def received_cdata(self, cdata: dict) -> dict | None:
|
||
"""
|
||
Processes the received candle data and updates the indicators, signals, trades, and strategies.
|
||
|
||
:param cdata: Dictionary containing the most recent price data.
|
||
:return: Dictionary of updates to be passed onto the UI or None if received duplicate data.
|
||
"""
|
||
# Return if this candle is the same as the last candle received, except when it's the first candle received.
|
||
if not self.candles.cached_last_candle:
|
||
self.candles.cached_last_candle = cdata
|
||
elif cdata['time'] == self.candles.cached_last_candle['time']:
|
||
return None
|
||
|
||
self.candles.set_new_candle(cdata)
|
||
# i_updates = self.indicators.update_indicators()
|
||
state_changes = self.signals.process_all_signals(self.indicators)
|
||
|
||
# Build price updates dict: trades.update expects {symbol: price}
|
||
symbol = cdata.get('symbol', cdata.get('market', 'BTC/USDT'))
|
||
price_updates = {symbol: float(cdata['close'])}
|
||
trade_updates = self.trades.update(price_updates)
|
||
|
||
# Update all active strategy instances with new candle data
|
||
stg_updates = self.strategies.update(candle_data=cdata)
|
||
|
||
updates = {}
|
||
# if i_updates:
|
||
# updates['i_updates'] = i_updates
|
||
if state_changes:
|
||
updates['s_updates'] = state_changes
|
||
if trade_updates:
|
||
updates['trade_updts'] = trade_updates
|
||
if stg_updates:
|
||
updates['stg_updts'] = stg_updates
|
||
|
||
# Log any errors from strategy execution
|
||
for event in stg_updates:
|
||
if event.get('type') == 'error':
|
||
logger.warning(f"Strategy error: {event.get('message')} "
|
||
f"(user={event.get('user_id')}, strategy={event.get('strategy_id')})")
|
||
|
||
return updates
|
||
|
||
def received_new_signal(self, data: dict, user_id: int = None) -> dict:
    """
    Create a new signal from the supplied attributes.

    Note that ``data`` is modified in place: when a user id is given it is
    stored under the 'creator' key before the signal is saved.

    :param data: A dictionary containing the attributes of the new signal.
    :param user_id: The ID of the user creating the signal.
    :return: A dictionary containing success or failure information.
    """
    # Every one of these keys must be present in the payload.
    mandatory = ('name', 'source1', 'prop1', 'source2', 'prop2', 'operator')
    absent = ', '.join(key for key in mandatory if key not in data)
    if absent:
        return {"success": False, "message": f"Missing fields: {absent}"}

    # Record who created the signal.
    if user_id is not None:
        data['creator'] = user_id

    # Persist the signal and hand its result straight back.
    return self.signals.new_signal(data)
|
||
|
||
def received_edit_signal(self, data: dict, user_id: int = None) -> dict:
    """
    Apply an edit to an existing signal.

    Note that ``data`` is modified in place: when a user id is given it is
    stored under the 'creator' key before the edit is applied.

    :param data: A dictionary containing the attributes of the signal to edit;
                 must include a non-empty 'tbl_key'.
    :param user_id: The ID of the user editing the signal.
    :return: A dictionary containing success or failure information.
    """
    tbl_key = data.get('tbl_key')
    if not tbl_key:
        return {"success": False, "message": "Signal tbl_key not provided."}

    if user_id is not None:
        # The edit is refused only for a signal that exists, belongs to
        # someone else, and is not public.
        # NOTE(review): a public signal can therefore be edited by any user,
        # who then becomes its 'creator' - confirm this is intended.
        signal = self.signals.get_signal_by_tbl_key(tbl_key)
        if signal:
            owned_by_other = signal.creator != user_id
            if owned_by_other and not signal.public:
                return {"success": False, "message": "You don't have permission to edit this signal."}
        data['creator'] = user_id

    return self.signals.edit_signal(data)
|
||
|
||
def received_new_strategy(self, data: dict) -> dict:
    """
    Handles the creation of a new strategy based on the provided data.

    Every failure, including an unknown user, is reported through the
    returned dictionary rather than by raising.

    :param data: A dictionary containing the attributes of the new strategy.
                 Required keys: 'user_name', 'name', 'workspace', 'code'.
                 Optional keys: 'stats', 'public', 'fee'.
    :return: A dictionary indicating success or failure with an appropriate message.
    """
    # Validate presence of required fields
    required_fields = ['user_name', 'name', 'workspace', 'code']
    missing_fields = [field for field in required_fields if field not in data]
    if missing_fields:
        return {"success": False, "message": f"Missing fields: {', '.join(missing_fields)}"}

    # Extract user_name and get user_id. get_user_info() raises ValueError
    # when the lookup fails; that is converted to the same failure result
    # that an empty id produces, so callers always get a dict back.
    user_name = data.get('user_name')
    try:
        user_id = self.get_user_info(user_name=user_name, info='User_id')
    except ValueError:
        return {"success": False, "message": "User ID not found"}
    if not user_id:
        return {"success": False, "message": "User ID not found"}

    # Validate data types and contents
    if not isinstance(data['name'], str) or not data['name'].strip():
        return {"success": False, "message": "Invalid or empty strategy name"}
    if not isinstance(data['workspace'], str) or not data['workspace'].strip():
        return {"success": False, "message": "Invalid or empty workspace data"}
    if not isinstance(data['code'], (dict, str)) or not data['code']:
        return {"success": False, "message": "Invalid or empty strategy code"}

    try:
        # Ensure 'code' is serialized as a JSON string
        code_json = json.dumps(data['code']) if isinstance(data['code'], dict) else data['code']
    except (TypeError, ValueError) as e:
        return {"success": False, "message": f"Invalid strategy code: {str(e)}"}

    # Prepare the strategy data for insertion
    try:
        strategy_data = {
            "creator": int(user_id),  # Convert numpy.int64 to native int for SQLite
            "name": data['name'].strip(),
            "workspace": data['workspace'].strip(),
            "code": code_json,
            "stats": data.get('stats', {}),
            "public": int(data.get('public', 0)),
            "fee": float(data.get('fee', 0.0))
        }
    except Exception as e:
        return {"success": False, "message": f"Error preparing strategy data: {str(e)}"}

    # The default source for undefined sources in the strategy
    try:
        default_source = self.users.get_chart_view(user_name=user_name)
    except Exception as e:
        return {"success": False, "message": f"Error fetching chart view: {str(e)}"}

    # Save the new strategy (in both cache and database) and handle the result
    try:
        result = self.strategies.new_strategy(strategy_data, default_source)
        if result.get("success"):
            return {
                "success": True,
                "strategy": result.get("strategy"),  # Strategy object without `strategy_components`
                "updated_at": result.get("updated_at"),
                "message": result.get("message", "Strategy created successfully")
            }
        else:
            return {"success": False, "message": result.get("message", "Failed to create strategy")}
    except Exception as e:
        # Log unexpected exceptions for debugging
        logger.error(f"Error creating new strategy: {e}", exc_info=True)
        return {"success": False, "message": "An unexpected error occurred while creating the strategy"}
|
||
|
||
def received_edit_strategy(self, data: dict) -> dict:
    """
    Handles editing an existing strategy based on the provided data.

    The strategy is located by its creator (resolved from 'user_name') and
    its 'name'. Every failure, including an unknown user, is reported
    through the returned dictionary rather than by raising.

    :param data: A dictionary containing the attributes of the strategy to edit.
                 Required keys: 'user_name' and 'name'.
    :return: A dictionary containing success or failure information.
    """
    # Extract user_name and strategy name from the data
    user_name = data.get('user_name')
    strategy_name = data.get('name')

    if not user_name:
        return {"success": False, "message": "User not specified"}
    if not strategy_name:
        return {"success": False, "message": "Strategy name not specified"}

    # Fetch the user_id using the user_name. get_user_info() raises
    # ValueError when the lookup fails; that is converted to the same
    # failure result that an empty id produces.
    try:
        user_id = self.get_user_info(user_name=user_name, info='User_id')
    except ValueError:
        return {"success": False, "message": "User ID not found"}
    if not user_id:
        return {"success": False, "message": "User ID not found"}

    # Retrieve the tbl_key using user_id and strategy name
    filter_conditions = [('creator', user_id), ('name', strategy_name)]
    strategy_row = self.data.get_rows_from_datacache(
        cache_name='strategies',
        filter_vals=filter_conditions,
        include_tbl_key=True  # Include tbl_key in the result
    )

    if strategy_row.empty:
        return {"success": False, "message": "Strategy not found"}

    # Ensure only one strategy is found
    if len(strategy_row) > 1:
        return {"success": False, "message": "Multiple strategies found. Please provide more specific information."}

    # Extract the tbl_key
    tbl_key = strategy_row.iloc[0]['tbl_key']

    # Prepare the updated strategy data
    strategy_data = {
        "creator": user_id,
        "name": strategy_name,
        "workspace": data.get('workspace'),
        "code": data.get('code'),
        "stats": data.get('stats', {}),
        "public": data.get('public', 0),
        "fee": data.get('fee', None),
        "tbl_key": tbl_key  # Include the tbl_key to identify the strategy
    }

    # Get the default source for undefined sources in the strategy
    try:
        default_source = self.users.get_chart_view(user_name=user_name)
    except Exception as e:
        return {"success": False, "message": f"Error fetching chart view: {str(e)}"}

    # Call the edit_strategy method to update the strategy
    try:
        result = self.strategies.edit_strategy(strategy_data, default_source)
        if result.get("success"):
            return {
                "success": True,
                "strategy": result.get("strategy"),  # Strategy object without `strategy_components`
                "updated_at": result.get("updated_at"),
                "message": result.get("message", "Strategy updated successfully")
            }
        else:
            return {"success": False, "message": result.get("message", "Failed to update strategy")}
    except Exception as e:
        # Log unexpected exceptions
        logger.error(f"Error editing strategy: {e}", exc_info=True)
        return {"success": False, "message": "An unexpected error occurred while editing the strategy"}
|
||
|
||
def delete_strategy(self, data: dict) -> dict:
    """
    Remove the strategy identified by 'tbl_key' via the strategies instance.

    :param data: Dictionary containing 'tbl_key' and 'user_name'. Only
                 'tbl_key' is read here.
    :return: A dictionary with 'success', 'message' and the 'tbl_key' that
             was acted on (None when no key was supplied).
    """
    # NOTE(review): no ownership check is performed in this method; confirm
    # that callers or Strategies.delete_strategy enforce who may delete.
    tbl_key = data.get('tbl_key')
    if not tbl_key:
        return {"success": False, "message": "tbl_key not provided", "tbl_key": None}

    outcome = self.strategies.delete_strategy(tbl_key=tbl_key)

    # Success and failure responses share one shape; the key is echoed back
    # in both cases so the caller can tell which strategy was involved.
    succeeded = True if outcome.get('success') else False
    return {
        "success": succeeded,
        "message": outcome.get('message'),
        "tbl_key": tbl_key,
    }
|
||
|
||
def start_strategy(
        self,
        user_id: int,
        strategy_id: str,
        mode: str,
        initial_balance: float = 10000.0,
        commission: float = 0.001,
        exchange_name: str = None,
        testnet: bool = True,
        max_position_pct: float = 0.5,
        circuit_breaker_pct: float = -0.10,
) -> dict:
    """
    Start a strategy in the specified mode (paper or live).

    Steps, in order: validate the mode, load the strategy row, check that
    the user may run it (owner or public strategy), refuse a duplicate
    instance, extract the generated code, and - for live mode - resolve and
    verify the exchange connection before the instance is created and
    registered in ``self.strategies.active_instances``.

    :param user_id: User identifier.
    :param strategy_id: Strategy tbl_key.
    :param mode: Trading mode ('paper' or 'live').
    :param initial_balance: Starting balance for paper trading.
    :param commission: Commission rate.
    :param exchange_name: Exchange name for live trading; when omitted the
                          user's first active exchange is used.
    :param testnet: Use testnet for live trading (default True for safety).
    :param max_position_pct: Maximum position size as % of balance for live trading.
    :param circuit_breaker_pct: Drawdown % to halt trading for live trading.
    :return: Dictionary with success status and details.
    """
    from brokers import TradingMode
    import uuid
    import config

    # Validate mode
    if mode not in [TradingMode.PAPER, TradingMode.LIVE]:
        return {"success": False, "message": f"Invalid mode '{mode}'. Use 'paper' or 'live'."}

    # For live mode, we now use LiveStrategyInstance
    effective_mode = mode

    # Get the strategy data
    strategy_data = self.strategies.data_cache.get_rows_from_datacache(
        cache_name='strategies',
        filter_vals=[('tbl_key', strategy_id)],
        include_tbl_key=True
    )

    if strategy_data.empty:
        return {"success": False, "message": "Strategy not found."}

    strategy_row = strategy_data.iloc[0]
    strategy_name = strategy_row.get('name', 'Unknown')

    # Authorization check: user must own the strategy or strategy must be public
    strategy_creator = strategy_row.get('creator')
    is_public = bool(strategy_row.get('public', False))

    if not is_public:
        requester_name = None
        try:
            requester_name = self.users.get_username(user_id=user_id)
        except Exception:
            logger.warning(f"Unable to resolve username for user id '{user_id}'.")

        creator_str = str(strategy_creator) if strategy_creator is not None else ''
        requester_id_str = str(user_id)

        creator_matches_user = False
        if creator_str:
            # Support creator being stored as user_name or user_id.
            creator_matches_user = (
                (requester_name is not None and creator_str == requester_name) or
                (creator_str == requester_id_str)
            )

        if not creator_matches_user and creator_str:
            # Also check if creator is a username that resolves to the current user id.
            try:
                creator_id = self.get_user_info(user_name=creator_str, info='User_id')
                creator_matches_user = creator_id == user_id
            except Exception:
                creator_matches_user = False

        if not creator_matches_user:
            return {
                "success": False,
                "message": "You do not have permission to run this strategy."
            }

    # Check if already running
    instance_key = (user_id, strategy_id, effective_mode)
    if instance_key in self.strategies.active_instances:
        return {
            "success": False,
            "message": f"Strategy '{strategy_name}' is already running in {effective_mode} mode."
        }

    # Get the generated code from strategy_components. The module-level json
    # import is used; no function-local import is needed.
    try:
        components = json.loads(strategy_row.get('strategy_components', '{}'))
    except (json.JSONDecodeError, TypeError) as e:
        return {"success": False, "message": f"Invalid strategy components: {e}"}

    # Valid JSON that is not an object (e.g. null or a list) has no
    # 'generated_code' entry; report it instead of failing on .get().
    if not isinstance(components, dict):
        return {"success": False, "message": "Invalid strategy components: expected a JSON object."}

    # Key is 'generated_code' not 'code' - matches PythonGenerator output
    generated_code = components.get('generated_code', '')
    if not generated_code:
        return {"success": False, "message": "Strategy has no generated code."}

    # Create the strategy instance
    try:
        # For live mode, we need to get the exchange instance FIRST
        # (before creating instance ID, to use resolved exchange name)
        exchange = None
        actual_testnet = testnet
        resolved_exchange_name = exchange_name

        if mode == TradingMode.LIVE:
            # Get the user's username for exchange lookup
            try:
                user_name = self.users.get_username(user_id=user_id)
            except Exception:
                return {"success": False, "message": "Could not resolve username for exchange access."}

            # Determine which exchange to use
            if not resolved_exchange_name:
                # Try to get the user's active exchange
                active_exchanges = self.users.get_exchanges(user_name, category='active_exchanges')
                if active_exchanges:
                    resolved_exchange_name = active_exchanges[0]
                else:
                    return {
                        "success": False,
                        "message": "No exchange specified and no active exchange found. Please configure an exchange."
                    }

            # Determine actual testnet mode (config can override to force testnet)
            if config.TESTNET_MODE:
                actual_testnet = True

            # Hard production gate using effective mode after config overrides.
            if not actual_testnet and not config.ALLOW_LIVE_PRODUCTION:
                logger.warning(
                    f"Production trading blocked: BRIGHTER_ALLOW_LIVE_PROD not set. "
                    f"User {user_id} attempted production trading."
                )
                return {
                    "success": False,
                    "message": "Production trading is disabled. Set BRIGHTER_ALLOW_LIVE_PROD=true to enable."
                }

            # Get the exchange instance (may not exist yet)
            try:
                exchange = self.exchanges.get_exchange(ename=resolved_exchange_name, uname=user_name)
            except ValueError:
                exchange = None  # Exchange doesn't exist yet, will be created below

            # CRITICAL: Verify exchange testnet mode matches requested mode
            if exchange:
                # Use bool() to normalize the comparison (handles mock objects)
                exchange_is_testnet = bool(getattr(exchange, 'testnet', False))
                if exchange_is_testnet != actual_testnet:
                    # Exchange mode mismatch - need to create new exchange with correct mode
                    logger.warning(
                        f"Exchange '{resolved_exchange_name}' is in "
                        f"{'testnet' if exchange_is_testnet else 'production'} mode, "
                        f"but requested {'testnet' if actual_testnet else 'production'}. "
                        f"Creating new exchange connection."
                    )
                    # Get API keys and reconnect with correct mode
                    api_keys = self.users.get_api_keys(user_name, resolved_exchange_name)
                    self.exchanges.connect_exchange(
                        exchange_name=resolved_exchange_name,
                        user_name=user_name,
                        api_keys=api_keys,
                        testnet=actual_testnet
                    )
                    exchange = self.exchanges.get_exchange(ename=resolved_exchange_name, uname=user_name)

            # If exchange doesn't exist or isn't configured, try to load API keys from database
            if not exchange or not exchange.configured:
                logger.info(f"Exchange '{resolved_exchange_name}' not configured, loading API keys from database...")
                api_keys = self.users.get_api_keys(user_name, resolved_exchange_name)
                if api_keys:
                    logger.info(f"Found API keys for {resolved_exchange_name}, reconnecting with testnet={actual_testnet}...")
                    success = self.exchanges.connect_exchange(
                        exchange_name=resolved_exchange_name,
                        user_name=user_name,
                        api_keys=api_keys,
                        testnet=actual_testnet
                    )
                    if success:
                        exchange = self.exchanges.get_exchange(ename=resolved_exchange_name, uname=user_name)
                        logger.info(f"Reconnected exchange: configured={exchange.configured}, testnet={exchange.testnet}")
                    else:
                        logger.error(f"Failed to reconnect exchange '{resolved_exchange_name}'")
                else:
                    logger.warning(f"No API keys found in database for {user_name}/{resolved_exchange_name}")

            # Check again after attempting to load keys
            if not exchange or not exchange.configured:
                return {
                    "success": False,
                    "message": f"Exchange '{resolved_exchange_name}' is not configured with valid API keys. "
                               f"Please configure your API keys in the exchange settings."
                }

            # Final verification: exchange mode MUST match requested mode
            exchange_is_testnet = bool(getattr(exchange, 'testnet', False))
            if exchange_is_testnet != actual_testnet:
                return {
                    "success": False,
                    "message": f"Exchange mode mismatch: exchange is {'testnet' if exchange_is_testnet else 'production'}, "
                               f"but requested {'testnet' if actual_testnet else 'production'}."
                }

            # Safety warning for production mode
            if not actual_testnet:
                logger.warning(
                    f"Starting LIVE PRODUCTION strategy '{strategy_name}' for user {user_id} "
                    f"on exchange '{resolved_exchange_name}'. Real money will be used!"
                )

        # Create deterministic instance ID for live mode AFTER exchange resolution
        # (enables restart-safe state recovery with correct exchange name)
        if mode == TradingMode.LIVE:
            # Use resolved exchange name (not 'default')
            testnet_suffix = 'testnet' if actual_testnet else 'prod'
            strategy_instance_id = f"live:{user_id}:{strategy_id}:{resolved_exchange_name}:{testnet_suffix}"
        else:
            # Paper mode: random UUID since paper state is ephemeral
            strategy_instance_id = str(uuid.uuid4())

        instance = self.strategies.create_strategy_instance(
            mode=mode,
            strategy_instance_id=strategy_instance_id,
            strategy_id=strategy_id,
            strategy_name=strategy_name,
            user_id=user_id,
            generated_code=generated_code,
            initial_balance=initial_balance,
            commission=commission,
            price_provider=lambda symbol: self.exchanges.get_price(symbol),
            exchange=exchange,
            testnet=actual_testnet,
            max_position_pct=max_position_pct,
            circuit_breaker_pct=circuit_breaker_pct,
        )

        # Store the active instance
        self.strategies.active_instances[instance_key] = instance

        logger.info(f"Started strategy '{strategy_name}' for user {user_id} in {mode} mode")

        result = {
            "success": True,
            "message": f"Strategy '{strategy_name}' started in {mode} mode.",
            "strategy_id": strategy_id,
            "strategy_name": strategy_name,
            "instance_id": strategy_instance_id,
            "mode": mode,
            "actual_mode": effective_mode,
            "initial_balance": initial_balance,
        }

        # Add live-specific info
        if mode == TradingMode.LIVE:
            result["exchange"] = resolved_exchange_name
            result["testnet"] = actual_testnet
            result["max_position_pct"] = max_position_pct
            result["circuit_breaker_pct"] = circuit_breaker_pct
            if actual_testnet:
                result["warning"] = "Running in TESTNET mode. No real money at risk."
            else:
                result["warning"] = "PRODUCTION MODE: Real money is at risk!"

        return result

    except Exception as e:
        # Any failure while resolving the exchange or creating the instance
        # is logged with its traceback and reported as a failure result.
        logger.error(f"Failed to create strategy instance: {e}", exc_info=True)
        return {"success": False, "message": f"Failed to start strategy: {str(e)}"}
|
||
|
||
def stop_strategy(
    self,
    user_id: int,
    strategy_id: str,
    mode: str,
) -> dict:
    """
    Stop a running strategy and report its final statistics.

    The instance is looked up in ``self.strategies.active_instances`` under the key
    ``(user_id, strategy_id, mode)``. When a live-mode request finds nothing, the
    paper-mode key is tried as a fallback. The matching entry is removed from the
    registry before statistics are collected, so statistics gathering is best-effort:
    a failing broker call is logged and does not turn a completed stop into an error.

    :param user_id: User identifier.
    :param strategy_id: Strategy tbl_key.
    :param mode: Trading mode requested by the caller.
    :return: Dictionary with 'success' and 'message'; on success it also carries
             'strategy_id', 'strategy_name', 'mode' (as requested), 'actual_mode'
             (the mode of the registry key that was removed) and 'final_stats'.
    """
    from brokers import TradingMode

    active = self.strategies.active_instances
    instance_key = (user_id, strategy_id, mode)
    instance = active.get(instance_key)

    # Compatibility for live mode fallback: the instance may be registered under paper.
    if instance is None and mode == TradingMode.LIVE:
        fallback_key = (user_id, strategy_id, TradingMode.PAPER)
        instance = active.get(fallback_key)
        if instance is not None:
            instance_key = fallback_key

    if instance is None:
        return {
            "success": False,
            "message": "No running strategy found for this user/strategy/mode combination."
        }

    active.pop(instance_key, None)
    actual_mode = instance_key[2]
    strategy_name = instance.strategy_name

    # Get final stats if available. The instance is already deregistered at this
    # point, so a broker failure must not be reported as a failed stop.
    final_stats = {}
    broker = getattr(instance, 'broker', None)
    if broker is not None and hasattr(broker, 'get_balance'):
        try:
            final_stats['final_balance'] = broker.get_balance()
            if hasattr(broker, 'get_available_balance'):
                final_stats['available_balance'] = broker.get_available_balance()
        except Exception as e:
            logger.warning(
                f"Could not collect final broker stats for strategy '{strategy_name}': {e}",
                exc_info=True,
            )

    if hasattr(instance, 'trade_history'):
        final_stats['total_trades'] = len(instance.trade_history)

    logger.info(f"Stopped strategy '{strategy_name}' for user {user_id} in {mode} mode")

    return {
        "success": True,
        "message": f"Strategy '{strategy_name}' stopped.",
        "strategy_id": strategy_id,
        "strategy_name": strategy_name,
        "mode": mode,
        "actual_mode": actual_mode,
        "final_stats": final_stats,
    }
|
||
|
||
def get_strategy_status(
    self,
    user_id: int,
    strategy_id: str = None,
    mode: str = None,
) -> dict:
    """
    Report the running strategy instances that belong to a user.

    Walks ``self.strategies.active_instances`` (keyed by ``(user_id, strategy_id, mode)``)
    and builds one status dictionary per matching entry. Broker balances, positions,
    trade count, testnet flag and circuit-breaker status are included only when the
    instance exposes the corresponding attribute.

    :param user_id: User identifier.
    :param strategy_id: Optional strategy ID to filter on; a falsy value disables the filter.
    :param mode: Optional mode to filter on; a falsy value disables the filter.
    :return: Dictionary with 'success', 'running_strategies' and 'count'.
    """
    # Optional live-only attributes, mapped to the key they are reported under.
    optional_fields = (
        ('is_testnet', 'testnet'),
        ('circuit_breaker_status', 'circuit_breaker'),
    )
    collected = []

    for (owner_id, running_id, running_mode), running in self.strategies.active_instances.items():
        filtered_out = (
            owner_id != user_id
            or (strategy_id and running_id != strategy_id)
            or (mode and running_mode != mode)
        )
        if filtered_out:
            continue

        entry = {
            "strategy_id": running_id,
            "strategy_name": running.strategy_name,
            "mode": running_mode,
            "instance_id": running.strategy_instance_id,
        }

        # Broker figures are only reported for instances that carry a broker.
        if hasattr(running, 'broker'):
            entry['balance'] = running.broker.get_balance()
            entry['available_balance'] = running.broker.get_available_balance()

            if hasattr(running.broker, 'get_all_positions'):
                open_positions = []
                for pos in running.broker.get_all_positions():
                    open_positions.append({
                        'symbol': pos.symbol,
                        'size': pos.size,
                        'entry_price': pos.entry_price,
                        'unrealized_pnl': pos.unrealized_pnl,
                    })
                entry['positions'] = open_positions

        if hasattr(running, 'trade_history'):
            entry['trade_count'] = len(running.trade_history)

        for attr_name, label in optional_fields:
            if hasattr(running, attr_name):
                entry[label] = getattr(running, attr_name)

        collected.append(entry)

    return {
        "success": True,
        "running_strategies": collected,
        "count": len(collected),
    }
|
||
|
||
def delete_signal(self, data: dict, user_id: int = None) -> dict:
    """
    Delete a signal, identified by its 'tbl_key' or, failing that, by its 'name'.

    When only a name is supplied the key is resolved through
    ``self.signals.get_signal_by_name``. If a ``user_id`` is given and the signal found
    for the key has a different ``creator``, the deletion is refused.

    :param data: Dictionary containing 'tbl_key' or 'name' of the signal to delete.
    :param user_id: The ID of the user deleting the signal (for permission check);
                    ``None`` skips the check.
    :return: A failure dictionary when the signal cannot be resolved or the user is not
             its creator; otherwise whatever ``self.signals.delete_signal`` returns.
    """
    key = data.get('tbl_key')
    name = data.get('name')

    # Fall back to a lookup by name when no key was supplied.
    if not key and name:
        named_signal = self.signals.get_signal_by_name(name)
        if named_signal:
            key = named_signal.tbl_key

    if not key:
        return {"success": False, "message": "Signal not found.", "tbl_key": None}

    # Only the creator may delete; a key that resolves to no signal is passed through.
    if user_id is not None:
        target = self.signals.get_signal_by_tbl_key(key)
        if target and target.creator != user_id:
            return {"success": False, "message": "You don't have permission to delete this signal."}

    outcome = self.signals.delete_signal(key)
    return outcome
|
||
|
||
def get_signals_json(self, user_id: int = None) -> list:
    """
    Fetch signals for a user from the signals instance in 'dict' form.

    The request is forwarded to ``self.signals.get_all_signals`` together with the user ID;
    which signals count as visible to that user is decided there.

    :param user_id: The ID of the user making the request.
    :return: list - A list of signal dictionaries.
    """
    visible_signals = self.signals.get_all_signals(user_id, 'dict')
    return visible_signals
|
||
|
||
def get_strategies_json(self, user_id) -> list:
    """
    Fetch strategies for a user from the strategies instance in 'dict' form.

    The request is forwarded to ``self.strategies.get_all_strategies`` together with the
    user ID; the selection of public and user-owned strategies happens there.

    :param user_id: The ID of the user whose strategies are requested.
    :return: list - A list of dictionaries, each representing a strategy.
    """
    strategy_dicts = self.strategies.get_all_strategies(user_id, 'dict')
    return strategy_dicts
|
||
|
||
def connect_or_config_exchange(self, user_name: str, exchange_name: str, api_keys: dict = None) -> dict:
    """
    Connect a user to an exchange, or refresh the stored API keys if already connected.

    Whether the exchange is connected is decided by looking for a matching row in the
    'exchange_data' cache. Any exception raised while connecting or updating keys is
    caught and reported through the returned dictionary.

    :param user_name: str - The name of the user.
    :param exchange_name: str - The name of the exchange.
    :param api_keys: dict - The API keys for the exchange; when omitted or empty they are
                     loaded through ``self.users.get_api_keys``.
    :return: dict - 'exchange', 'status' ('success', 'failure', 'already_connected' or
             'error') and 'message'.
    """
    result = {'exchange': exchange_name, 'status': '', 'message': ''}

    # If no API keys provided, try to load the stored ones.
    if not api_keys:
        api_keys = self.users.get_api_keys(user_name, exchange_name)

    try:
        cached_rows = self.data.get_serialized_datacache(
            cache_name='exchange_data',
            filter_vals=([('user', user_name), ('name', exchange_name)]))

        if cached_rows.empty:
            # Exchange is not connected, try to connect.
            connected = self.exchanges.connect_exchange(exchange_name=exchange_name, user_name=user_name,
                                                        api_keys=api_keys)
            if not connected:
                result['status'] = 'failure'
                result['message'] = f'Failed to connect to {exchange_name}.'
            else:
                self.users.active_exchange(exchange=exchange_name, user_name=user_name, cmd='set')
                if api_keys:
                    self.users.update_api_keys(api_keys=api_keys, exchange=exchange_name, user_name=user_name)
                result['status'] = 'success'
                result['message'] = f'Successfully connected to {exchange_name}.'
        else:
            # Exchange is already connected; store the keys only when they differ.
            if api_keys:
                stored_keys = self.users.get_api_keys(user_name, exchange_name)
                if stored_keys == api_keys:
                    result['message'] = f'{exchange_name}: API keys unchanged.'
                else:
                    self.users.update_api_keys(api_keys=api_keys, exchange=exchange_name, user_name=user_name)
                    result['message'] = f'{exchange_name}: API keys updated.'

            result['status'] = 'already_connected'
    except Exception as e:
        result['status'] = 'error'
        result['message'] = f"Failed to connect to {exchange_name} for user '{user_name}': {str(e)}"

    return result
|
||
|
||
def close_trade(self, trade_id):
    """
    Report the closing of the trade identified by the given trade ID.

    Only the ID is validated and a message printed; the actual close call is still a TODO.

    :param trade_id: The ID of the trade to be closed.
    """
    if not self.trades.is_valid_trade_id(trade_id):
        print(f"Invalid trade ID: {trade_id}. Unable to close the trade.")
        return

    # TODO: self.trades.close_trade(trade_id)
    # TODO: self.config.remove('trades', trade_id)
    print(f"Trade {trade_id} has been closed.")
|
||
|
||
def received_new_trade(self, data: dict) -> dict | None:
|
||
"""
|
||
Called when a new trade has been defined and created in the UI.
|
||
|
||
:param data: A dictionary containing the attributes of the trade.
|
||
:return: The details of the trade as a dictionary, or None on failure.
|
||
"""
|
||
|
||
def vld(attr):
|
||
"""
|
||
Casts numeric strings to float before returning the attribute.
|
||
Returns None if the attribute is absent in the data.
|
||
"""
|
||
if attr in data and data[attr] != '':
|
||
try:
|
||
return float(data[attr])
|
||
except ValueError:
|
||
return data[attr]
|
||
else:
|
||
return None
|
||
|
||
# Forward the request to trades.
|
||
status, result = self.trades.new_trade(target=vld('exchange_name'), symbol=vld('symbol'), price=vld('price'),
|
||
side=vld('side'), order_type=vld('orderType'),
|
||
qty=vld('quantity'))
|
||
if status == 'Error':
|
||
print(f'Error placing the trade: {result}')
|
||
return None
|
||
|
||
print(f'Trade order received: exchange_name={vld("exchange_name")}, '
|
||
f'symbol={vld("symbol")}, '
|
||
f'side={vld("side")}, '
|
||
f'type={vld("orderType")}, '
|
||
f'quantity={vld("quantity")}, '
|
||
f'price={vld("price")}')
|
||
|
||
# Update config's list of trades and save to file.TODO
|
||
# self.config.update_data('trades', self.trades.get_trades('dict'))
|
||
|
||
trade_obj = self.trades.get_trade_by_id(result)
|
||
if trade_obj:
|
||
# Return the trade object that was created in a form that can be converted to json.
|
||
return trade_obj.__dict__
|
||
else:
|
||
return None
|
||
|
||
def get_trades(self):
    """ Return all the trades held by the trades instance, requested in 'dict' form."""
    all_trades = self.trades.get_trades('dict')
    return all_trades
|
||
|
||
def delete_backtest(self, msg_data):
    """
    Delete an existing backtest by interacting with the Backtester.

    :param msg_data: Message dictionary carrying the backtest 'name'; the user name is
                     obtained from it through ``self.resolve_user_name``.
    :return: Dictionary with 'success' and 'message'; on success also the backtest 'name'.
    """
    name = msg_data.get('name')
    owner = self.resolve_user_name(msg_data)

    if not (name and owner):
        return {"success": False, "message": "Missing backtest name or user name."}

    # Key layout follows the Backtester's naming convention.
    cache_key = f"backtest:{owner}:{name}"

    try:
        # Delegate the deletion to the Backtester.
        self.backtester.remove_backtest(cache_key)
    except KeyError:
        return {"success": False, "message": f"Backtest '{name}' not found."}
    except Exception as e:
        return {"success": False, "message": f"Error deleting backtest: {str(e)}"}

    return {"success": True, "message": f"Backtest '{name}' deleted successfully.",
            "name": name}
|
||
|
||
def adjust_setting(self, user_name: str, setting: str, params: Any):
    """
    Adjusts the specified setting for a user.

    Supported settings are 'interval', 'trading_pair', 'exchange', 'toggle_indicator',
    'edit_indicator', 'delete_indicator' and 'new_indicator'. An unsupported setting is
    reported on stdout; no exception is raised for it.

    :param user_name: The name of the user.
    :param setting: The setting to adjust.
    :param params: The parameters for the setting adjustment. A dictionary for most
                   settings; for 'delete_indicator' it is passed on as the indicator name.
    :return: None.
    """
    print(f"[SETTINGS()] MODIFYING({user_name, setting})")

    if setting == 'interval':
        interval_state = params['timeframe']
        self.users.set_chart_view(values=interval_state, specific_property='timeframe', user_name=user_name)

    elif setting == 'trading_pair':
        trading_pair = params['symbol']
        self.users.set_chart_view(values=trading_pair, specific_property='market', user_name=user_name)

    elif setting == 'exchange':
        exchange_name = params['exchange_name']

        # Get the list of available symbols (markets) for the specified exchange and user.
        markets = self.exchanges.get_exchange(ename=exchange_name, uname=user_name).get_symbols()

        if not markets:
            # If no markets are available, exit without changing the chart view.
            print(f"No available markets found for exchange '{exchange_name}'. Chart view remains unchanged.")
            return

        # Keep showing the current market when the new exchange offers it,
        # otherwise default to the first available market.
        current_symbol = self.users.get_chart_view(user_name=user_name, prop='market')
        market = current_symbol if current_symbol in markets else markets[0]

        # Update the user's chart view to reflect the new exchange and default market.
        self.users.set_chart_view(values=exchange_name, specific_property='exchange_name',
                                  user_name=user_name, default_market=market)

    elif setting == 'toggle_indicator':
        # Get indicator names - can be a list (from form checkboxes) or JSON string
        indicator_param = params.get('indicator', [])
        if isinstance(indicator_param, list):
            indicators_to_toggle = indicator_param
        elif isinstance(indicator_param, str):
            # Try to parse as JSON for backwards compatibility
            try:
                parsed = json.loads(indicator_param)
            except json.JSONDecodeError:
                parsed = None
            if isinstance(parsed, list):
                indicators_to_toggle = parsed
            else:
                # Not a JSON list (plain text, or a name such as "50" that happens to
                # parse as a JSON scalar): treat the raw string as one indicator name.
                indicators_to_toggle = [indicator_param] if indicator_param else []
        else:
            indicators_to_toggle = []

        user_id = self.get_user_info(user_name=user_name, info='User_id')
        self.indicators.toggle_indicators(user_id=user_id, indicator_names=indicators_to_toggle)

    elif setting == 'edit_indicator':
        self.indicators.edit_indicator(user_name=user_name, params=params)

    elif setting == 'delete_indicator':
        self.indicators.delete_indicator(user_name=user_name, indicator_name=params)

    elif setting == 'new_indicator':
        self.indicators.new_indicator(user_name=user_name, params=params)

    else:
        print('ERROR SETTING VALUE')
        print(f'The string received by the server was: \n{params}')

    return
|
||
|
||
def process_incoming_message(self, msg_type: str, msg_data: dict, socket_conn_id: str) -> dict | None:
|
||
|
||
"""
|
||
Processes an incoming message and performs the corresponding actions based on the message type and data.
|
||
|
||
:param socket_conn_id: The WebSocket connection to send updates back to the client.
|
||
:param msg_type: The type of the incoming message.
|
||
:param msg_data: The data associated with the incoming message.
|
||
:return: dict|None - A dictionary containing the response message and data, or None if no response is needed or
|
||
no data is found to ensure the WebSocket channel isn't burdened with unnecessary
|
||
communication.
|
||
"""
|
||
|
||
def standard_reply(reply_msg: str, reply_data: Any) -> dict:
|
||
""" Formats a standard reply message. """
|
||
return {"reply": reply_msg, "data": reply_data}
|
||
|
||
user_name = self.resolve_user_name(msg_data)
|
||
user_id = self.resolve_user_id(msg_data, user_name=user_name)
|
||
|
||
if user_name:
|
||
msg_data.setdefault('user_name', user_name)
|
||
msg_data.setdefault('user', user_name)
|
||
if user_id is not None:
|
||
msg_data.setdefault('user_id', user_id)
|
||
msg_data.setdefault('userId', user_id)
|
||
|
||
if msg_type == 'candle_data':
|
||
if r_data := self.received_cdata(msg_data):
|
||
return standard_reply("updates", r_data)
|
||
|
||
if msg_type == 'request':
|
||
request_for = msg_data.get('request')
|
||
if request_for == 'signals':
|
||
signals = self.get_signals_json(user_id)
|
||
return standard_reply("signals", signals if signals else [])
|
||
|
||
elif request_for == 'strategies':
|
||
if user_id is None:
|
||
return standard_reply("strategy_error", {"message": "User not specified"})
|
||
if strategies := self.get_strategies_json(user_id):
|
||
return standard_reply("strategies", strategies)
|
||
|
||
elif request_for == 'trades':
|
||
if trades := self.get_trades():
|
||
return standard_reply("trades", trades)
|
||
else:
|
||
print('Warning: Unhandled request!')
|
||
print(msg_data)
|
||
|
||
# Processing commands
|
||
if msg_type == 'delete_signal':
|
||
result = self.delete_signal(msg_data, user_id)
|
||
if result.get('success'):
|
||
return standard_reply("signal_deleted", {
|
||
"message": result.get('message'),
|
||
"tbl_key": result.get('tbl_key'),
|
||
"name": result.get('name')
|
||
})
|
||
else:
|
||
return standard_reply("signal_error", {
|
||
"message": result.get('message'),
|
||
"tbl_key": result.get('tbl_key')
|
||
})
|
||
|
||
if msg_type == 'delete_strategy':
|
||
result = self.delete_strategy(msg_data)
|
||
if result.get('success'):
|
||
return standard_reply("strategy_deleted", {
|
||
"message": result.get('message'),
|
||
"tbl_key": result.get('tbl_key') # Include tbl_key in the response
|
||
})
|
||
else:
|
||
return standard_reply("strategy_error", {
|
||
"message": result.get('message'),
|
||
"tbl_key": result.get('tbl_key') # Include tbl_key for debugging purposes
|
||
})
|
||
|
||
if msg_type == 'close_trade':
|
||
self.close_trade(msg_data)
|
||
|
||
if msg_type == 'new_signal':
|
||
result = self.received_new_signal(msg_data, user_id)
|
||
if result.get('success'):
|
||
return standard_reply("signal_created", result)
|
||
else:
|
||
return standard_reply("signal_error", result)
|
||
|
||
if msg_type == 'edit_signal':
|
||
result = self.received_edit_signal(msg_data, user_id)
|
||
if result.get('success'):
|
||
return standard_reply("signal_updated", result)
|
||
else:
|
||
return standard_reply("signal_error", result)
|
||
|
||
if msg_type == 'new_strategy':
|
||
try:
|
||
if r_data := self.received_new_strategy(msg_data):
|
||
return standard_reply("strategy_created", r_data)
|
||
except Exception as e:
|
||
logger.error(f"Error processing new_strategy: {e}", exc_info=True)
|
||
return standard_reply("strategy_error", {"message": "Failed to create strategy."})
|
||
|
||
if msg_type == 'edit_strategy':
|
||
try:
|
||
if r_data := self.received_edit_strategy(msg_data):
|
||
return standard_reply("strategy_updated", r_data)
|
||
except Exception as e:
|
||
# Log the error for debugging
|
||
logger.error(f"Error processing edit_strategy: {e}", exc_info=True)
|
||
return standard_reply("strategy_error", {"message": "Failed to edit strategy."})
|
||
|
||
if msg_type == 'new_trade':
|
||
if r_data := self.received_new_trade(msg_data):
|
||
return standard_reply("trade_created", r_data)
|
||
|
||
if msg_type == 'config_exchange':
|
||
user = msg_data.get('user') or user_name
|
||
exchange = msg_data.get('exch') or msg_data.get('exchange') or msg_data.get('exchange_name')
|
||
keys = msg_data.get('keys')
|
||
if not user or not exchange:
|
||
return standard_reply("Exchange_connection_result", {
|
||
"exchange": exchange or '',
|
||
"status": "error",
|
||
"message": "Missing user or exchange in request."
|
||
})
|
||
r_data = self.connect_or_config_exchange(user_name=user, exchange_name=exchange, api_keys=keys)
|
||
return standard_reply("Exchange_connection_result", r_data)
|
||
|
||
# Handle backtest operations
|
||
if msg_type == 'submit_backtest':
|
||
# Validate required fields
|
||
required_fields = ['strategy', 'start_date', 'capital', 'commission']
|
||
if not all(field in msg_data for field in required_fields):
|
||
return standard_reply("backtest_error", {"message": "Missing required fields."})
|
||
if not user_name:
|
||
return standard_reply("backtest_error", {"message": "Missing user identity."})
|
||
|
||
try:
|
||
# Delegate backtest handling to the Backtester
|
||
resp = self.backtester.handle_backtest_message(
|
||
user_id=user_id if user_id is not None else self.get_user_info(user_name=user_name, info='User_id'),
|
||
msg_data=msg_data,
|
||
socket_conn_id=socket_conn_id
|
||
)
|
||
|
||
if 'error' in resp:
|
||
# If there's an error, send a backtest_error message
|
||
return standard_reply("backtest_error", {"message": resp['error']})
|
||
else:
|
||
# If successful, send a backtest_submitted message
|
||
return standard_reply("backtest_submitted", resp)
|
||
|
||
except Exception as e:
|
||
# Catch any unexpected exceptions and send a backtest_error message
|
||
logger.error(f"Unhandled exception during backtest submission: {e}", exc_info=True)
|
||
return standard_reply("backtest_error",
|
||
{"message": "An unexpected error occurred during backtest submission."})
|
||
|
||
if msg_type == 'delete_backtest':
|
||
response = self.delete_backtest(msg_data)
|
||
return standard_reply("backtest_deleted", response)
|
||
|
||
if msg_type == 'run_strategy':
|
||
# Run a strategy in paper or live mode
|
||
required_fields = ['strategy_id', 'mode']
|
||
if not all(field in msg_data for field in required_fields):
|
||
return standard_reply("strategy_run_error", {"message": "Missing required fields (strategy_id, mode)."})
|
||
|
||
strategy_id = msg_data.get('strategy_id')
|
||
mode = msg_data.get('mode', 'paper').lower()
|
||
|
||
try:
|
||
# Parse numeric values safely inside try block
|
||
initial_balance = float(msg_data.get('initial_balance', 10000.0))
|
||
commission = float(msg_data.get('commission', 0.001))
|
||
|
||
# Live trading specific parameters
|
||
exchange_name = msg_data.get('exchange_name') or msg_data.get('exchange')
|
||
testnet = msg_data.get('testnet', True)
|
||
if isinstance(testnet, str):
|
||
testnet = testnet.lower() == 'true'
|
||
max_position_pct = float(msg_data.get('max_position_pct', 0.5))
|
||
circuit_breaker_pct = float(msg_data.get('circuit_breaker_pct', -0.10))
|
||
|
||
# Validate numeric ranges
|
||
if initial_balance <= 0:
|
||
return standard_reply("strategy_run_error", {"message": "Initial balance must be positive."})
|
||
if commission < 0 or commission > 1:
|
||
return standard_reply("strategy_run_error", {"message": "Commission must be between 0 and 1."})
|
||
if not (0 < max_position_pct <= 1):
|
||
return standard_reply("strategy_run_error", {"message": "max_position_pct must be between 0 and 1."})
|
||
if circuit_breaker_pct >= 0:
|
||
return standard_reply("strategy_run_error", {"message": "circuit_breaker_pct must be negative (e.g., -0.10 for -10%)."})
|
||
|
||
result = self.start_strategy(
|
||
user_id=user_id,
|
||
strategy_id=strategy_id,
|
||
mode=mode,
|
||
initial_balance=initial_balance,
|
||
commission=commission,
|
||
exchange_name=exchange_name,
|
||
testnet=testnet,
|
||
max_position_pct=max_position_pct,
|
||
circuit_breaker_pct=circuit_breaker_pct,
|
||
)
|
||
|
||
if result.get('success'):
|
||
return standard_reply("strategy_started", result)
|
||
else:
|
||
return standard_reply("strategy_run_error", result)
|
||
|
||
except ValueError as e:
|
||
return standard_reply("strategy_run_error", {"message": f"Invalid numeric value: {str(e)}"})
|
||
except Exception as e:
|
||
logger.error(f"Error starting strategy: {e}", exc_info=True)
|
||
return standard_reply("strategy_run_error", {"message": f"Failed to start strategy: {str(e)}"})
|
||
|
||
if msg_type == 'stop_strategy':
|
||
strategy_id = msg_data.get('strategy_id')
|
||
mode = msg_data.get('mode', 'paper').lower()
|
||
|
||
if not strategy_id:
|
||
return standard_reply("strategy_stop_error", {"message": "Missing strategy_id."})
|
||
|
||
try:
|
||
result = self.stop_strategy(
|
||
user_id=user_id,
|
||
strategy_id=strategy_id,
|
||
mode=mode,
|
||
)
|
||
if result.get('success'):
|
||
return standard_reply("strategy_stopped", result)
|
||
else:
|
||
return standard_reply("strategy_stop_error", result)
|
||
except Exception as e:
|
||
logger.error(f"Error stopping strategy: {e}", exc_info=True)
|
||
return standard_reply("strategy_stop_error", {"message": f"Failed to stop strategy: {str(e)}"})
|
||
|
||
if msg_type == 'get_strategy_status':
|
||
strategy_id = msg_data.get('strategy_id')
|
||
mode = msg_data.get('mode')
|
||
|
||
try:
|
||
result = self.get_strategy_status(
|
||
user_id=user_id,
|
||
strategy_id=strategy_id,
|
||
mode=mode,
|
||
)
|
||
return standard_reply("strategy_status", result)
|
||
except Exception as e:
|
||
logger.error(f"Error getting strategy status: {e}", exc_info=True)
|
||
return standard_reply("strategy_status_error", {"message": f"Failed to get status: {str(e)}"})
|
||
|
||
if msg_type == 'reply':
|
||
# If the message is a reply log the response to the terminal.
|
||
print(f"\napp.py:Received reply: {msg_data}")
|