brighter-trading/src/BrighterTrades.py

1271 lines
54 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
from typing import Any
from Users import Users
from DataCache_v3 import DataCache
from Strategies import Strategies
from backtesting import Backtester
from candles import Candles
from Configuration import Configuration
from ExchangeInterface import ExchangeInterface
from indicators import Indicators
from Signals import Signals
from trade import Trades
# Configure logging
logger = logging.getLogger(__name__)
class BrighterTrades:
def __init__(self, socketio):
# Object that interacts with the persistent data.
self.data = DataCache()
# Object that interacts and maintains exchange_interface and account data
self.exchanges = ExchangeInterface(self.data)
# Set the exchange for datacache to use
self.data.set_exchange(self.exchanges)
# Configuration for the app
self.config = Configuration()
# The object that manages users in the system.
self.users = Users(data_cache=self.data)
# Object that maintains signals.
self.signals = Signals(self.config)
# Object that maintains candlestick and price data.
self.candles = Candles(users=self.users, exchanges=self.exchanges, datacache=self.data,
config=self.config)
# Object that interacts with and maintains data from available indicators
self.indicators = Indicators(self.candles, self.users, self.data)
# Object that maintains the trades data
self.trades = Trades(self.users)
# The Trades object needs to connect to an exchange_interface.
self.trades.connect_exchanges(exchanges=self.exchanges)
# Object that maintains the strategies data
self.strategies = Strategies(self.data, self.trades, self.indicators)
# Object responsible for testing trade and strategies data.
self.backtester = Backtester(data_cache=self.data, strategies=self.strategies,
indicators=self.indicators, socketio=socketio)
self.backtests = {} # In-memory storage for backtests (replace with DB access in production)
@staticmethod
def _coerce_user_id(user_id: Any) -> int | None:
if user_id is None or user_id == '':
return None
try:
return int(user_id)
except (TypeError, ValueError):
return None
def resolve_user_name(self, msg_data: dict | None) -> str | None:
"""
Resolve a username from payload fields, accepting both legacy and migrated key shapes.
"""
if not isinstance(msg_data, dict):
return None
user_name = msg_data.get('user_name') or msg_data.get('user')
if user_name:
return user_name
user_id = self._coerce_user_id(msg_data.get('user_id') or msg_data.get('userId'))
if user_id is None:
return None
try:
return self.users.get_username(user_id=user_id)
except Exception:
logger.warning(f"Unable to resolve user_name from user id '{user_id}'.")
return None
def resolve_user_id(self, msg_data: dict | None, user_name: str | None = None) -> int | None:
"""
Resolve a user id from payload fields, accepting both legacy and migrated key shapes.
"""
if isinstance(msg_data, dict):
user_id = self._coerce_user_id(msg_data.get('user_id') or msg_data.get('userId'))
if user_id is not None:
return user_id
if user_name:
try:
return self.get_user_info(user_name=user_name, info='User_id')
except Exception:
logger.warning(f"Unable to resolve user_id from user_name '{user_name}'.")
return None
return None
def create_new_user(self, email: str, username: str, password: str) -> bool:
"""
Creates a new user and logs the user in.
:param email: User's email address.
:param username: User's user_name.
:param password: User's password.
:return: bool - True on successful creation and log in.
"""
if not email or not username or not password:
raise ValueError("Missing required arguments for 'create_new_user'")
try:
self.users.create_new_user(email=email, username=username, password=password)
login_successful = self.users.log_in_user(username=username, password=password)
return login_successful
except Exception as e:
# Handle specific exceptions or log the error
raise ValueError("Error creating a new user: " + str(e))
def log_user_in_out(self, user_name: str, cmd: str, password: str = None):
"""
Logs the user in or out based on the provided command.
:param user_name: The user_name.
:param cmd: The command indicating the action to perform ('logout' or 'login').
:param password: The password for logging in. Required if cmd is 'login'.
:return: True if the action was successful, False otherwise.
"""
if cmd not in ['login', 'logout']:
raise ValueError("Invalid command. Expected 'login' or 'logout'.")
try:
if cmd == 'logout':
return self.users.log_out_user(username=user_name)
elif cmd == 'login':
if password is None:
raise ValueError("Password is required for login.")
return self.users.log_in_user(username=user_name, password=password)
except Exception as e:
# Handle specific exceptions or log the error
raise ValueError("Error during user login/logout: " + str(e))
def get_user_info(self, user_name: str, info: str) -> Any | None:
"""
Returns specified user info.
:param user_name: The user_name.
:param info: The information being requested.('Chart View','Is logged in?', 'User_id')
:return: The requested info or None.
:raises ValueError: If the provided info is invalid.
"""
if info == 'Chart View':
try:
return self.users.get_chart_view(user_name=user_name)
except Exception as e:
# Handle specific exceptions or log the error
raise ValueError("Error retrieving chart view information: " + str(e))
elif info == 'Is logged in?':
try:
return self.users.is_logged_in(user_name=user_name)
except Exception as e:
# Handle specific exceptions or log the error
raise ValueError("Error checking logged in status: " + str(e))
elif info == 'User_id':
try:
return self.users.get_id(user_name=user_name)
except Exception as e:
# Handle specific exceptions or log the error
raise ValueError("Error fetching id: " + str(e))
else:
raise ValueError("Invalid information requested: " + info)
def get_market_info(self, info: str, **kwargs) -> Any:
"""
Request market information from the application.
:param info: str - The information requested.
:param kwargs: arguments required depending on the info requested.
:return: The info requested.
"""
if info == 'Candle History':
chart_view = kwargs.get('chart_view', {})
num_records = kwargs.get('num_records', 10)
symbol = chart_view.get('market')
timeframe = chart_view.get('timeframe')
exchange_name = chart_view.get('exchange')
user_name = kwargs.get('user_name')
if symbol and timeframe and exchange_name and user_name:
return self.candles.get_candle_history(num_records=num_records,
symbol=symbol,
interval=timeframe,
exchange_name=exchange_name,
user_name=user_name)
else:
missing_args = [arg for arg in ['symbol', 'timeframe', 'exchange', 'user_name'] if arg not in kwargs]
raise ValueError(f"Missing required arguments for 'Candle History': {', '.join(missing_args)}")
elif info == 'Something Else':
# Add code or action for 'Something Else'
pass
else:
raise ValueError(f"Unknown or missing argument for get_market_info(): {info}")
return None
def get_indicator_data(self, user_name: str, source: dict, start_ts: float = None, num_results: int = None) -> dict:
"""
Fetches indicator data for a specific user.
:param user_name: The name of the user making the request.
:param source: A dictionary containing values specific to the type of indicator.
:param start_ts: The optional timestamp to start fetching the data from.
:param num_results: The optional number of results requested.
:return: dict - A dictionary of timestamp indexed indicator data.
:raises: ValueError if user_name or source is invalid.
"""
if not user_name:
raise ValueError("Invalid user_name provided.")
if not source:
raise ValueError("Invalid source provided.")
# Additional validation checks for start_ts and num_results if needed
return self.indicators.get_indicator_data(user_name=user_name, source=source, start_ts=start_ts,
num_results=num_results)
def connect_user_to_exchange(self, user_name: str, default_exchange: str, default_keys: dict = None) -> bool:
"""
Connects an exchange if it is not already connected.
:param user_name: str - The user executing the action.
:param default_exchange: - The name of the default exchange to connect.
:param default_keys: default API keys.
:return: bool - True on success.
"""
active_exchanges = self.users.get_exchanges(user_name, category='active_exchanges')
success = False
for exchange in active_exchanges:
keys = self.users.get_api_keys(user_name, exchange)
result = self.connect_or_config_exchange(user_name=user_name,
exchange_name=exchange,
api_keys=keys)
if (result['status'] == 'success') or (result['status'] == 'already_connected'):
success = True
if not success:
# If no active exchange was successfully connected, connect to the default exchange
result = self.connect_or_config_exchange(user_name=user_name,
exchange_name=default_exchange,
api_keys=default_keys)
if result['status'] == 'success':
success = True
return success
def get_js_init_data(self, user_name: str) -> dict:
"""
Returns a JSON object of initialization data.
This is passed into the frontend HTML template for the javascript to access in the rendered HTML.
:param user_name: str - The name of the user making the query.
"""
chart_view = self.users.get_chart_view(user_name=user_name)
indicator_types = self.indicators.get_available_indicator_types()
available_indicators = self.indicators.get_indicator_list(user_name)
exchange = self.exchanges.get_exchange(ename=chart_view.get('exchange'), uname=user_name)
if not chart_view:
chart_view = {'timeframe': '', 'exchange_name': '', 'market': ''}
if not indicator_types:
indicator_types = []
if not available_indicators:
available_indicators = []
js_data = {
'i_types': indicator_types,
'indicators': available_indicators,
'timeframe': chart_view.get('timeframe'),
'exchange_name': chart_view.get('exchange_name'),
'trading_pair': chart_view.get('market'),
'user_name': user_name,
'public_exchanges': self.exchanges.get_public_exchanges(),
'intervals': exchange.intervals if exchange else [],
'symbols': exchange.get_symbols() if exchange else {}
}
return js_data
def get_rendered_data(self, user_name: str) -> dict:
"""
Returns data required to render the HTML template of the application's frontend.
:param user_name: The name of the user executing the request.
:return: A dictionary containing the requested data.
"""
chart_view = self.users.get_chart_view(user_name=user_name)
exchange = self.exchanges.get_exchange(ename=chart_view.get('exchange'), uname=user_name)
# noinspection PyDictCreation
r_data = {}
r_data['title'] = self.config.get_setting('application_title')
r_data['chart_interval'] = chart_view.get('timeframe', '')
r_data['selected_exchange'] = chart_view.get('exchange', '')
r_data['intervals'] = exchange.intervals if exchange else []
r_data['symbols'] = exchange.get_symbols() if exchange else {}
r_data['available_exchanges'] = self.exchanges.get_available_exchanges() or []
r_data['connected_exchanges'] = self.exchanges.get_connected_exchanges(user_name) or []
r_data['configured_exchanges'] = self.users.get_exchanges(
user_name, category='configured_exchanges') or []
r_data['my_balances'] = self.exchanges.get_all_balances(user_name) or {}
r_data['indicator_types'] = self.indicators.get_available_indicator_types() or []
r_data['indicator_list'] = self.indicators.get_indicator_list(user_name) or []
r_data['enabled_indicators'] = self.indicators.get_indicator_list(user_name, only_enabled=True) or []
r_data['ma_vals'] = self.indicators.MV_AVERAGE_ENUM
r_data['active_trades'] = self.exchanges.get_all_activated(user_name, fetch_type='trades') or {}
r_data['open_orders'] = self.exchanges.get_all_activated(user_name, fetch_type='orders') or {}
return r_data
def received_cdata(self, cdata: dict) -> dict | None:
"""
Processes the received candle data and updates the indicators, signals, trades, and strategies.
:param cdata: Dictionary containing the most recent price data.
:return: Dictionary of updates to be passed onto the UI or None if received duplicate data.
"""
# Return if this candle is the same as the last candle received, except when it's the first candle received.
if not self.candles.cached_last_candle:
self.candles.cached_last_candle = cdata
elif cdata['time'] == self.candles.cached_last_candle['time']:
return None
self.candles.set_new_candle(cdata)
# i_updates = self.indicators.update_indicators()
state_changes = self.signals.process_all_signals(self.indicators)
# Build price updates dict: trades.update expects {symbol: price}
symbol = cdata.get('symbol', cdata.get('market', 'BTC/USDT'))
price_updates = {symbol: float(cdata['close'])}
trade_updates = self.trades.update(price_updates)
# Update all active strategy instances with new candle data
stg_updates = self.strategies.update(candle_data=cdata)
updates = {}
# if i_updates:
# updates['i_updates'] = i_updates
if state_changes:
updates['s_updates'] = state_changes
if trade_updates:
updates['trade_updts'] = trade_updates
if stg_updates:
updates['stg_updts'] = stg_updates
# Log any errors from strategy execution
for event in stg_updates:
if event.get('type') == 'error':
logger.warning(f"Strategy error: {event.get('message')} "
f"(user={event.get('user_id')}, strategy={event.get('strategy_id')})")
return updates
def received_new_signal(self, data: dict) -> str | dict:
"""
Handles the creation of a new signal based on the provided data.
:param data: A dictionary containing the attributes of the new signal.
:return: An error message if the required attribute is missing, or the incoming data for chaining on success.
"""
if 'name' not in data:
return "The new signal must have a 'name' attribute."
self.signals.new_signal(data)
self.config.set_setting('signals_list', self.signals.get_signals('dict'))
return data
def received_new_strategy(self, data: dict) -> dict:
"""
Handles the creation of a new strategy based on the provided data.
:param data: A dictionary containing the attributes of the new strategy.
:return: A dictionary indicating success or failure with an appropriate message.
"""
# Validate presence of required fields
required_fields = ['user_name', 'name', 'workspace', 'code']
missing_fields = [field for field in required_fields if field not in data]
if missing_fields:
return {"success": False, "message": f"Missing fields: {', '.join(missing_fields)}"}
# Extract user_name and get user_id
user_name = data.get('user_name')
user_id = self.get_user_info(user_name=user_name, info='User_id')
if not user_id:
return {"success": False, "message": "User ID not found"}
# Validate data types and contents
if not isinstance(data['name'], str) or not data['name'].strip():
return {"success": False, "message": "Invalid or empty strategy name"}
if not isinstance(data['workspace'], str) or not data['workspace'].strip():
return {"success": False, "message": "Invalid or empty workspace data"}
if not isinstance(data['code'], (dict, str)) or not data['code']:
return {"success": False, "message": "Invalid or empty strategy code"}
try:
# Ensure 'code' is serialized as a JSON string
code_json = json.dumps(data['code']) if isinstance(data['code'], dict) else data['code']
except (TypeError, ValueError) as e:
return {"success": False, "message": f"Invalid strategy code: {str(e)}"}
# Prepare the strategy data for insertion
try:
strategy_data = {
"creator": user_id,
"name": data['name'].strip(),
"workspace": data['workspace'].strip(),
"code": code_json,
"stats": data.get('stats', {}),
"public": int(data.get('public', 0)),
"fee": float(data.get('fee', 0.0))
}
except Exception as e:
return {"success": False, "message": f"Error preparing strategy data: {str(e)}"}
# The default source for undefined sources in the strategy
try:
default_source = self.users.get_chart_view(user_name=user_name)
except Exception as e:
return {"success": False, "message": f"Error fetching chart view: {str(e)}"}
# Save the new strategy (in both cache and database) and handle the result
try:
result = self.strategies.new_strategy(strategy_data, default_source)
if result.get("success"):
return {
"success": True,
"strategy": result.get("strategy"), # Strategy object without `strategy_components`
"updated_at": result.get("updated_at"),
"message": result.get("message", "Strategy created successfully")
}
else:
return {"success": False, "message": result.get("message", "Failed to create strategy")}
except Exception as e:
# Log unexpected exceptions for debugging
logger.error(f"Error creating new strategy: {e}", exc_info=True)
return {"success": False, "message": "An unexpected error occurred while creating the strategy"}
def received_edit_strategy(self, data: dict) -> dict:
"""
Handles editing an existing strategy based on the provided data.
:param data: A dictionary containing the attributes of the strategy to edit.
:return: A dictionary containing success or failure information.
"""
# Extract user_name and strategy name from the data
user_name = data.get('user_name')
strategy_name = data.get('name')
if not user_name:
return {"success": False, "message": "User not specified"}
if not strategy_name:
return {"success": False, "message": "Strategy name not specified"}
# Fetch the user_id using the user_name
user_id = self.get_user_info(user_name=user_name, info='User_id')
if not user_id:
return {"success": False, "message": "User ID not found"}
# Retrieve the tbl_key using user_id and strategy name
filter_conditions = [('creator', user_id), ('name', strategy_name)]
strategy_row = self.data.get_rows_from_datacache(
cache_name='strategies',
filter_vals=filter_conditions,
include_tbl_key=True # Include tbl_key in the result
)
if strategy_row.empty:
return {"success": False, "message": "Strategy not found"}
# Ensure only one strategy is found
if len(strategy_row) > 1:
return {"success": False, "message": "Multiple strategies found. Please provide more specific information."}
# Extract the tbl_key
tbl_key = strategy_row.iloc[0]['tbl_key']
# Prepare the updated strategy data
strategy_data = {
"creator": user_id,
"name": strategy_name,
"workspace": data.get('workspace'),
"code": data.get('code'),
"stats": data.get('stats', {}),
"public": data.get('public', 0),
"fee": data.get('fee', None),
"tbl_key": tbl_key # Include the tbl_key to identify the strategy
}
# Get the default source for undefined sources in the strategy
try:
default_source = self.users.get_chart_view(user_name=user_name)
except Exception as e:
return {"success": False, "message": f"Error fetching chart view: {str(e)}"}
# Call the edit_strategy method to update the strategy
try:
result = self.strategies.edit_strategy(strategy_data, default_source)
if result.get("success"):
return {
"success": True,
"strategy": result.get("strategy"), # Strategy object without `strategy_components`
"updated_at": result.get("updated_at"),
"message": result.get("message", "Strategy updated successfully")
}
else:
return {"success": False, "message": result.get("message", "Failed to update strategy")}
except Exception as e:
# Log unexpected exceptions
logger.error(f"Error editing strategy: {e}", exc_info=True)
return {"success": False, "message": "An unexpected error occurred while editing the strategy"}
def delete_strategy(self, data: dict) -> dict:
"""
Deletes the specified strategy identified by tbl_key from the strategies instance.
:param data: Dictionary containing 'tbl_key' and 'user_name'.
:return: A dictionary indicating success or failure with an appropriate message and the tbl_key.
"""
# Validate tbl_key
tbl_key = data.get('tbl_key')
if not tbl_key:
return {"success": False, "message": "tbl_key not provided", "tbl_key": None}
# Call the delete_strategy method to remove the strategy
result = self.strategies.delete_strategy(tbl_key=tbl_key)
# Return the result with tbl_key included
if result.get('success'):
return {
"success": True,
"message": result.get('message'),
"tbl_key": tbl_key # Include tbl_key in the response
}
else:
return {
"success": False,
"message": result.get('message'),
"tbl_key": tbl_key # Include tbl_key even on failure for debugging
}
def start_strategy(
self,
user_id: int,
strategy_id: str,
mode: str,
initial_balance: float = 10000.0,
commission: float = 0.001,
) -> dict:
"""
Start a strategy in the specified mode (paper or live).
:param user_id: User identifier.
:param strategy_id: Strategy tbl_key.
:param mode: Trading mode ('paper' or 'live').
:param initial_balance: Starting balance for paper trading.
:param commission: Commission rate.
:return: Dictionary with success status and details.
"""
from brokers import TradingMode
import uuid
# Validate mode
if mode not in [TradingMode.PAPER, TradingMode.LIVE]:
return {"success": False, "message": f"Invalid mode '{mode}'. Use 'paper' or 'live'."}
# Live mode currently falls back to paper for execution.
effective_mode = TradingMode.PAPER if mode == TradingMode.LIVE else mode
# Get the strategy data
strategy_data = self.strategies.data_cache.get_rows_from_datacache(
cache_name='strategies',
filter_vals=[('tbl_key', strategy_id)],
include_tbl_key=True
)
if strategy_data.empty:
return {"success": False, "message": "Strategy not found."}
strategy_row = strategy_data.iloc[0]
strategy_name = strategy_row.get('name', 'Unknown')
# Authorization check: user must own the strategy or strategy must be public
strategy_creator = strategy_row.get('creator')
is_public = bool(strategy_row.get('public', False))
if not is_public:
requester_name = None
try:
requester_name = self.users.get_username(user_id=user_id)
except Exception:
logger.warning(f"Unable to resolve username for user id '{user_id}'.")
creator_str = str(strategy_creator) if strategy_creator is not None else ''
requester_id_str = str(user_id)
creator_matches_user = False
if creator_str:
# Support creator being stored as user_name or user_id.
creator_matches_user = (
(requester_name is not None and creator_str == requester_name) or
(creator_str == requester_id_str)
)
if not creator_matches_user and creator_str:
# Also check if creator is a username that resolves to the current user id.
try:
creator_id = self.get_user_info(user_name=creator_str, info='User_id')
creator_matches_user = creator_id == user_id
except Exception:
creator_matches_user = False
if not creator_matches_user:
return {
"success": False,
"message": "You do not have permission to run this strategy."
}
# Check if already running
instance_key = (user_id, strategy_id, effective_mode)
if instance_key in self.strategies.active_instances:
return {
"success": False,
"message": f"Strategy '{strategy_name}' is already running in {effective_mode} mode."
}
# Get the generated code from strategy_components
try:
import json
components = json.loads(strategy_row.get('strategy_components', '{}'))
# Key is 'generated_code' not 'code' - matches PythonGenerator output
generated_code = components.get('generated_code', '')
if not generated_code:
return {"success": False, "message": "Strategy has no generated code."}
except (json.JSONDecodeError, TypeError) as e:
return {"success": False, "message": f"Invalid strategy components: {e}"}
# Create unique instance ID
strategy_instance_id = str(uuid.uuid4())
# Create the strategy instance
try:
instance = self.strategies.create_strategy_instance(
mode=mode,
strategy_instance_id=strategy_instance_id,
strategy_id=strategy_id,
strategy_name=strategy_name,
user_id=user_id,
generated_code=generated_code,
initial_balance=initial_balance,
commission=commission,
price_provider=lambda symbol: self.exchanges.get_price(symbol),
)
# Store the active instance
self.strategies.active_instances[instance_key] = instance
logger.info(f"Started strategy '{strategy_name}' for user {user_id} in {mode} mode")
return {
"success": True,
"message": f"Strategy '{strategy_name}' started in {mode} mode.",
"strategy_id": strategy_id,
"strategy_name": strategy_name,
"instance_id": strategy_instance_id,
"mode": mode,
"actual_mode": effective_mode,
"initial_balance": initial_balance,
}
except Exception as e:
logger.error(f"Failed to create strategy instance: {e}", exc_info=True)
return {"success": False, "message": f"Failed to start strategy: {str(e)}"}
def stop_strategy(
self,
user_id: int,
strategy_id: str,
mode: str,
) -> dict:
"""
Stop a running strategy.
:param user_id: User identifier.
:param strategy_id: Strategy tbl_key.
:param mode: Trading mode.
:return: Dictionary with success status.
"""
from brokers import TradingMode
instance_key = (user_id, strategy_id, mode)
instance = self.strategies.active_instances.get(instance_key)
# Compatibility for live mode fallback.
if instance is None and mode == TradingMode.LIVE:
fallback_key = (user_id, strategy_id, TradingMode.PAPER)
instance = self.strategies.active_instances.get(fallback_key)
if instance is not None:
instance_key = fallback_key
if instance is None:
return {
"success": False,
"message": f"No running strategy found for this user/strategy/mode combination."
}
self.strategies.active_instances.pop(instance_key, None)
actual_mode = instance_key[2]
strategy_name = instance.strategy_name
# Get final stats if available
final_stats = {}
if hasattr(instance, 'broker') and hasattr(instance.broker, 'get_balance'):
final_stats['final_balance'] = instance.broker.get_balance()
final_stats['available_balance'] = instance.broker.get_available_balance()
if hasattr(instance, 'trade_history'):
final_stats['total_trades'] = len(instance.trade_history)
logger.info(f"Stopped strategy '{strategy_name}' for user {user_id} in {mode} mode")
return {
"success": True,
"message": f"Strategy '{strategy_name}' stopped.",
"strategy_id": strategy_id,
"strategy_name": strategy_name,
"mode": mode,
"actual_mode": actual_mode,
"final_stats": final_stats,
}
def get_strategy_status(
self,
user_id: int,
strategy_id: str = None,
mode: str = None,
) -> dict:
"""
Get the status of running strategies for a user.
:param user_id: User identifier.
:param strategy_id: Optional strategy ID to filter.
:param mode: Optional mode to filter.
:return: Dictionary with strategy statuses.
"""
running_strategies = []
for (uid, sid, m), instance in self.strategies.active_instances.items():
if uid != user_id:
continue
if strategy_id and sid != strategy_id:
continue
if mode and m != mode:
continue
status = {
"strategy_id": sid,
"strategy_name": instance.strategy_name,
"mode": m,
"instance_id": instance.strategy_instance_id,
}
# Add broker stats if available
if hasattr(instance, 'broker'):
status['balance'] = instance.broker.get_balance()
status['available_balance'] = instance.broker.get_available_balance()
# Get positions
if hasattr(instance.broker, 'get_all_positions'):
positions = instance.broker.get_all_positions()
status['positions'] = [
{
'symbol': p.symbol,
'size': p.size,
'entry_price': p.entry_price,
'unrealized_pnl': p.unrealized_pnl,
}
for p in positions
]
if hasattr(instance, 'trade_history'):
status['trade_count'] = len(instance.trade_history)
running_strategies.append(status)
return {
"success": True,
"running_strategies": running_strategies,
"count": len(running_strategies),
}
def delete_signal(self, signal_name: str) -> None:
"""
Deletes a signal from the signals instance and removes it from the configuration file.
:param signal_name: The name of the signal to delete.
:return: None
"""
# Delete the signal from the signals instance.
self.signals.delete_signal(signal_name)
# # Delete the signal from the configuration file.TODO
# self.config.remove('signals', signal_name)
def get_signals_json(self) -> str:
"""
Retrieve all the signals from the signals instance and return them as a JSON object.
:return: str - A JSON object containing all the signals.
"""
return self.signals.get_signals('json')
def get_strategies_json(self, user_id) -> list:
"""
Retrieve all public and user strategies from the strategies instance and return them as a list of dictionaries.
:return: list - A list of dictionaries, each representing a strategy.
"""
return self.strategies.get_all_strategies(user_id, 'dict')
def connect_or_config_exchange(self, user_name: str, exchange_name: str, api_keys: dict = None) -> dict:
"""
Connects to an exchange if not already connected, or configures the exchange connection for a single user.
:param user_name: str - The name of the user.
:param exchange_name: str - The name of the exchange.
:param api_keys: dict - The API keys for the exchange.
:return: dict - A dictionary containing the result of the operation.
"""
result = {
'exchange': exchange_name,
'status': '',
'message': ''
}
try:
if self.data.get_serialized_datacache(cache_name='exchange_data',
filter_vals=([('user', user_name), ('name', exchange_name)])).empty:
# Exchange is not connected, try to connect
success = self.exchanges.connect_exchange(exchange_name=exchange_name, user_name=user_name,
api_keys=api_keys)
if success:
self.users.active_exchange(exchange=exchange_name, user_name=user_name, cmd='set')
if api_keys:
self.users.update_api_keys(api_keys=api_keys, exchange=exchange_name, user_name=user_name)
result['status'] = 'success'
result['message'] = f'Successfully connected to {exchange_name}.'
else:
result['status'] = 'failure'
result['message'] = f'Failed to connect to {exchange_name}.'
else:
# Exchange is already connected, check if API keys need updating
if api_keys:
# Get current API keys
current_keys = self.users.get_api_keys(user_name, exchange_name)
# Compare current keys with provided keys
if current_keys != api_keys:
self.users.update_api_keys(api_keys=api_keys, exchange=exchange_name, user_name=user_name)
result['message'] = f'{exchange_name}: API keys updated.'
else:
result['message'] = f'{exchange_name}: API keys unchanged.'
result['status'] = 'already_connected'
except Exception as e:
result['status'] = 'error'
result['message'] = f"Failed to connect to {exchange_name} for user '{user_name}': {str(e)}"
return result
def close_trade(self, trade_id):
"""
Closes a trade identified by the given trade ID.
:param trade_id: The ID of the trade to be closed.
"""
if self.trades.is_valid_trade_id(trade_id):
pass
# self.trades.close_trade(trade_id)TODO
# self.config.remove('trades', trade_id)
print(f"Trade {trade_id} has been closed.")
else:
print(f"Invalid trade ID: {trade_id}. Unable to close the trade.")
def received_new_trade(self, data: dict) -> dict | None:
"""
Called when a new trade has been defined and created in the UI.
:param data: A dictionary containing the attributes of the trade.
:return: The details of the trade as a dictionary, or None on failure.
"""
def vld(attr):
"""
Casts numeric strings to float before returning the attribute.
Returns None if the attribute is absent in the data.
"""
if attr in data and data[attr] != '':
try:
return float(data[attr])
except ValueError:
return data[attr]
else:
return None
# Forward the request to trades.
status, result = self.trades.new_trade(target=vld('exchange_name'), symbol=vld('symbol'), price=vld('price'),
side=vld('side'), order_type=vld('orderType'),
qty=vld('quantity'))
if status == 'Error':
print(f'Error placing the trade: {result}')
return None
print(f'Trade order received: exchange_name={vld("exchange_name")}, '
f'symbol={vld("symbol")}, '
f'side={vld("side")}, '
f'type={vld("orderType")}, '
f'quantity={vld("quantity")}, '
f'price={vld("price")}')
# Update config's list of trades and save to file.TODO
# self.config.update_data('trades', self.trades.get_trades('dict'))
trade_obj = self.trades.get_trade_by_id(result)
if trade_obj:
# Return the trade object that was created in a form that can be converted to json.
return trade_obj.__dict__
else:
return None
def get_trades(self):
""" Return a JSON object of all the trades in the trades instance."""
return self.trades.get_trades('dict')
def delete_backtest(self, msg_data):
""" Delete an existing backtest by interacting with the Backtester. """
backtest_name = msg_data.get('name')
user_name = self.resolve_user_name(msg_data)
if not backtest_name or not user_name:
return {"success": False, "message": "Missing backtest name or user name."}
# Construct the backtest_key based on Backtesters naming convention
backtest_key = f"backtest:{user_name}:{backtest_name}"
try:
# Delegate the deletion to the Backtester
self.backtester.remove_backtest(backtest_key)
return {"success": True, "message": f"Backtest '{backtest_name}' deleted successfully.",
"name": backtest_name}
except KeyError:
return {"success": False, "message": f"Backtest '{backtest_name}' not found."}
except Exception as e:
return {"success": False, "message": f"Error deleting backtest: {str(e)}"}
def adjust_setting(self, user_name: str, setting: str, params: Any):
"""
Adjusts the specified setting for a user.
:param user_name: The name of the user.
:param setting: The setting to adjust.
:param params: The parameters for the setting adjustment.
:raises ValueError: If the provided setting is not supported.
"""
print(f"[SETTINGS()] MODIFYING({user_name, setting})")
if setting == 'interval':
interval_state = params['timeframe']
self.users.set_chart_view(values=interval_state, specific_property='timeframe', user_name=user_name)
elif setting == 'trading_pair':
trading_pair = params['symbol']
self.users.set_chart_view(values=trading_pair, specific_property='market', user_name=user_name)
elif setting == 'exchange':
exchange_name = params['exchange_name']
# Get the list of available symbols (markets) for the specified exchange and user.
markets = self.exchanges.get_exchange(ename=exchange_name, uname=user_name).get_symbols()
# Check if the markets list is empty
if not markets:
# If no markets are available, exit without changing the chart view.
print(f"No available markets found for exchange '{exchange_name}'. Chart view remains unchanged.")
return
# Get the currently viewed market for the user.
current_symbol = self.users.get_chart_view(user_name=user_name, prop='market')
# Determine the market to display based on availability.
if current_symbol not in markets:
# If the current market is not available, default to the first available market.
market = markets[0]
else:
# Otherwise, continue displaying the current market.
market = current_symbol
# Update the user's chart view to reflect the new exchange and default market.
self.users.set_chart_view(values=exchange_name, specific_property='exchange_name',
user_name=user_name, default_market=market)
elif setting == 'toggle_indicator':
# Parse the indicator field as a JSON array
try:
indicators_to_toggle = json.loads(params.get('indicator', '[]'))
except json.JSONDecodeError:
indicators_to_toggle = []
user_id = self.get_user_info(user_name=user_name, info='User_id')
self.indicators.toggle_indicators(user_id=user_id, indicator_names=indicators_to_toggle)
elif setting == 'edit_indicator':
self.indicators.edit_indicator(user_name=user_name, params=params)
elif setting == 'delete_indicator':
self.indicators.delete_indicator(user_name=user_name, indicator_name=params)
elif setting == 'new_indicator':
self.indicators.new_indicator(user_name=user_name, params=params)
else:
print(f'ERROR SETTING VALUE')
print(f'The string received by the server was: /n{params}')
return
def process_incoming_message(self, msg_type: str, msg_data: dict, socket_conn_id: str) -> dict | None:
"""
Processes an incoming message and performs the corresponding actions based on the message type and data.
:param socket_conn_id: The WebSocket connection to send updates back to the client.
:param msg_type: The type of the incoming message.
:param msg_data: The data associated with the incoming message.
:return: dict|None - A dictionary containing the response message and data, or None if no response is needed or
no data is found to ensure the WebSocket channel isn't burdened with unnecessary
communication.
"""
def standard_reply(reply_msg: str, reply_data: Any) -> dict:
""" Formats a standard reply message. """
return {"reply": reply_msg, "data": reply_data}
user_name = self.resolve_user_name(msg_data)
user_id = self.resolve_user_id(msg_data, user_name=user_name)
if user_name:
msg_data.setdefault('user_name', user_name)
msg_data.setdefault('user', user_name)
if user_id is not None:
msg_data.setdefault('user_id', user_id)
msg_data.setdefault('userId', user_id)
if msg_type == 'candle_data':
if r_data := self.received_cdata(msg_data):
return standard_reply("updates", r_data)
if msg_type == 'request':
request_for = msg_data.get('request')
if request_for == 'signals':
if signals := self.get_signals_json():
return standard_reply("signals", signals)
elif request_for == 'strategies':
if user_id is None:
return standard_reply("strategy_error", {"message": "User not specified"})
if strategies := self.get_strategies_json(user_id):
return standard_reply("strategies", strategies)
elif request_for == 'trades':
if trades := self.get_trades():
return standard_reply("trades", trades)
else:
print('Warning: Unhandled request!')
print(msg_data)
# Processing commands
if msg_type == 'delete_signal':
pass
# self.delete_signal(msg_data)
if msg_type == 'delete_strategy':
result = self.delete_strategy(msg_data)
if result.get('success'):
return standard_reply("strategy_deleted", {
"message": result.get('message'),
"tbl_key": result.get('tbl_key') # Include tbl_key in the response
})
else:
return standard_reply("strategy_error", {
"message": result.get('message'),
"tbl_key": result.get('tbl_key') # Include tbl_key for debugging purposes
})
if msg_type == 'close_trade':
self.close_trade(msg_data)
if msg_type == 'new_signal':
if r_data := self.received_new_signal(msg_data):
return standard_reply("signal_created", r_data)
if msg_type == 'new_strategy':
try:
if r_data := self.received_new_strategy(msg_data):
return standard_reply("strategy_created", r_data)
except Exception as e:
logger.error(f"Error processing new_strategy: {e}", exc_info=True)
return standard_reply("strategy_error", {"message": "Failed to create strategy."})
if msg_type == 'edit_strategy':
try:
if r_data := self.received_edit_strategy(msg_data):
return standard_reply("strategy_updated", r_data)
except Exception as e:
# Log the error for debugging
logger.error(f"Error processing edit_strategy: {e}", exc_info=True)
return standard_reply("strategy_error", {"message": "Failed to edit strategy."})
if msg_type == 'new_trade':
if r_data := self.received_new_trade(msg_data):
return standard_reply("trade_created", r_data)
if msg_type == 'config_exchange':
user = msg_data.get('user') or user_name
exchange = msg_data.get('exch') or msg_data.get('exchange') or msg_data.get('exchange_name')
keys = msg_data.get('keys')
if not user or not exchange:
return standard_reply("Exchange_connection_result", {
"exchange": exchange or '',
"status": "error",
"message": "Missing user or exchange in request."
})
r_data = self.connect_or_config_exchange(user_name=user, exchange_name=exchange, api_keys=keys)
return standard_reply("Exchange_connection_result", r_data)
# Handle backtest operations
if msg_type == 'submit_backtest':
# Validate required fields
required_fields = ['strategy', 'start_date', 'capital', 'commission']
if not all(field in msg_data for field in required_fields):
return standard_reply("backtest_error", {"message": "Missing required fields."})
if not user_name:
return standard_reply("backtest_error", {"message": "Missing user identity."})
try:
# Delegate backtest handling to the Backtester
resp = self.backtester.handle_backtest_message(
user_id=user_id if user_id is not None else self.get_user_info(user_name=user_name, info='User_id'),
msg_data=msg_data,
socket_conn_id=socket_conn_id
)
if 'error' in resp:
# If there's an error, send a backtest_error message
return standard_reply("backtest_error", {"message": resp['error']})
else:
# If successful, send a backtest_submitted message
return standard_reply("backtest_submitted", resp)
except Exception as e:
# Catch any unexpected exceptions and send a backtest_error message
logger.error(f"Unhandled exception during backtest submission: {e}", exc_info=True)
return standard_reply("backtest_error",
{"message": "An unexpected error occurred during backtest submission."})
if msg_type == 'delete_backtest':
response = self.delete_backtest(msg_data)
return standard_reply("backtest_deleted", response)
if msg_type == 'run_strategy':
# Run a strategy in paper or live mode
required_fields = ['strategy_id', 'mode']
if not all(field in msg_data for field in required_fields):
return standard_reply("strategy_run_error", {"message": "Missing required fields (strategy_id, mode)."})
strategy_id = msg_data.get('strategy_id')
mode = msg_data.get('mode', 'paper').lower()
try:
# Parse numeric values safely inside try block
initial_balance = float(msg_data.get('initial_balance', 10000.0))
commission = float(msg_data.get('commission', 0.001))
# Validate numeric ranges
if initial_balance <= 0:
return standard_reply("strategy_run_error", {"message": "Initial balance must be positive."})
if commission < 0 or commission > 1:
return standard_reply("strategy_run_error", {"message": "Commission must be between 0 and 1."})
result = self.start_strategy(
user_id=user_id,
strategy_id=strategy_id,
mode=mode,
initial_balance=initial_balance,
commission=commission,
)
if result.get('success'):
# Add explicit warning if live mode was requested but fell back to paper
if mode == 'live' and result.get('actual_mode') == 'paper':
result['warning'] = "Live trading is not yet implemented. Running in paper trading mode for safety."
return standard_reply("strategy_started", result)
else:
return standard_reply("strategy_run_error", result)
except ValueError as e:
return standard_reply("strategy_run_error", {"message": f"Invalid numeric value: {str(e)}"})
except Exception as e:
logger.error(f"Error starting strategy: {e}", exc_info=True)
return standard_reply("strategy_run_error", {"message": f"Failed to start strategy: {str(e)}"})
if msg_type == 'stop_strategy':
strategy_id = msg_data.get('strategy_id')
mode = msg_data.get('mode', 'paper').lower()
if not strategy_id:
return standard_reply("strategy_stop_error", {"message": "Missing strategy_id."})
try:
result = self.stop_strategy(
user_id=user_id,
strategy_id=strategy_id,
mode=mode,
)
if result.get('success'):
return standard_reply("strategy_stopped", result)
else:
return standard_reply("strategy_stop_error", result)
except Exception as e:
logger.error(f"Error stopping strategy: {e}", exc_info=True)
return standard_reply("strategy_stop_error", {"message": f"Failed to stop strategy: {str(e)}"})
if msg_type == 'get_strategy_status':
strategy_id = msg_data.get('strategy_id')
mode = msg_data.get('mode')
try:
result = self.get_strategy_status(
user_id=user_id,
strategy_id=strategy_id,
mode=mode,
)
return standard_reply("strategy_status", result)
except Exception as e:
logger.error(f"Error getting strategy status: {e}", exc_info=True)
return standard_reply("strategy_status_error", {"message": f"Failed to get status: {str(e)}"})
if msg_type == 'reply':
# If the message is a reply log the response to the terminal.
print(f"\napp.py:Received reply: {msg_data}")