# brighter-trading/src/app.py

# 572 lines
# 23 KiB
# Python

# app.py
# Monkey patching must occur before other imports
import eventlet
eventlet.monkey_patch() # noqa: E402
# Standard library imports
import logging # noqa: E402
import os # noqa: E402
# import json # noqa: E402
# import datetime as dt # noqa: E402
# Third-party imports
from flask import Flask, render_template, request, redirect, jsonify, session, flash # noqa: E402
from flask_cors import CORS # noqa: E402
from flask_socketio import SocketIO, emit, join_room, leave_room, disconnect # noqa: E402
from email_validator import validate_email, EmailNotValidError # noqa: E402
# Local application imports
from BrighterTrades import BrighterTrades # noqa: E402
from utils import sanitize_for_json # noqa: E402
# Set up logging.
# BRIGHTER_LOG_LEVEL selects the root log level by name (e.g. "DEBUG").
log_level_name = os.getenv('BRIGHTER_LOG_LEVEL', 'INFO').upper()
log_level = getattr(logging, log_level_name, logging.INFO)
if not isinstance(log_level, int):
    # The name matched a non-level attribute of the logging module
    # (e.g. BASIC_FORMAT is a str); basicConfig() would reject it.
    log_level = logging.INFO

# Debug file logger for execution loop.
_loop_debug = logging.getLogger('loop_debug')
_loop_debug.setLevel(logging.DEBUG)
# The destination can be overridden with BRIGHTER_LOOP_DEBUG_LOG; the default is
# the original developer path so existing deployments keep writing to the same file.
_loop_log_path = os.getenv(
    'BRIGHTER_LOOP_DEBUG_LOG',
    '/home/rob/PycharmProjects/BrighterTrading/trade_debug.log',
)
_loop_handler_error = None
try:
    _loop_handler = logging.FileHandler(_loop_log_path, mode='a')
except OSError as exc:
    # FileHandler opens the file immediately; a missing directory or a
    # permission problem must not prevent the application from importing.
    _loop_handler_error = exc
    _loop_handler = logging.NullHandler()
_loop_handler.setFormatter(logging.Formatter('%(asctime)s - [LOOP] %(message)s'))
_loop_debug.addHandler(_loop_handler)

logging.basicConfig(level=log_level)
if _loop_handler_error is not None:
    logging.warning(
        "Loop debug log '%s' could not be opened (%s); file logging for 'loop_debug' is disabled.",
        _loop_log_path, _loop_handler_error,
    )
# Quieten chatty third-party loggers.
logging.getLogger('ccxt.base.exchange').setLevel(logging.WARNING)
logging.getLogger('urllib3').setLevel(logging.WARNING)
# Create a Flask object named app that serves the html.
app = Flask(__name__)
# Create a socket in order to receive requests.
socketio = SocketIO(app, async_mode='eventlet')
# Create a BrighterTrades object. This is the main application that maintains access to the server,
# local storage, and manages objects that process trade data.
brighter_trades = BrighterTrades(socketio)
# Set server configuration globals.
CORS_HEADERS = 'Content-Type'
# Load the upper-case globals defined in this module so far into the Flask config.
app.config.from_object(__name__)
# SECURITY: the session-signing key should come from the environment. The literal
# below is only a development fallback kept for backward compatibility; anyone who
# can read this source can forge session cookies while it is in use.
app.secret_key = os.getenv('BRIGHTER_SECRET_KEY', '1_BAD_secrete_KEY_is_not_2')
# Enable cross-origin resource sharing for specific endpoints.
# NOTE(review): the allowed origins use port 5000 while the __main__ block of this
# file serves on port 5002 - confirm which port the browser client is loaded from.
cors = CORS(app, supports_credentials=True,
            resources={
                r"/api/history": {"origins": ["http://127.0.0.1:5000", "http://localhost:5000"]},
                r"/api/indicator_init": {"origins": ["http://127.0.0.1:5000", "http://localhost:5000"]}
            },
            allow_headers=['Content-Type'])  # Change from headers to allow_headers
@app.after_request
def add_cors_headers(response):
    """
    Set the Access-Control-Allow-Credentials header on every outgoing response.

    :param response: The response object passed in by Flask's after_request hook.
    :return: The same response object with the header set to 'true'.
    """
    credentials_header = 'Access-Control-Allow-Credentials'
    response.headers[credentials_header] = 'true'
    return response
# =============================================================================
# Strategy Execution Loop (Background Task)
# =============================================================================
# This runs in the background and periodically executes active live/paper strategies
STRATEGY_LOOP_INTERVAL = 10  # seconds between strategy ticks
_strategy_loop_running = False


def _tick_active_strategies(logger):
    """Tick every active strategy instance once and emit any resulting events to its user's room."""
    import time  # local import: only this helper needs it

    strategies = brighter_trades.strategies
    # Iterate over a snapshot of the keys so changes to active_instances
    # while a tick is in progress cannot break the iteration.
    for instance_key in list(strategies.active_instances.keys()):
        try:
            user_id, strategy_id, mode = instance_key
            instance = strategies.active_instances.get(instance_key)
            if instance is None:
                continue

            # For live strategies, get current price as candle data.
            # Default to BTC/USDT if no symbol specified.
            symbol = getattr(instance, 'symbol', 'BTC/USDT')
            try:
                price = brighter_trades.exchanges.get_price(symbol)
            except Exception as e:
                # Only the price lookup is covered here, so failures inside the
                # strategy tick are no longer mis-reported as price errors.
                logger.warning(f"Could not get price for {symbol}: {e}")
                continue
            if not price:
                continue

            # Build a flat candle from the single current price.
            candle_data = {
                'symbol': symbol,
                'close': price,
                'open': price,
                'high': price,
                'low': price,
                'volume': 0,
                'time': int(time.time())
            }

            # Execute strategy tick; errors reach the handler below and are logged with a traceback.
            events = instance.tick(candle_data)
            if not events:
                continue
            logger.info(f"Strategy {strategy_id} generated {len(events)} events: {events}")

            # Emit events to the user's room.
            user_name = brighter_trades.users.get_username(user_id=user_id)
            logger.info(f"Emitting to user_id={user_id}, user_name={user_name}")
            if user_name:
                socketio.emit('strategy_events', sanitize_for_json({
                    'strategy_id': strategy_id,
                    'mode': mode,
                    'events': events
                }), room=user_name)
                logger.info(f"Emitted strategy_events to room={user_name}")
            else:
                logger.warning(f"Could not find username for user_id={user_id}")
        except Exception as e:
            logger.error(f"Error executing strategy {instance_key}: {e}", exc_info=True)


def _update_active_trades(logger):
    """Fetch prices for the symbols of all active trades, update the trades and emit each update to its creator."""
    trades = brighter_trades.trades
    _loop_debug.debug(f"Checking active_trades: {len(trades.active_trades)} trades")
    if not trades.active_trades:
        return

    _loop_debug.debug("Has active trades, getting prices...")
    try:
        symbols = {trade.symbol for trade in trades.active_trades.values()}
        _loop_debug.debug(f"Symbols to fetch: {symbols}")

        price_updates = {}
        for symbol in symbols:
            try:
                price = brighter_trades.exchanges.get_price(symbol)
                _loop_debug.debug(f"Got price for {symbol}: {price}")
                if price:
                    price_updates[symbol] = price
            except Exception as e:
                _loop_debug.debug(f"Failed to get price for {symbol}: {e}")
                logger.warning(f"Could not get price for {symbol}: {e}")
        _loop_debug.debug(f"price_updates: {price_updates}")
        if not price_updates:
            return

        _loop_debug.debug("Calling brighter_trades.trades.update()")
        trade_updates = trades.update(price_updates)
        _loop_debug.debug(f"trade_updates returned: {trade_updates}")
        if not trade_updates:
            return

        # NOTE: this message says "no active strategies", but this function is
        # called on every loop iteration regardless of the active strategy count.
        logger.debug(f"Trade updates (no active strategies): {trade_updates}")
        for update in trade_updates:
            trade_id = update.get('id')
            # Re-read active_trades after update(); a trade missing from it is skipped below.
            trade = trades.active_trades.get(trade_id)
            _loop_debug.debug(f"Emitting update for trade_id={trade_id}, creator={trade.creator if trade else None}")
            if trade and trade.creator:
                user_name = brighter_trades.users.get_username(user_id=trade.creator)
                if user_name:
                    socketio.emit('trade_update', sanitize_for_json(update), room=user_name)
                    _loop_debug.debug(f"Emitted trade_update to room={user_name}")
    except Exception as e:
        _loop_debug.debug(f"Exception in trade update: {e}")
        logger.error(f"Error updating trades (no strategies): {e}", exc_info=True)


def strategy_execution_loop():
    """
    Background loop that executes active strategies periodically.

    For live trading, strategies need to be triggered server-side,
    not just when the client sends candle data.

    Each iteration ticks the active strategy instances (when there are any),
    then updates active trades, then sleeps STRATEGY_LOOP_INTERVAL seconds.
    The loop runs until the module-level _strategy_loop_running flag is False.
    Exceptions raised inside an iteration are logged and do not stop the loop.
    """
    global _strategy_loop_running
    _strategy_loop_running = True
    logger = logging.getLogger('strategy_loop')
    logger.info("Strategy execution loop started")

    while _strategy_loop_running:
        try:
            # Check if there are any active strategy instances.
            active_count = brighter_trades.strategies.get_active_count()

            # Log every iteration for debugging; the counter lives on the
            # function object, as before, so it persists between calls.
            iteration = getattr(strategy_execution_loop, '_iter_count', 0) + 1
            strategy_execution_loop._iter_count = iteration
            logger.info(f"Execution loop iteration {iteration}, active_count={active_count}")

            if active_count > 0:
                logger.info(f"Executing {active_count} active strategies...")
                _tick_active_strategies(logger)

            # Update active trades (runs every iteration, regardless of active strategies).
            _update_active_trades(logger)
        except Exception as e:
            logger.error(f"Strategy execution loop error: {e}", exc_info=True)

        # Sleep before next iteration (cooperative yield under eventlet).
        eventlet.sleep(STRATEGY_LOOP_INTERVAL)

    logger.info("Strategy execution loop stopped")
def start_strategy_loop():
    """
    Spawn strategy_execution_loop as a background eventlet greenlet.

    Start the loop when the app starts (will be called from main block).
    """
    background_task = strategy_execution_loop
    eventlet.spawn(background_task)
def _coerce_user_id(user_id):
    """
    Convert a raw user id value to an int.

    :param user_id: The raw value, e.g. an int, a numeric string, None or ''.
    :return: The value as an int, or None when it is missing or cannot be converted.
    """
    if user_id is None or user_id == '':
        return None
    try:
        return int(user_id)
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers int(float('inf')); Python's json module accepts
        # the Infinity literal, so such a value can arrive in a payload.
        return None
def resolve_user_name(payload: dict | None) -> str | None:
    """
    Work out the username a payload refers to, accepting legacy and migrated key shapes.

    A truthy 'user_name' or 'user' entry is returned as-is. Otherwise the
    'user_id' / 'userId' entry is coerced to an int and looked up through
    brighter_trades.users.get_username().

    :param payload: The incoming message or request data.
    :return: The username, or None if the payload is not a dict, carries no
             usable identifier, or the lookup raises.
    """
    if not isinstance(payload, dict):
        return None

    # Name given directly in the payload.
    for name_key in ('user_name', 'user'):
        direct_name = payload.get(name_key)
        if direct_name:
            return direct_name

    # Otherwise fall back to a numeric id under either spelling.
    raw_id = payload.get('user_id') or payload.get('userId')
    user_id = _coerce_user_id(raw_id)
    if user_id is None:
        return None

    try:
        resolved_name = brighter_trades.users.get_username(user_id=user_id)
    except Exception:
        logging.warning(f"Unable to resolve user_name from user id '{user_id}'.")
        return None
    return resolved_name
@app.route('/')
def welcome():
    """
    Render the welcome/landing page template.
    """
    landing_template = 'welcome.html'
    return render_template(landing_template)
@app.route('/app')
# @cross_origin(supports_credentials=True)
def index():
    """
    Serve the main web application page.

    Loads or creates the session's user, makes sure that user is connected to
    the default exchange, then renders index.html with the data returned by
    brighter_trades.

    :return: The rendered page, or a redirect to /signup when the guest limit is exceeded.
    :raises ValueError: If the default exchange connection fails, or if
                        load_or_create_user raises any other ValueError.
    """
    # session.clear()  # only for debugging!!!!

    # Log the user in.
    try:
        user_name = brighter_trades.users.load_or_create_user(username=session.get('user'))
    except ValueError as e:
        if str(e) == 'GuestLimitExceeded!':
            flash('The system is too busy for another user.')
            return redirect('/signup')
        raise

    # Ensure client session is assigned a user_name.
    session['user'] = user_name
    print('[SERVING INDEX] (USERNAME):', user_name)

    # Ensure that a valid connection with an exchange exists.
    # No keys are needed for public market data.
    connected = brighter_trades.connect_user_to_exchange(user_name=user_name,
                                                         default_exchange='binance',
                                                         default_keys=None)
    if not connected:
        raise ValueError("Couldn't connect to the default exchange.")

    # A dict of data required to build the html of the user app.
    # Dynamic content like options and titles and balances to display.
    rendered_data = brighter_trades.get_rendered_data(user_name)
    # JSON initialization data to be passed into the web application.
    js_data = brighter_trades.get_js_init_data(user_name)

    # Template variable name -> key in rendered_data.
    template_fields = {
        'title': 'title',
        'my_balances': 'my_balances',
        'symbols': 'symbols',
        'intervals': 'intervals',
        'interval_state': 'chart_interval',
        'exchanges': 'available_exchanges',
        'cond_exchanges': 'connected_exchanges',
        'confd_exchanges': 'configured_exchanges',
        'selected_exchange': 'selected_exchange',
        'indicator_types': 'indicator_types',
        'indicator_list': 'indicator_list',
        'checked': 'enabled_indicators',
        'ma_vals': 'ma_vals',
        'active_trades': 'active_trades',
        'open_orders': 'open_orders',
    }
    context = {name: rendered_data[key] for name, key in template_fields.items()}

    # Serve the application page.
    return render_template('index.html', user_name=user_name, js_data=js_data, **context)
@socketio.on('connect')
def handle_connect():
    """
    Handle a new Socket.IO connection.

    The user is identified from the 'user_name' query argument, or failing
    that from 'userId' / 'user_id'. A logged-in user joins a room named after
    their username; anyone else receives an error message and is disconnected.
    """
    query = request.args
    user_name = query.get('user_name') or resolve_user_name({
        'userId': query.get('userId'),
        'user_id': query.get('user_id')
    })

    authenticated = user_name and brighter_trades.get_user_info(user_name=user_name, info='Is logged in?')
    if not authenticated:
        emit('message', {'reply': 'error', 'data': 'User not authenticated'})
        # Disconnect the client if not authenticated
        disconnect()
        return

    # Join a room specific to the user for targeted messaging.
    join_room(user_name)
    emit('message', {'reply': 'connected', 'data': 'Connection established'})
@socketio.on('message')
def handle_message(data):
    """
    Handle incoming JSON messages with authentication.

    :param data: The message payload; expected to be a dict with the keys
                 'message_type' and 'data'.
    :return: None. Replies are emitted to the client as 'message' events.
    """
    # Validate input. A non-dict payload (string, list, None) is rejected up
    # front instead of raising on the membership test or the key lookup.
    if not isinstance(data, dict) or 'message_type' not in data or 'data' not in data:
        emit('message', {"success": False, "message": "Invalid message format."})
        return
    msg_type, msg_data = data['message_type'], data['data']

    # Extract user_name from the incoming message data.
    # resolve_user_name() returns None when msg_data is not a dict, so
    # msg_data is known to be a dict past this check.
    user_name = resolve_user_name(msg_data)
    if not user_name:
        emit('message', {"success": False, "message": "User not specified"})
        return

    # Make the resolved identity available under every key shape without
    # overwriting anything the client already sent.
    msg_data.setdefault('user_name', user_name)
    try:
        user_id = brighter_trades.get_user_info(user_name=user_name, info='User_id')
    except Exception:
        # Best effort: the message is still processed without a user id,
        # but the failure is now recorded rather than silently dropped.
        logging.warning(f"Unable to look up user id for '{user_name}'.", exc_info=True)
        user_id = None
    if user_id is not None:
        msg_data.setdefault('user_id', user_id)
        msg_data.setdefault('userId', user_id)

    # Check if the user is logged in
    if not brighter_trades.get_user_info(user_name=user_name, info='Is logged in?'):
        emit('message', {"success": False, "message": "User not logged in"})
        return

    # Process the incoming message based on the type
    resp = brighter_trades.process_incoming_message(msg_type=msg_type, msg_data=msg_data,
                                                    socket_conn_id=request.sid)
    # Send the response back to the client
    if resp:
        emit('message', resp)
@app.route('/settings', methods=['POST'])
def settings():
    """
    This route is used to change settings that require a browser refresh.

    Options are:
        Data source options for the chart display:
            timeframe: For setting the time frame of the viewport.
            trading_pair: For setting the market displayed in the viewport.
            exchange_name: For setting the exchange_name displayed in the viewport.
        Controls of the indicators:
            new_indicator: Creates a new indicator and stores it.
            edit_indicator: Edits the properties of specific indicators.
            toggle_indicator: Enables or disables display of indicators in the viewport.

    :return: A redirect to /app for form submissions, or a JSON body with a
             status code for JSON requests and for all error cases.
    """
    # Request the user_name from the client session.
    user_name = session.get('user')
    if not user_name:
        return jsonify({"success": False, "message": "Not logged in"}), 401
    # Return if the user is not logged in.
    if not brighter_trades.get_user_info(user_name=user_name, info='Is logged in?'):
        return jsonify({"success": False, "message": "User not logged in"}), 401

    # Get the setting that the application wants to change.
    try:
        # Handle JSON or form submissions
        if request.is_json:
            # silent=True yields None for a malformed body; that and any
            # non-object body (e.g. `null`, a list) is a client error.
            data = request.get_json(silent=True)
            if not isinstance(data, dict):
                return jsonify({"success": False, "message": "Invalid JSON payload"}), 400
            setting = data.get('setting')
            params = data.get('indicator', {})
        else:
            setting = request.form.get('setting')
            # Use to_dict(flat=False) to get lists for multi-value fields (like checkboxes),
            # then convert single-value lists back to single values, except for 'indicator'.
            params = {
                key: value[0] if key != 'indicator' and isinstance(value, list) and len(value) == 1 else value
                for key, value in request.form.to_dict(flat=False).items()
            }

        if not setting:
            return jsonify({"success": False, "message": "No setting provided"}), 400

        # Change the setting.
        brighter_trades.adjust_setting(user_name=user_name, setting=setting, params=params)

        # Return success as JSON if called via an async request
        if request.is_json:
            return jsonify({"success": True}), 200
        # Redirect if this is a form submission (non-async request)
        return redirect('/app')
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 500
@app.route('/api/history', methods=['POST', 'GET'])
def history():
    """
    Fetches the candle history of a specific trading pair and timeframe.
    Currently, set to last 1000 candles.

    The requesting user is resolved from the JSON body and must be logged in.

    :return: json - price history records with status 200; a JSON error with
             400 (no JSON body), 401 (unknown or logged-out user) or 500.
    """
    try:
        # silent=True returns None instead of raising when the request has no
        # (valid) JSON body, so that case is answered as a client error.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({'error': 'No JSON data received'}), 400

        username = resolve_user_name(data)
        # Return if the user is not logged in. The explicit session check
        # matches what the /api/indicator_init and /settings routes do.
        if not username or not brighter_trades.get_user_info(user_name=username, info='Is logged in?'):
            return jsonify({'error': 'User not logged in.'}), 401
        print('[RECEIVED PRICE HISTORY REQUEST] - user', username)

        # Get the chart attributes stored for this user.
        chart_view = brighter_trades.get_user_info(user_name=username, info='Chart View')
        payload = brighter_trades.get_market_info(
            info='Candle History', num_records=1000, chart_view=chart_view, user_name=username)
        # payload is used with .tail() and .to_dict('records') - presumably a pandas DataFrame.
        print('[SENDING PRICE HISTORY TO CLIENT]', payload.tail(2))
        return jsonify(payload.to_dict('records')), 200
    except Exception as e:
        print(f'Error in history endpoint: {e}')
        return jsonify({'error': str(e)}), 500
@app.route('/docs')
def docs():
    """
    Placeholder documentation route: flashes a notice and redirects to the app.
    """
    # TODO: Link to actual documentation when available
    notice = 'Documentation coming soon! Redirecting to the app.'
    flash(notice)
    return redirect('/app')
@app.route('/signup')
def signup():
    """
    Render the sign-up page template.
    """
    page_title = 'title'
    return render_template('sign_up.html', title=page_title)
@app.route('/signout')
def signout():
    """
    Log the session's user out and redirect to the landing page.

    The 'user' session entry is removed only when brighter_trades reports
    that the logout succeeded.
    """
    user_name = session.get('user')
    if user_name:
        logged_out = brighter_trades.log_user_in_out(user_name=user_name, cmd='logout')
        if logged_out:
            # If the user was logged out successfully delete the session var.
            session.pop('user')
    return redirect('/')
@app.route('/login', methods=['POST'])
def login():
    """
    Log a user in from the submitted form and redirect to the landing page.

    Reads 'username' and 'password' from the form. On success the username is
    stored in the session; every outcome flashes a message and redirects to '/'.
    """
    # Get the user_name and password from the form data
    form = request.form
    username = form.get('username')
    password = form.get('password')

    # Validate the input
    if not (username and password):
        flash('Please provide both user_name and password.')
        return redirect('/')

    # Attempt to log in the user
    logged_in = brighter_trades.log_user_in_out(user_name=username, cmd='login', password=password)
    if not logged_in:
        flash('Invalid user_name or password.')
        return redirect('/')

    # Store the user_name in the session
    session['user'] = username
    flash('Login successful!')
    return redirect('/')
@app.route('/signup_submit', methods=['POST'])
def signup_submit():
    """
    Create a new user from the sign-up form.

    Reads 'email', 'user_name' and 'password' from the form. Each validation
    failure flashes a message and redirects back to /signup. On success the
    username is stored in the session and the client is redirected to '/'.
    """
    form = request.form
    email = form.get('email')
    username = form.get('user_name')
    password = form.get('password')

    # A missing or empty email field must be rejected before validate_email():
    # passing None to it is not guaranteed to raise EmailNotValidError, and any
    # other exception would surface as a server error.
    if not email:
        flash(message="Invalid email format: an email address is required.")
        return redirect('/signup')  # Redirect back to signup page

    # Validate email format
    try:
        validate_email(email)
    except EmailNotValidError as e:
        flash(message=f"Invalid email format: {e}")
        return redirect('/signup')  # Redirect back to signup page

    # Validate user_name and password
    if not username or not password:
        flash(message="Missing user_name or password")
        return redirect('/signup')  # Redirect back to signup page

    # Create a new user
    success = brighter_trades.create_new_user(email=email, username=username, password=password)
    if success:
        session['user'] = username
        flash(message="Signup successful! You are now logged in.")
        return redirect('/')  # Redirect to the main page

    flash(message="An error has occurred during the signup process.")
    return redirect('/signup')  # Redirect back to signup page
@app.route('/api/indicator_init', methods=['POST', 'GET'])
def indicator_init():
    """
    Return indicator data for the requesting user's current chart view.

    The user is resolved from the JSON request body and must be logged in.

    :return: JSON indicator data with status 200, or a JSON error with
             400 (user could not be resolved) or 401 (user not logged in).
    """
    request_payload = request.get_json()
    username = resolve_user_name(request_payload)
    if not username:
        return jsonify({'error': 'Invalid user name.'}), 400

    # Check if the user is logged in
    logged_in = brighter_trades.get_user_info(user_name=username, info='Is logged in?')
    if not logged_in:
        return jsonify({'error': 'User is not logged in.'}), 401

    # Get the chart view for the user
    chart_view = brighter_trades.get_user_info(user_name=username, info='Chart View')

    # Get the indicator data
    indicator_data = brighter_trades.get_indicator_data(
        user_name=username,
        source={'user_name': username, 'market': chart_view},
        start_ts=None,
        num_results=1000,
    )
    # indicators={'EMA 5': {'visible': true, 'type': 'EMA', 'color': 'red' },
    # 'vol': {'visible': true, 'type': 'Volume'},'New Indicator': {'visible': true, 'type': 'nothing'}}
    return jsonify(indicator_data), 200
if __name__ == '__main__':
    # Spawn the strategy execution loop as a background greenlet before serving.
    start_strategy_loop()
    logging.info("Strategy execution loop started in background")
    # Serve on the loopback interface only, with debug mode and the reloader off.
    server_options = {
        'host': '127.0.0.1',
        'port': 5002,
        'debug': False,
        'use_reloader': False,
    }
    socketio.run(app, **server_options)