Working and relatively glitch free. Classes implemented in python and javascript. UML class diagram. roughed in for both python and javascript.

This commit is contained in:
Rob 2022-05-16 21:49:46 -03:00
parent 958ad4d4a5
commit f6e1b848d2
8 changed files with 17656 additions and 2632 deletions

File diff suppressed because it is too large Load Diff

68
app.py
View File

@ -1,9 +1,7 @@
import json import json
import os
import sys
from flask import Flask, render_template, request, redirect, jsonify from flask import Flask, render_template, request, redirect, jsonify
from flask_cors import CORS, cross_origin from flask_cors import cross_origin
from binance.enums import * from binance.enums import *
from flask_sock import Sock from flask_sock import Sock
# Handles all server side data # Handles all server side data
@ -41,11 +39,11 @@ def index():
@sock.route('/ws') @sock.route('/ws')
def ws(sock): def ws(sock):
def json_msg_received(msg_obj): def json_msg_received(msg_obj):
if 'message_type' in msg_ob: if 'message_type' in msg_obj:
if msg_ob['message_type'] == 'candle_data': if msg_obj['message_type'] == 'candle_data':
# Send the candle to the BrighterData_obj # Send the candle to the BrighterData_obj
# and forward any returned data to the client. # and forward any returned data to the client.
r_data = bt.app_data.received_cdata(msg_ob['data']) r_data = bt.app_data.received_cdata(msg_obj['data'])
if r_data: if r_data:
resp = { resp = {
"reply": "i_updates", "reply": "i_updates",
@ -53,18 +51,18 @@ def ws(sock):
} }
sock.send(json.dumps(resp)) sock.send(json.dumps(resp))
if msg_ob['message_type'] == 'request': if msg_obj['message_type'] == 'request':
print(msg_ob['req']) print(msg_obj['req'])
print('Request!') print('Request!')
if msg_ob['message_type'] == 'reply': if msg_obj['message_type'] == 'reply':
print(msg_ob['rep']) print(msg_obj['rep'])
print('Reply') print('Reply')
if msg_ob['message_type'] == 'new_signal': if msg_obj['message_type'] == 'new_signal':
# Send the data to the BrighterData_obj # Send the data to the BrighterData_obj
# and forward any returned data to the client. # and forward any returned data to the client.
r_data = bt.app_data.received_new_signal(msg_ob['data']) r_data = bt.app_data.received_new_signal(msg_obj['data'])
if r_data: if r_data:
resp = { resp = {
"reply": "i_updates", "reply": "i_updates",
@ -81,8 +79,8 @@ def ws(sock):
# If in json format the message gets converted into a dictionary # If in json format the message gets converted into a dictionary
# otherwise it is handled as a status signal from the client # otherwise it is handled as a status signal from the client
try: try:
msg_ob = json.loads(msg) msg_obj = json.loads(msg)
json_msg_received(msg_ob) json_msg_received(msg_obj)
except json.JSONDecodeError: except json.JSONDecodeError:
print(f'Msg received from client: {msg}') print(f'Msg received from client: {msg}')
@ -112,10 +110,10 @@ def settings():
setting = request.form['setting'] setting = request.form['setting']
if setting == 'interval': if setting == 'interval':
interval_state = request.form['timeframe'] interval_state = request.form['timeframe']
bt.app_data.chart_configuration['chart_interval'] = interval_state bt.app_data.config.chart_interval = interval_state
elif setting == 'trading_pair': elif setting == 'trading_pair':
trading_pair = request.form['trading_pair'] trading_pair = request.form['trading_pair']
bt.app_data.chart_configuration['trading_pair'] = trading_pair bt.app_data.config.trading_pair = trading_pair
elif setting == 'toggle_indicator': elif setting == 'toggle_indicator':
# Get a list of indicators to enable # Get a list of indicators to enable
enabled_indicators = [] enabled_indicators = []
@ -123,13 +121,13 @@ def settings():
if request.form[i] == 'indicator': if request.form[i] == 'indicator':
enabled_indicators.append(i) enabled_indicators.append(i)
# Set visibility for all indicators according to <enabled_indicators> # Set visibility for all indicators according to <enabled_indicators>
for indctr in bt.app_data.indicator_list: for indctr in bt.app_data.indicators.indicator_list:
if (indctr in enabled_indicators): if indctr in enabled_indicators:
bt.app_data.indicator_list[indctr]['visible'] = True bt.app_data.indicators.indicator_list[indctr]['visible'] = True
else: else:
bt.app_data.indicator_list[indctr]['visible'] = False bt.app_data.indicators.indicator_list[indctr]['visible'] = False
# Redirect without reloading history # Redirect without reloading history
bt.app_data.config_and_states('save') bt.app_data.config.config_and_states('save')
return redirect('/') return redirect('/')
elif setting == 'edit_indicator': elif setting == 'edit_indicator':
@ -143,16 +141,16 @@ def settings():
if attributes[a].isdigit(): if attributes[a].isdigit():
attributes[a] = int(attributes[a]) attributes[a] = int(attributes[a])
# if visible is unchecked it doesn't get sent by the form # if visible is unchecked it doesn't get sent by the form
if not 'visible' in attributes: if 'visible' not in attributes:
attributes.update({'visible': False}) attributes.update({'visible': False})
# Set the data in indicators according to <attributes> # Set the data in indicators according to <attributes>
bt.app_data.indicator_list[indicator] = attributes bt.app_data.indicators.indicator_list[indicator] = attributes
if 'delete' in request.form: if 'delete' in request.form:
indicator = request.form['delete'] indicator = request.form['delete']
del bt.app_data.indicator_list[indicator] del bt.app_data.indicators.indicator_list[indicator]
# Redirect without reloading history # Redirect without reloading history
bt.app_data.config_and_states('save') bt.app_data.config.config_and_states('save')
return redirect('/') return redirect('/')
elif setting == 'new_indicator': elif setting == 'new_indicator':
@ -172,34 +170,34 @@ def settings():
if value.isdigit(): if value.isdigit():
value = int(value) value = int(value)
properties[key] = value properties[key] = value
bt.app_data.create_indicator(name=indcr, type=indtyp, properties=properties) bt.app_data.indicators.create_indicator(name=indcr, itype=indtyp, properties=properties)
else: else:
print('ERROR SETTING VALUE') print('ERROR SETTING VALUE')
print(f'The string received by the server was: /n{request.form}') print(f'The string received by the server was: /n{request.form}')
bt.app_data.config_and_states('save') bt.app_data.config.config_and_states('save')
bt.app_data.set_candle_history() bt.app_data.candles.set_candle_history()
return redirect('/') return redirect('/')
@app.route('/history') @app.route('/history')
@cross_origin(origin='localhost', headers=['Content- Type', 'Authorization']) @cross_origin(origin='localhost', headers=['Content- Type', 'Authorization'])
def history(): def history():
symbol = bt.app_data.chart_configuration['trading_pair'] symbol = bt.app_data.config.trading_pair
interval = bt.app_data.chart_configuration['chart_interval'] interval = bt.app_data.config.chart_interval
return jsonify(bt.app_data.get_candle_history(symbol, interval, 1000)) return jsonify(bt.app_data.candles.get_candle_history(symbol, interval, 1000))
@app.route('/saved_data') @app.route('/saved_data')
@cross_origin(origin='localhost', headers=['Content- Type', 'Authorization']) @cross_origin(origin='localhost', headers=['Content- Type', 'Authorization'])
def saved_data(): def saved_data():
return jsonify(bt.app_data.indicator_list) return jsonify(bt.app_data.indicators.indicator_list)
@app.route('/indicator_init') @app.route('/indicator_init')
@cross_origin(origin='localhost', headers=['Content- Type', 'Authorization']) @cross_origin(origin='localhost', headers=['Content- Type', 'Authorization'])
def indicator_init(): def indicator_init():
symbol = bt.app_data.chart_configuration['trading_pair'] symbol = bt.app_data.config.trading_pair
interval = bt.app_data.chart_configuration['chart_interval'] interval = bt.app_data.config.chart_interval
d = bt.app_data.get_indicator_data(symbol, interval, 1000) d = bt.app_data.indicators.get_indicator_data(symbol, interval, 1000)
return jsonify(d) return jsonify(d)

255
candles.py Normal file
View File

@ -0,0 +1,255 @@
import datetime
import csv
class Candles:
def __init__(self, config, client):
# Keep a reference to the exchange client.
self.client = client
# The maximum amount of data to load into memory at one time.
self.max_data_loaded = config.max_data_loaded
# A reference to the app configuration
self.config = config
# The entire loaded candle history
self.candlesticks = []
# List of dictionaries of timestamped high, low, and closing values
self.latest_high_values = []
self.latest_low_values = []
self.latest_close_values = []
# Values of the last candle received
self.last_candle = None
# List of dictionaries of timestamped volume values
self.latest_vol = []
# Set the instance variable of candlesticks, latest_close_values, high, low, closing, volume, and last_candle
self.set_candle_history()
def set_new_candle(self, cdata):
self.last_candle = cdata
self.latest_close_values.append({'time': cdata['time'], 'close': cdata['close']})
self.latest_high_values.append({'time': cdata['time'], 'high': cdata['high']})
self.latest_low_values.append({'time': cdata['time'], 'low': cdata['low']})
self.latest_vol.append({'time': cdata['time'], 'value': cdata['vol']})
def load_candle_history(self, symbol, interval):
""" Retrieve candlestick history from a file and append it with
more recent exchange data while updating the file record.
This method only get called if the <symbol> data is requested.
This is to avoid maintaining irrelevant data files."""
start_datetime = datetime.datetime(2017, 1, 1)
# Create a filename from the function parameters.
# Format is symbol_interval_start_date: example - 'BTCUSDT_15m_2017-01-01'
file_name = f'{symbol}_{interval}_{start_datetime.date()}'
# List of price data. <Open_time>,<Open>,<High>,<Low>,<Close>,
# <Ignore><Close_time><Ignore>
# <Number_of_bisic_data>,<Ignore,Ignore,Ignore>
candlesticks = []
try:
# Populate <candlesticks> from <file_name> if it exists.
print(f'Attempting to open: {file_name}')
with open(file_name, 'r') as file:
reader = csv.reader(file, delimiter=',')
# Load the data here
for row in reader:
candlesticks.append(row)
print('File loaded')
# Open <file_name> for appending
file = open(file_name, 'a', newline='')
candlestick_writer = csv.writer(file, delimiter=',')
except IOError:
# If the file doesn't exist it must be created.
print(f'{file_name} not found: Creating the file')
# Open <file_name> for writing
file = open(file_name, 'w', newline='')
candlestick_writer = csv.writer(file, delimiter=',')
# If no candlesticks were loaded from file. Set a date to start loading from in the
# variable <last_candle_stamp> with a default value stored in <start_datetime>.
if not candlesticks:
last_candle_stamp = start_datetime.timestamp() * 1000
else:
# Set <last_candle_stamp> with the timestamp of the last candles on file
last_candle_stamp = candlesticks[-1][0]
# Request any missing candlestick data from the exchange
recent_candlesticks = self.client.get_historical_klines(symbol, interval, start_str=int(last_candle_stamp))
# Discard the first row of candlestick data as it will be a duplicate***DOUBLE CHECK THIS
recent_candlesticks.pop(0)
# Append the candlestick list and the file
for candlestick in recent_candlesticks:
candlesticks.append(candlestick)
candlestick_writer.writerow(candlestick)
# Close the file and return the entire candlestick history
file.close()
return candlesticks
def set_candle_history(self, symbol=None, interval=None, max_data_loaded=None):
if not max_data_loaded:
max_data_loaded = self.max_data_loaded
if not symbol:
symbol = self.config.trading_pair
if not interval:
interval = self.config.chart_interval
if self.candlesticks:
print(f'set_candle_history(): Reloading candle data for :{interval}')
else:
print('set_candle_history(): Loading candle data')
# Load candles from file
cdata = self.load_candle_history(symbol, interval)
# Trim the beginning of the returned list to size of max_data_loaded of less
if len(cdata) < max_data_loaded:
max_data_loaded = len(cdata)
self.candlesticks = cdata[-max_data_loaded:]
# Set an instance dictionary of timestamped high, low, closing values
self.set_latest_high_values()
self.set_latest_low_values()
self.set_latest_close_values()
# Extract the volume data from self.candlesticks and store it in self.latest_vol
self.set_latest_vol()
# Set an instance reference of the last candle
self.last_candle = self.convert_candle(self.candlesticks[-1])
print('set_candle_history(): Candle data Loaded')
return
def set_latest_vol(self):
# Extracts a list of volume values from all the loaded candlestick
# data and store it in a dictionary keyed to timestamp of measurement.
latest_vol = []
last_clp = 0
for data in self.candlesticks:
clp = int(float(data[4]))
if clp < last_clp:
color = 'rgba(255,82,82, 0.8)' # red
else:
color = 'rgba(0, 150, 136, 0.8)' # green
vol_data = {
"time": int(data[0]) / 1000,
"value": int(float(data[5])),
"color": color
}
last_clp = clp
latest_vol.append(vol_data)
self.latest_vol = latest_vol
return
def get_latest_vol(self, num_record=500):
# Returns the latest closing values
if self.latest_vol:
if len(self.latest_vol) < num_record:
print('Warning: get_latest_vol() - Requested too more records then available')
num_record = len(self.latest_vol)
return self.latest_vol[-num_record:]
else:
raise ValueError('Warning: get_latest_vol(): Values are not set')
def set_latest_high_values(self):
# Extracts a list of close values from all the loaded candlestick
# data and store it in a dictionary keyed to timestamp of measurement.
latest_high_values = []
for data in self.candlesticks:
high_data = {
"time": int(data[0]) / 1000,
"high": data[2]
}
latest_high_values.append(high_data)
self.latest_high_values = latest_high_values
return
def get_latest_high_values(self, num_record=500):
# Returns the latest closing values
if self.latest_high_values:
if len(self.latest_high_values) < num_record:
print('Warning: latest_high_values() - Requested too more records then available')
num_record = len(self.latest_high_values)
return self.latest_high_values[-num_record:]
else:
raise ValueError('Warning: latest_high_values(): Values are not set')
def set_latest_low_values(self):
# Extracts a list of close values from all the loaded candlestick
# data and store it in a dictionary keyed to timestamp of measurement.
latest_low_values = []
for data in self.candlesticks:
low_data = {
"time": int(data[0]) / 1000,
"low": data[3]
}
latest_low_values.append(low_data)
self.latest_low_values = latest_low_values
return
def get_latest_low_values(self, num_record=500):
# Returns the latest closing values
if self.latest_low_values:
if len(self.latest_low_values) < num_record:
print('Warning: latest_low_values() - Requested too more records then available')
num_record = len(self.latest_low_values)
return self.latest_low_values[-num_record:]
else:
raise ValueError('Warning: latest_low_values(): Values are not set')
def set_latest_close_values(self):
# Extracts just the timestamped close values from all the loaded candlestick
# data and store it in the class instance as a dictionary.
latest_close_values = []
for data in self.candlesticks:
close_data = {
"time": int(data[0]) / 1000,
"close": data[4]
}
latest_close_values.append(close_data)
self.latest_close_values = latest_close_values
return
def get_latest_close_values(self, num_record=500):
# Returns the latest closing values from the class instance.
if self.latest_close_values:
if len(self.latest_close_values) < num_record:
print('Warning: get_latest_close_values() - Requested too more records then available')
num_record = len(self.latest_close_values)
return self.latest_close_values[-num_record:]
else:
raise ValueError('Warning: get_latest_close_values(): Values are not set')
@staticmethod
def convert_candle(candle):
# Converts the binance candle format to what lightweight charts expects.
candlestick = {
"time": int(candle[0]) / 1000,
"open": candle[1],
"high": candle[2],
"low": candle[3],
"close": candle[4]
}
return candlestick
def get_candle_history(self, symbol, interval, num_records):
# Returns a specified number of candle records from memory in the lightweight
# charts format.
if len(self.candlesticks) < num_records:
print('Warning: get_candle_history() Requested more records then available')
num_records = len(self.candlesticks)
# Drop everything but the requested number of records
candlesticks = self.candlesticks[-num_records:]
# Reformat relevant candlestick data into a list of python dictionary objects.
# Binance stores timestamps in milliseconds but lightweight charts doesn't,
# so it gets divided by 1000
processed_candlesticks = []
for candle in candlesticks:
candlestick = self.convert_candle(candle)
processed_candlesticks.append(candlestick)
# Return a list of candlestick objects
return processed_candlesticks
def get_volume(self, i_type, num_results=800):
r_data = self.get_latest_vol()
r_data = r_data[-num_results:]
return {"type": i_type, "data": r_data}

View File

@ -1,100 +0,0 @@
chart_configuration:
chart_interval: 4h
trading_pair: BTCUSDT
indicator_list:
ATR:
color: '#1a9b6f'
period: 100
type: ATR
value: 0
visible: true
Bolenger:
color_1: '#5ad858'
color_2: '#64d2f7'
color_3: '#5ad858'
devdn: 2
devup: 2
ma: 1
period: 20
type: BOLBands
value: 0
value1: '38642.16'
value2: '38641.24'
value3: '38639.63'
visible: true
EMA 100:
color: '#5c5aee'
period: 100
type: EMA
value: 0
visible: true
EMA 50:
color: '#464e3f'
period: 50
type: EMA
value: 0
visible: true
Linear Reg 100:
color: '#236eb1'
period: 100
type: LREG
value: '38139.51'
visible: true
MACD:
color_1: '#50d617'
color_2: '#94f657'
fast_p: 12
hist: 0
macd: '-2540.72'
signal: '-1775.26'
signal_p: 9
slow_p: 26
type: MACD
value: 0
visible: 'False'
New Indicator:
color: '#d5ed5e'
period: 20
type: RSI
value: 0
visible: true
New rsi:
color: '#8e257b'
period: 20
type: RSI
value: '48.96'
visible: true
RSI 14:
color: '#1b63bf'
period: 14
type: RSI
value: 0
visible: true
RSI 8:
color: '#2afd40'
period: 8
type: RSI
value: 0
visible: true
RSI_21:
color: '#ea3ea2'
period: 21
type: RSI
value: '29.78'
visible: 'True'
SMA 200:
color: '#1d545c'
period: 200
type: SMA
value: 0
visible: true
SMA 21:
color: '#0decc9'
period: 21
type: SMA
value: 0
visible: true
Volume:
type: Volume
value: 0
visible: true

75
config_states.py Normal file
View File

@ -0,0 +1,75 @@
from binance.enums import *
import yaml
class Configuration:
def __init__(self):
# ************** Default values**************
# The title of our program.
self.application_title = 'BrighterTrades'
# The maximum number of candles to store in memory.
self.max_data_loaded = 1000
# The default values for the main chart.
self.chart_interval = KLINE_INTERVAL_15MINUTE
self.trading_pair = 'BTCUSDT'
# The name of the file that stores saved_data
self.config_FN = 'config.yml'
# A list of all the indicators available. This is injected later.
self.indicator_list = None
# The data that will be saved and loaded from file .
self.saved_data = None
def set_indicator_list(self, list):
self.indicator_list = list
def config_and_states(self, cmd):
"""Loads or saves configurable data to the file set in self.config_FN"""
# The data stored and retrieved from file session.
self.saved_data = {
'indicator_list': self.indicator_list,
'config': {'chart_interval': self.chart_interval, 'trading_pair': self.trading_pair}
}
def set_loaded_values():
# Sets the values in the saved_data object.
self.indicator_list = self.saved_data['indicator_list']
self.chart_interval = self.saved_data['config']['chart_interval']
self.trading_pair = self.saved_data['config']['trading_pair']
def load_configuration(filepath):
"""load file data"""
with open(filepath, "r") as file_descriptor:
data = yaml.safe_load(file_descriptor)
return data
def save_configuration(filepath, data):
"""Saves file data"""
with open(filepath, "w") as file_descriptor:
yaml.dump(data, file_descriptor)
if cmd == 'load':
# If load_configuration() finds a file it overwrites
# the saved_data object otherwise it creates a new file
# with the defaults contained in saved_data>
try:
# If file exist load the values.
self.saved_data = load_configuration(self.config_FN)
set_loaded_values()
except IOError:
# If file doesn't exist create a file and save the default values.
save_configuration(self.config_FN, self.saved_data)
elif cmd == 'save':
try:
# Write saved_data to the file.
save_configuration(self.config_FN, self.saved_data)
except IOError:
raise ValueError("save_configuration(): Couldn't save the file.")
else:
raise ValueError('save_configuration(): Invalid command received.')

791
data.py
View File

@ -1,15 +1,10 @@
import csv from binance.client import Client
import datetime
import sys
import random
import numpy as np
import talib
import config import config
from binance.client import Client from candles import Candles
from binance.enums import * from config_states import Configuration
import yaml from exchange_info import ExchangeInfo
from indicators import Indicators
class BrighterData: class BrighterData:
@ -18,782 +13,68 @@ class BrighterData:
# Initialise a connection to the Binance client API # Initialise a connection to the Binance client API
self.client = Client(config.API_KEY, config.API_SECRET) self.client = Client(config.API_KEY, config.API_SECRET)
# The title of our program # Configuration and settings for the user interface and charts
self.application_title = 'BrighterTrades' self.config = Configuration()
# Call a static method from indicators that fills in a default list of indicators in config.
# Settings for the main chart on our UI self.config.set_indicator_list(Indicators.get_indicator_defaults())
self.chart_configuration = {
'chart_interval': KLINE_INTERVAL_15MINUTE,
'trading_pair': 'BTCUSDT',
}
# The maximum number of candles to store in memory
self.max_data_loaded = 1000
# List of all available indicator types
self.indicator_types = {}
# List of all available indicators
self.indicator_list = None
# Add default indicators and their default values to self.indicator_list
self.set_indicator_defaults()
# Dictionary of exchange and account data
self.exchange_data = {}
# Set the values in self.exchange_data from information retrieved from exchange.
self.set_exchange_data()
# The name of the file that stores saved_data
self.config_FN = 'config.yml'
# Load any saved data from file # Load any saved data from file
self.config_and_states('load') self.config.config_and_states('load')
# The entire loaded candle history # Object that maintains candlestick and price data.
self.candlesticks = [] self.candles = Candles(self.config, self.client)
# List of dictionaries of timestamped high, low, and closing values
self.latest_high_values = []
self.latest_low_values = []
self.latest_close_values = []
# Values of the last candle received
self.last_candle = None
# List of dictionaries of timestamped volume values
self.latest_vol = []
# Set the instance variable of candlesticks, latest_close_values, high, low, closing, volume, and last_candle # Object that interacts with and maintains data from available indicators
self.set_candle_history() self.indicators = Indicators(self.candles)
# A list of values to use with bolenger bands # Object that maintains exchange and account data
self.bb_ma_val = {'SMA': 0, 'EMA': 1, 'WMA': 2, 'DEMA': 3, 'TEMA': 4, 'TRIMA': 5, 'KAMA': 6, 'MAMA': 7, 'T3': 8} self.exchange_info = ExchangeInfo(self.client)
def get_js_init_data(self): def get_js_init_data(self):
"""Returns a JSON object of initialization data """Returns a JSON object of initialization data
for the javascript in the rendered HTML""" for the javascript in the rendered HTML"""
js_data = {'i_types': self.indicator_types, js_data = {'i_types': self.indicators.indicator_types,
'indicators': self.indicator_list, 'indicators': self.indicators.indicator_list,
'interval': self.chart_configuration['chart_interval'], 'interval': self.config.chart_interval,
'trading_pair': self.chart_configuration['trading_pair']} 'trading_pair': self.config.trading_pair}
return js_data return js_data
def config_and_states(self, cmd):
"""Loads or saves configurable data to the file set in self.config_FN"""
# Application configuration and object states
saved_data = {
'indicator_list': self.indicator_list,
'chart_configuration': self.chart_configuration
}
def set_loaded_values():
self.indicator_list = saved_data['indicator_list']
self.chart_configuration = saved_data['chart_configuration']
def load_configuration(filepath):
"""load file data"""
with open(filepath, "r") as file_descriptor:
data = yaml.safe_load(file_descriptor)
return data
def save_configuration(filepath, data):
"""Saves file data"""
with open(filepath, "w") as file_descriptor:
yaml.dump(data, file_descriptor)
if cmd == 'load':
# If load_configuration() finds a file it overwrites
# the saved_data object otherwise it creates a new file
# with the defaults contained in saved_data>
try:
saved_data = load_configuration(self.config_FN)
set_loaded_values()
except IOError:
save_configuration(self.config_FN, saved_data)
elif cmd == 'save':
try:
save_configuration(self.config_FN, saved_data)
except IOError:
raise ValueError("Couldn't save the file")
else:
raise ValueError('Invalid command received')
def load_candle_history(self, symbol, interval):
""" Retrieve candlestick history from a file and append it with
more recent exchange data while updating the file record.
This method only get called if the <symbol> data is requested.
This is to avoid maintaining irrelevant data files."""
start_datetime = datetime.datetime(2017, 1, 1)
# Create a filename from the function parameters.
# Format is symbol_interval_start_date: example - 'BTCUSDT_15m_2017-01-01'
file_name = f'{symbol}_{interval}_{start_datetime.date()}'
# List of price data. <Open_time>,<Open>,<High>,<Low>,<Close>,
# <Ignore><Close_time><Ignore>
# <Number_of_bisic_data>,<Ignore,Ignore,Ignore>
candlesticks = []
try:
# Populate <candlesticks> from <file_name> if it exists.
print(f'Attempting to open: {file_name}')
with open(file_name, 'r') as file:
reader = csv.reader(file, delimiter=',')
# Load the data here
for row in reader:
candlesticks.append(row)
print('File loaded')
# Open <file_name> for appending
file = open(file_name, 'a', newline='')
candlestick_writer = csv.writer(file, delimiter=',')
except IOError:
# If the file doesn't exist it must be created.
print(f'{file_name} not found: Creating the file')
# Open <file_name> for writing
file = open(file_name, 'w', newline='')
candlestick_writer = csv.writer(file, delimiter=',')
# If no candlesticks were loaded from file. Set a date to start loading from in the
# variable <last_candle_stamp> with a default value stored in <start_datetime>.
if not candlesticks:
last_candle_stamp = start_datetime.timestamp() * 1000
else:
# Set <last_candle_stamp> with the timestamp of the last candles on file
last_candle_stamp = candlesticks[-1][0]
# Request any missing candlestick data from the exchange
recent_candlesticks = self.client.get_historical_klines(symbol, interval, start_str=int(last_candle_stamp))
# Discard the first row of candlestick data as it will be a duplicate***DOUBLE CHECK THIS
recent_candlesticks.pop(0)
# Append the candlestick list and the file
for candlestick in recent_candlesticks:
candlesticks.append(candlestick)
candlestick_writer.writerow(candlestick)
# Close the file and return the entire candlestick history
file.close()
return candlesticks
def set_latest_vol(self):
# Extracts a list of volume values from all the loaded candlestick
# data and store it in a dictionary keyed to timestamp of measurement.
latest_vol = []
last_clp = 0
for data in self.candlesticks:
clp = int(float(data[4]))
if clp < last_clp:
color = 'rgba(255,82,82, 0.8)' # red
else:
color = 'rgba(0, 150, 136, 0.8)' # green
vol_data = {
"time": int(data[0]) / 1000,
"value": int(float(data[5])),
"color": color
}
last_clp = clp
latest_vol.append(vol_data)
self.latest_vol = latest_vol
return
def get_latest_vol(self, num_record=500):
# Returns the latest closing values
if self.latest_vol:
if len(self.latest_vol) < num_record:
print('Warning: get_latest_vol() - Requested too more records then available')
num_record = len(self.latest_vol)
return self.latest_vol[-num_record:]
else:
raise ValueError('Warning: get_latest_vol(): Values are not set')
def set_latest_high_values(self):
# Extracts a list of close values from all the loaded candlestick
# data and store it in a dictionary keyed to timestamp of measurement.
latest_high_values = []
for data in self.candlesticks:
high_data = {
"time": int(data[0]) / 1000,
"high": data[2]
}
latest_high_values.append(high_data)
self.latest_high_values = latest_high_values
return
def get_latest_high_values(self, num_record=500):
# Returns the latest closing values
if self.latest_high_values:
if len(self.latest_high_values) < num_record:
print('Warning: latest_high_values() - Requested too more records then available')
num_record = len(self.latest_high_values)
return self.latest_high_values[-num_record:]
else:
raise ValueError('Warning: latest_high_values(): Values are not set')
def set_latest_low_values(self):
# Extracts a list of close values from all the loaded candlestick
# data and store it in a dictionary keyed to timestamp of measurement.
latest_low_values = []
for data in self.candlesticks:
low_data = {
"time": int(data[0]) / 1000,
"low": data[3]
}
latest_low_values.append(low_data)
self.latest_low_values = latest_low_values
return
def get_latest_low_values(self, num_record=500):
# Returns the latest closing values
if self.latest_low_values:
if len(self.latest_low_values) < num_record:
print('Warning: latest_low_values() - Requested too more records then available')
num_record = len(self.latest_low_values)
return self.latest_low_values[-num_record:]
else:
raise ValueError('Warning: latest_low_values(): Values are not set')
def set_latest_close_values(self):
# Extracts a list of close values from all the loaded candlestick
# data and store it in a dictionary keyed to timestamp of measurement.
latest_close_values = []
for data in self.candlesticks:
close_data = {
"time": int(data[0]) / 1000,
"close": data[4]
}
latest_close_values.append(close_data)
self.latest_close_values = latest_close_values
return
def get_latest_close_values(self, num_record=500):
# Returns the latest closing values
if self.latest_close_values:
if len(self.latest_close_values) < num_record:
print('Warning: get_latest_close_values() - Requested too more records then available')
num_record = len(self.latest_close_values)
return self.latest_close_values[-num_record:]
else:
raise ValueError('Warning: get_latest_close_values(): Values are not set')
def set_candle_history(self, symbol=None, interval=None, max_data_loaded=None):
if not max_data_loaded:
max_data_loaded = self.max_data_loaded
if not symbol:
symbol = self.chart_configuration['trading_pair']
if not interval:
interval = self.chart_configuration['chart_interval']
if self.candlesticks:
print('set_candle_history(): Reloading candle data')
else:
print('set_candle_history(): Loading candle data')
# Load candles from file
cdata = self.load_candle_history(symbol, interval)
# Trim the beginning of the returned list to size of max_data_loaded of less
if len(cdata) < max_data_loaded:
max_data_loaded = len(cdata)
self.candlesticks = cdata[-max_data_loaded:]
# Set an instance dictionary of timestamped high, low, closing values
self.set_latest_high_values()
self.set_latest_low_values()
self.set_latest_close_values()
# Extract the volume data from self.candlesticks and store it in self.latest_vol
self.set_latest_vol()
# Set an instance reference of the last candle
self.last_candle = self.convert_candle(self.candlesticks[-1])
print('set_candle_history(): Candle data Loaded')
return
def convert_candle(self, candle):
candlestick = {
"time": int(candle[0]) / 1000,
"open": candle[1],
"high": candle[2],
"low": candle[3],
"close": candle[4]
}
return candlestick
def get_candle_history(self, symbol, interval, num_records):
if len(self.candlesticks) < num_records:
print('Warning: get_candle_history() Requested more records then available')
num_records = len(self.candlesticks)
# Drop everything but the requested number of records
candlesticks = self.candlesticks[-num_records:]
# Reformat relevant candlestick data into a list of python dictionary objects.
# Binance stores timestamps in milliseconds but lightweight charts doesn't,
# so it gets divided by 1000
processed_candlesticks = []
for data in candlesticks:
candlestick = {
"time": int(data[0]) / 1000,
"open": data[1],
"high": data[2],
"low": data[3],
"close": data[4]
}
processed_candlesticks.append(candlestick)
# Return a list of candlestick objects
return processed_candlesticks
# list enabled indicators
def get_enabled_indicators(self):
""" Loop through all indicators and make a list of indicators marked visible """
enabled_indicators = []
i_list = self.get_indicator_list()
for indctr in i_list:
if i_list[indctr]['visible']:
enabled_indicators.append(indctr)
return enabled_indicators
def set_indicator_defaults(self):
"""Set the default settings for each indicator"""
self.indicator_types = {'simple_indicators': ['RSI', 'SMA', 'EMA', 'LREG'],
'other': ['Volume', 'BOLBands', 'MACD', 'ATR']}
self.indicator_list = {
'SMA 21': {'type': 'SMA', 'period': 21, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
'value': 0},
'EMA 50': {'type': 'EMA', 'period': 50, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
'value': 0},
'EMA 100': {'type': 'EMA', 'period': 100, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
'value': 0},
'SMA 200': {'type': 'SMA', 'period': 200, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
'value': 0},
'RSI 14': {'type': 'RSI', 'period': 14, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
'value': 0},
'RSI 8': {'type': 'RSI', 'period': 8, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
'value': 0},
'Bolenger': {'color_1': '#5ad858', 'color_2': '#f0f664', 'color_3': '#5ad858', 'devdn': 2, 'devup': 2,
'ma': 1, 'period': 20, 'type': 'BOLBands', 'value': 0, 'value1': '38691.58',
'value2': '38552.36',
'value3': '38413.14', 'visible': 'True'},
'vol': {'type': 'Volume', 'visible': True, 'value': 0}
}
return
def get_indicator_list(self):
# Returns a list of all the indicator object in this class instance
if not self.indicator_list:
raise ValueError('get_indicator_list(): No indicators in the list')
return self.indicator_list
def set_exchange_data(self):
# Pull all balances from client while discarding assets with zero balance
account = self.client.futures_coin_account_balance()
self.exchange_data['balances'] = [asset for asset in account if float(asset['balance']) > 0]
# Pull all available symbols from client
exchange_info = self.client.get_exchange_info()
self.exchange_data['symbols'] = exchange_info['symbols']
# Available intervals
self.exchange_data['intervals'] = (
KLINE_INTERVAL_1MINUTE, KLINE_INTERVAL_3MINUTE,
KLINE_INTERVAL_5MINUTE, KLINE_INTERVAL_15MINUTE,
KLINE_INTERVAL_30MINUTE, KLINE_INTERVAL_1HOUR,
KLINE_INTERVAL_2HOUR, KLINE_INTERVAL_4HOUR,
KLINE_INTERVAL_6HOUR, KLINE_INTERVAL_8HOUR,
KLINE_INTERVAL_12HOUR, KLINE_INTERVAL_1DAY,
KLINE_INTERVAL_3DAY, KLINE_INTERVAL_1WEEK,
KLINE_INTERVAL_1MONTH
)
def get_rendered_data(self): def get_rendered_data(self):
""" """
Data to be rendered in the HTML Data to be rendered in the HTML
""" """
rd = {} rd = {}
rd['title'] = self.application_title # Title of the page rd['title'] = self.config.application_title # Title of the page
rd['my_balances'] = self.exchange_data['balances'] # Balances on the exchange rd['my_balances'] = self.exchange_info.balances # Balances on the exchange
rd['symbols'] = self.exchange_data['symbols'] # Symbols information from the exchange rd['symbols'] = self.exchange_info.symbols # Symbols information from the exchange
rd['intervals'] = self.exchange_data['intervals'] # Time candle time intervals available to stream rd['intervals'] = self.exchange_info.intervals # Time candle time intervals available to stream
rd['chart_interval'] = self.chart_configuration['chart_interval'] # The charts current interval setting rd['chart_interval'] = self.config.chart_interval # The charts current interval setting
rd['indicator_types'] = self.indicator_types # All the types indicators Available rd['indicator_types'] = self.indicators.indicator_types # All the types indicators Available
rd['indicator_list'] = self.get_indicator_list() # indicators available rd['indicator_list'] = self.indicators.get_indicator_list() # indicators available
rd['enabled_indicators'] = self.get_enabled_indicators() # list of indicators that are enabled rd['enabled_indicators'] = self.indicators.get_enabled_indicators() # list of indicators that are enabled
rd['ma_vals'] = self.bb_ma_val # A list of acceptable values to use with bolenger band creation rd['ma_vals'] = self.indicators.bb_ma_val # A list of acceptable values to use with bolenger band creation
return rd return rd
def get_indicator_data(self, symbol=None, interval=None, num_results=100):
# Loop through all the indicators. If enabled, run the appropriate
# update function. Return all the results as a dictionary object.
if not interval:
interval = self.chart_configuration['chart_interval']
if not symbol:
symbol = self.chart_configuration['trading_pair']
# Get a list of indicator objects and a list of enabled indicators names.
i_list = self.get_indicator_list()
enabled_i = self.get_enabled_indicators()
result = {}
# Loop through all indicator objects in i_list
for each_i in i_list:
# If the indicator's not enabled skip to next each_i
if each_i not in enabled_i:
continue
i_type = i_list[each_i]['type']
# If it is a simple indicator.
if i_type in self.indicator_types['simple_indicators']:
result[each_i] = self.calculate_simple_indicator(i_type=i_type,
period=i_list[each_i]['period'])
if i_type in self.indicator_types['other']:
if i_type == 'BOLBands':
result[each_i] = self.calculate_bolbands(i_type=i_type,
period=i_list[each_i]['period'],
devup=i_list[each_i]['devup'],
devdn=i_list[each_i]['devdn'],
ma=i_list[each_i]['ma'])
if i_type == 'MACD':
result[each_i] = self.calculate_macd(i_type=i_type,
fast_p=i_list[each_i]['fast_p'],
slow_p=i_list[each_i]['slow_p'],
signal_p=i_list[each_i]['signal_p'])
if i_type == 'Volume':
result[each_i] = self.get_volume(i_type=i_type)
if i_type == 'ATR':
result[each_i] = self.calculate_atr(i_type=i_type,
period=i_list[each_i]['period'])
return result
def get_volume(self, i_type, num_results=800):
r_data = self.get_latest_vol()
r_data = r_data[-num_results:]
return {"type": i_type, "data": r_data}
def calculate_macd(self, i_type, fast_p=12, slow_p=26, signal_p=9, num_results=800):
# These indicators do computations over a period number of price data points.
# So we need at least that plus what ever amount of results needed.
# It seems it needs num_of_nans = (slow_p) - 2) + signal_p
# TODO: slow_p or fast_p which ever is greater should be used in the calc below.
# TODO but i am investigating this.
if fast_p > slow_p:
raise ValueError('Error I think: TODO: calculate_macd()')
num_cv = (slow_p - 2) + signal_p + num_results
closing_data = self.get_latest_close_values(num_cv)
if len(closing_data) < num_cv:
print(f'Couldn\'t calculate {i_type} for time period of {period}')
print('Not enough data availiable')
return
# Initialize two arrays to hold a list of closing values and
# a list of timestamps associated with these values
closes = []
ts = []
# Isolate all the closing values and timestamps from
# the dictionary object
for each in closing_data:
closes.append(each['close'])
ts.append(each['time'])
# Convert the list of closes to a numpy array
np_real_data = np.array(closes, dtype=float)
# Pass the closing values and the period parameter to talib
macd, signal, hist = talib.MACD(np_real_data, fast_p, slow_p, signal_p)
# Combine the new data with the timestamps
# Warning: The first (<period> -1) of values are <NAN>.
# But they should get trimmed off
macd = macd[-num_results:]
if len(macd) == 1:
print('looks like after slicing')
print(macd)
signal = signal[-num_results:]
hist = hist[-num_results:]
ts = ts[-num_results:]
r_macd = []
r_signal = []
r_hist = []
for each in range(len(macd)):
# filter out nan values
if np.isnan(macd[each]):
continue
r_macd.append({'time': ts[each], 'value': macd[each]})
r_signal.append({'time': ts[each], 'value': signal[each]})
r_hist.append({'time': ts[each], 'value': hist[each]})
r_data = [r_macd, r_signal, r_hist]
return {"type": i_type, "data": r_data}
def calculate_atr(self, i_type, period, num_results=800):
# These indicators do computations over period number of price data points.
# So we need at least that plus what ever amount of results needed.
num_cv = period + num_results
high_data = self.get_latest_high_values(num_cv)
low_data = self.get_latest_low_values(num_cv)
close_data = self.get_latest_close_values(num_cv)
if len(close_data) < num_cv:
print(f'Couldn\'t calculate {i_type} for time period of {period}')
print('Not enough data availiable')
return
# Initialize 4 arrays to hold a list of h/l/c values and
# a list of timestamps associated with these values
highs = []
lows = []
closes = []
ts = []
# Isolate all the values and timestamps from
# the dictionary objects
for each in high_data:
highs.append(each['high'])
for each in low_data:
lows.append(each['low'])
for each in close_data:
closes.append(each['close'])
ts.append(each['time'])
# Convert the lists to a numpy array
np_highs = np.array(highs, dtype=float)
np_lows = np.array(lows, dtype=float)
np_closes = np.array(closes, dtype=float)
# Pass the closing values and the period parameter to talib
atr = talib.ATR(high=np_highs,
low=np_lows,
close=np_closes,
timeperiod=period)
# Combine the new data with the timestamps
# Warning: The first (<period> -1) of values are <NAN>.
# But they should get trimmed off
atr = atr[-num_results:]
ts = ts[-num_results:]
r_data = []
for each in range(len(atr)):
# filter out nan values
if np.isnan(atr[each]):
continue
r_data.append({'time': ts[each], 'value': atr[each]})
return {"type": i_type, "data": r_data}
def calculate_bolbands(self, i_type, period, devup=2, devdn=2, ma=0, num_results=800):
# These indicators do computations over period number of price data points.
# So we need at least that plus what ever amount of results needed.
# Acceptable values for ma in the talib.BBANDS
# {'SMA':0,'EMA':1, 'WMA' : 2, 'DEMA' : 3, 'TEMA' : 4, 'TRIMA' : 5, 'KAMA' : 6, 'MAMA' : 7, 'T3' : 8}
num_cv = period + num_results
closing_data = self.get_latest_close_values(num_cv)
if len(closing_data) < num_cv:
print(f'Couldn\'t calculate {i_type} for time period of {period}')
print('Not enough data availiable')
return
# Initialize two arrays to hold a list of closing values and
# a list of timestamps associated with these values
closes = []
ts = []
# Isolate all the closing values and timestamps from
# the dictionary object
for each in closing_data:
closes.append(each['close'])
ts.append(each['time'])
# Convert the list of closes to a numpy array
np_real_data = np.array(closes, dtype=float)
# Pass the closing values and the period parameter to talib
upper, middle, lower = talib.BBANDS(np_real_data,
timeperiod=period,
# number of non-biased standard deviations from the mean
nbdevup=devup,
nbdevdn=devdn,
# Moving average type: simple moving average here
matype=ma)
# Combine the new data with the timestamps
# Warning: The first (<period> -1) of values are <NAN>.
# But they should get trimmed off
i_values_u = upper[-num_results:]
i_values_m = middle[-num_results:]
i_values_l = lower[-num_results:]
ts = ts[-num_results:]
r_data_u = []
r_data_m = []
r_data_l = []
for each in range(len(i_values_u)):
# filter out nan values
if np.isnan(i_values_u[each]):
continue
r_data_u.append({'time': ts[each], 'value': i_values_u[each]})
r_data_m.append({'time': ts[each], 'value': i_values_m[each]})
r_data_l.append({'time': ts[each], 'value': i_values_l[each]})
r_data = [r_data_u, r_data_m, r_data_l]
return {"type": i_type, "data": r_data}
def calculate_simple_indicator(self, i_type, period, num_results=800):
# Valid types of indicators for this function
if i_type not in self.indicator_types['simple_indicators']:
raise ValueError(f'calculate_simple_indicator(): Unknown type: {i_type}')
# These indicators do computations over period number of price data points.
# So we need at least that plus what ever amount of results needed.
num_cv = period + num_results
closing_data = self.get_latest_close_values(num_cv)
if len(closing_data) < num_cv:
print(f'Couldn\'t calculate {i_type} for time period of {period}')
print('Not enough data availiable')
return
# Initialize two arrays to hold a list of closing values and
# a list of timestamps associated with these values
closes = []
ts = []
# Isolate all the closing values and timestamps from
# the dictionary object
for each in closing_data:
closes.append(each['close'])
ts.append(each['time'])
# Convert the list of closes to a numpy array
np_real_data = np.array(closes, dtype=float)
# Pass the closing values and the period parameter to talib
if i_type == 'SMA':
i_values = talib.SMA(np_real_data, period)
if i_type == 'RSI':
i_values = talib.RSI(np_real_data, period)
if i_type == 'EMA':
i_values = talib.EMA(np_real_data, period)
if i_type == 'LREG':
i_values = talib.LINEARREG(np_real_data, period)
# Combine the new data with the timestamps
# Warning: The first <period> of rsi values are <NAN>.
# But they should get trimmed off todo get rid of try except *just debuging info
try:
i_values = i_values[-num_results:]
except:
raise ValueError(f'error: {i_type} {i_values}')
ts = ts[-num_results:]
r_data = []
for each in range(len(i_values)):
r_data.append({'time': ts[each], 'value': i_values[each]})
return {"type": i_type, "data": r_data}
def create_indicator(self, name, type, properties):
# Indicator type checking before adding to a dictionary of properties
properties['type'] = type
# Force color and period properties for simple indicators
if type in self.indicator_types['simple_indicators']:
if 'color' not in properties:
properties['color'] = f"#{random.randrange(0x1000000):06x}"
if 'period' not in properties:
properties['period'] = 20
if type in self.indicator_types['other']:
ul_col = f"#{random.randrange(0x1000000):06x}"
if type == 'BOLBands':
if 'period' not in properties:
properties['period'] = 50
if 'color_1' not in properties:
properties['color_1'] = ul_col
if 'color_2' not in properties:
properties['color_2'] = f"#{random.randrange(0x1000000):06x}"
if 'color_3' not in properties:
properties['color_3'] = ul_col
if 'value1' not in properties:
properties['value1'] = 0
if 'value2' not in properties:
properties['value2'] = 0
if 'value3' not in properties:
properties['value3'] = 0
if 'devup' not in properties:
properties['devup'] = 2
if 'devdn' not in properties:
properties['devdn'] = 2
if 'ma' not in properties:
properties['ma'] = 1
if type == 'MACD':
if 'fast_p' not in properties:
properties['fast_p'] = 12
if 'slow_p' not in properties:
properties['slow_p'] = 26
if 'signal_p' not in properties:
properties['signal_p'] = 9
if 'macd' not in properties:
properties['macd'] = 0
if 'signal' not in properties:
properties['signal'] = 0
if 'hist' not in properties:
properties['hist'] = 0
if 'color_1' not in properties:
properties['color_1'] = f"#{random.randrange(0x1000000):06x}"
if 'color_2' not in properties:
properties['color_2'] = f"#{random.randrange(0x1000000):06x}"
if type == 'ATR':
if 'period' not in properties:
properties['period'] = 50
if 'color' not in properties:
properties['color'] = f"#{random.randrange(0x1000000):06x}"
# Force value and visibility for all indicators
if 'value' not in properties:
properties['value'] = 0
if 'visible' not in properties:
properties['visible'] = True
# Add the dictionary of properties and values to an instance list
self.indicator_list[name] = properties
return
def received_cdata(self, cdata): def received_cdata(self, cdata):
# If this is the first candle received, # If this is the first candle received,
# then just set last_candle and return. # then just set last_candle and return.
if not self.last_candle: if not self.candles.last_candle:
self.last_candle = cdata self.candles.last_candle = cdata
return return
# If this candle is the same as last candle return nothing to do. # If this candle is the same as last candle return nothing to do.
if cdata['time']: if cdata['time']:
if cdata['time'] == self.last_candle['time']: if cdata['time'] == self.candles.last_candle['time']:
return return
# **** New candle is received *** # New candle is received update the instance data records. And the indicators.
# Update the instance data records. self.candles.set_new_candle(cdata)
self.last_candle = cdata updates = self.indicators.update_indicators()
self.latest_close_values.append({'time': cdata['time'], 'close': cdata['close']})
self.latest_high_values.append({'time': cdata['time'], 'high': cdata['high']})
self.latest_low_values.append({'time': cdata['time'], 'low': cdata['low']})
self.latest_vol.append({'time': cdata['time'], 'value': cdata['vol']})
# Update indicators
updates = self.update_indicators()
return updates return updates
def update_indicators(self):
enabled_indcrs = self.get_enabled_indicators()
indcrs_list = self.get_indicator_list()
# Updated data is collected in this dictionary object
updates = {}
# Loop through all enabled indicators
for indcr in enabled_indcrs:
# Get the type of the indicator being updated
i_type = indcrs_list[indcr]['type']
# Update the indicator with a function appropriate for its kind
# TODO - Check EMA results i see a bit of a sharp turn in the ema line on
# the interface side when reloading the page. It smooths out after a full reload.
if i_type in self.indicator_types['simple_indicators']:
updates[indcr] = self.calculate_simple_indicator(i_type=i_type,
period=indcrs_list[indcr]['period'],
num_results=1)
if i_type == 'BOLBands':
updates[indcr] = self.calculate_bolbands(i_type=i_type,
period=indcrs_list[indcr]['period'],
devup=indcrs_list[indcr]['devup'],
devdn=indcrs_list[indcr]['devdn'],
ma=indcrs_list[indcr]['ma'],
num_results=1)
if i_type == 'MACD':
updates[indcr] = self.calculate_macd(i_type=i_type,
fast_p=indcrs_list[indcr]['fast_p'],
slow_p=indcrs_list[indcr]['slow_p'],
signal_p=indcrs_list[indcr]['signal_p'],
num_results=1)
if i_type == 'ATR':
updates[indcr] = self.calculate_atr(i_type=i_type,
period=indcrs_list[indcr]['period'],
num_results=1)
if i_type == 'Volume':
updates[indcr] = self.get_volume(i_type=i_type,
num_results=1)
return updates
def received_new_signal(self, data): def received_new_signal(self, data):
# Check the data. # Check the data.
if 'sigName' not in data: if 'sigName' not in data:
return 'data.py:received_new_signal() - The new signal has no name. ' return 'data.py:received_new_signal() - The new signal has no name. '
Signal
print(data) print(data)
app_data = BrighterData() app_data = BrighterData()

33
exchange_info.py Normal file
View File

@ -0,0 +1,33 @@
from binance.enums import *
class ExchangeInfo:
def __init__(self, client):
self.client = client
self.intervals = None
self.symbols = None
self.balances = None
# Set the above values from information retrieved from exchange.
self.set_exchange_data()
def set_exchange_data(self):
# Pull all balances from client while discarding assets with zero balance
account = self.client.futures_coin_account_balance()
self.balances = [asset for asset in account if float(asset['balance']) > 0]
# Pull all available symbols from client
exchange_info = self.client.get_exchange_info()
self.symbols = exchange_info['symbols']
# Available intervals
self.intervals = (
KLINE_INTERVAL_1MINUTE, KLINE_INTERVAL_3MINUTE,
KLINE_INTERVAL_5MINUTE, KLINE_INTERVAL_15MINUTE,
KLINE_INTERVAL_30MINUTE, KLINE_INTERVAL_1HOUR,
KLINE_INTERVAL_2HOUR, KLINE_INTERVAL_4HOUR,
KLINE_INTERVAL_6HOUR, KLINE_INTERVAL_8HOUR,
KLINE_INTERVAL_12HOUR, KLINE_INTERVAL_1DAY,
KLINE_INTERVAL_3DAY, KLINE_INTERVAL_1WEEK,
KLINE_INTERVAL_1MONTH
)

409
indicators.py Normal file
View File

@ -0,0 +1,409 @@
import random
import numpy as np
import talib
class Indicators:
def __init__(self, candles):
# Object containing Price and candle data.
self.candles = candles
# List of all available indicator types
self.indicator_types = {}
# List of all available indicators
self.indicator_list = None
# Add default indicators and their default values to self.indicator_list
self.set_indicator_defaults()
# A list of values to use with bolenger bands
self.bb_ma_val = {'SMA': 0, 'EMA': 1, 'WMA': 2, 'DEMA': 3, 'TEMA': 4, 'TRIMA': 5, 'KAMA': 6, 'MAMA': 7, 'T3': 8}
def set_indicator_defaults(self):
self.indicator_list = self.get_indicator_defaults()
# todo: get rid of this they are not needed after utilizing ingheritance in classes instead.
self.indicator_types = {'simple_indicators': ['RSI', 'SMA', 'EMA', 'LREG'],
'other': ['Volume', 'BOLBands', 'MACD', 'ATR']}
@staticmethod
def get_indicator_defaults():
"""Set the default settings for each indicator"""
indicator_list = {
'SMA 21': {'type': 'SMA', 'period': 21, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
'value': 0},
'EMA 50': {'type': 'EMA', 'period': 50, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
'value': 0},
'EMA 100': {'type': 'EMA', 'period': 100, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
'value': 0},
'SMA 200': {'type': 'SMA', 'period': 200, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
'value': 0},
'RSI 14': {'type': 'RSI', 'period': 14, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
'value': 0},
'RSI 8': {'type': 'RSI', 'period': 8, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
'value': 0},
'Bolenger': {'color_1': '#5ad858', 'color_2': '#f0f664', 'color_3': '#5ad858', 'devdn': 2, 'devup': 2,
'ma': 1, 'period': 20, 'type': 'BOLBands', 'value': 0, 'value1': '38691.58',
'value2': '38552.36',
'value3': '38413.14', 'visible': 'True'},
'vol': {'type': 'Volume', 'visible': True, 'value': 0}
}
return indicator_list
def get_indicator_list(self):
# Returns a list of all the indicator object in this class instance
if not self.indicator_list:
raise ValueError('get_indicator_list(): No indicators in the list')
return self.indicator_list
def get_enabled_indicators(self):
""" Loop through all indicators and return a list of indicators marked visible """
enabled_indicators = []
i_list = self.get_indicator_list()
for indctr in i_list:
if i_list[indctr]['visible']:
enabled_indicators.append(indctr)
return enabled_indicators
def get_indicator_data(self, symbol=None, interval=None, num_results=100):
# Loop through all the indicators. If enabled, run the appropriate
# update function. Return all the results as a dictionary object.
# Get a list of indicator objects and a list of enabled indicators names.
i_list = self.get_indicator_list()
enabled_i = self.get_enabled_indicators()
result = {}
# Loop through all indicator objects in i_list
for each_i in i_list:
# If the indicator's not enabled skip to next each_i
if each_i not in enabled_i:
continue
i_type = i_list[each_i]['type']
# If it is a simple indicator.
if i_type in self.indicator_types['simple_indicators']:
result[each_i] = self.calculate_simple_indicator(i_type=i_type,
period=i_list[each_i]['period'])
if i_type in self.indicator_types['other']:
if i_type == 'BOLBands':
result[each_i] = self.calculate_bolbands(i_type=i_type,
period=i_list[each_i]['period'],
devup=i_list[each_i]['devup'],
devdn=i_list[each_i]['devdn'],
ma=i_list[each_i]['ma'])
if i_type == 'MACD':
result[each_i] = self.calculate_macd(i_type=i_type,
fast_p=i_list[each_i]['fast_p'],
slow_p=i_list[each_i]['slow_p'],
signal_p=i_list[each_i]['signal_p'])
if i_type == 'Volume':
result[each_i] = self.candles.get_volume(i_type=i_type)
if i_type == 'ATR':
result[each_i] = self.calculate_atr(i_type=i_type,
period=i_list[each_i]['period'])
return result
def calculate_macd(self, i_type, fast_p=12, slow_p=26, signal_p=9, num_results=800):
# These indicators do computations over a period number of price data points.
# So we need at least that plus what ever amount of results needed.
# It seems it needs num_of_nans = (slow_p) - 2) + signal_p
# TODO: slow_p or fast_p which ever is greater should be used in the calc below.
# TODO but i am investigating this.
if fast_p > slow_p:
raise ValueError('Error I think: TODO: calculate_macd()')
num_cv = (slow_p - 2) + signal_p + num_results
closing_data = self.candles.get_latest_close_values(num_cv)
if len(closing_data) < num_cv:
print(f'Couldn\'t calculate {i_type} for time period of {slow_p}')
print('Not enough data availiable')
return
# Initialize two arrays to hold a list of closing values and
# a list of timestamps associated with these values
closes = []
ts = []
# Isolate all the closing values and timestamps from
# the dictionary object
for each in closing_data:
closes.append(each['close'])
ts.append(each['time'])
# Convert the list of closes to a numpy array
np_real_data = np.array(closes, dtype=float)
# Pass the closing values and the period parameter to talib
macd, signal, hist = talib.MACD(np_real_data, fast_p, slow_p, signal_p)
# Combine the new data with the timestamps
# Warning: The first (<period> -1) of values are <NAN>.
# But they should get trimmed off
macd = macd[-num_results:]
if len(macd) == 1:
print('looks like after slicing')
print(macd)
signal = signal[-num_results:]
hist = hist[-num_results:]
ts = ts[-num_results:]
r_macd = []
r_signal = []
r_hist = []
for each in range(len(macd)):
# filter out nan values
if np.isnan(macd[each]):
continue
r_macd.append({'time': ts[each], 'value': macd[each]})
r_signal.append({'time': ts[each], 'value': signal[each]})
r_hist.append({'time': ts[each], 'value': hist[each]})
r_data = [r_macd, r_signal, r_hist]
return {"type": i_type, "data": r_data}
def calculate_atr(self, i_type, period, num_results=800):
# These indicators do computations over period number of price data points.
# So we need at least that plus what ever amount of results needed.
num_cv = period + num_results
high_data = self.candles.get_latest_high_values(num_cv)
low_data = self.candles.get_latest_low_values(num_cv)
close_data = self.candles.get_latest_close_values(num_cv)
if len(close_data) < num_cv:
print(f'Couldn\'t calculate {i_type} for time period of {period}')
print('Not enough data availiable')
return
# Initialize 4 arrays to hold a list of h/l/c values and
# a list of timestamps associated with these values
highs = []
lows = []
closes = []
ts = []
# Isolate all the values and timestamps from
# the dictionary objects
for each in high_data:
highs.append(each['high'])
for each in low_data:
lows.append(each['low'])
for each in close_data:
closes.append(each['close'])
ts.append(each['time'])
# Convert the lists to a numpy array
np_highs = np.array(highs, dtype=float)
np_lows = np.array(lows, dtype=float)
np_closes = np.array(closes, dtype=float)
# Pass the closing values and the period parameter to talib
atr = talib.ATR(high=np_highs,
low=np_lows,
close=np_closes,
timeperiod=period)
# Combine the new data with the timestamps
# Warning: The first (<period> -1) of values are <NAN>.
# But they should get trimmed off
atr = atr[-num_results:]
ts = ts[-num_results:]
r_data = []
for each in range(len(atr)):
# filter out nan values
if np.isnan(atr[each]):
continue
r_data.append({'time': ts[each], 'value': atr[each]})
return {"type": i_type, "data": r_data}
def calculate_bolbands(self, i_type, period, devup=2, devdn=2, ma=0, num_results=800):
# These indicators do computations over period number of price data points.
# So we need at least that plus what ever amount of results needed.
# Acceptable values for ma in the talib.BBANDS
# {'SMA':0,'EMA':1, 'WMA' : 2, 'DEMA' : 3, 'TEMA' : 4, 'TRIMA' : 5, 'KAMA' : 6, 'MAMA' : 7, 'T3' : 8}
num_cv = period + num_results
closing_data = self.candles.get_latest_close_values(num_cv)
if len(closing_data) < num_cv:
print(f'Couldn\'t calculate {i_type} for time period of {period}')
print('Not enough data availiable')
return
# Initialize two arrays to hold a list of closing values and
# a list of timestamps associated with these values
closes = []
ts = []
# Isolate all the closing values and timestamps from
# the dictionary object
for each in closing_data:
closes.append(each['close'])
ts.append(each['time'])
# Convert the list of closes to a numpy array
np_real_data = np.array(closes, dtype=float)
# Pass the closing values and the period parameter to talib
upper, middle, lower = talib.BBANDS(np_real_data,
timeperiod=period,
# number of non-biased standard deviations from the mean
nbdevup=devup,
nbdevdn=devdn,
# Moving average type: simple moving average here
matype=ma)
# Combine the new data with the timestamps
# Warning: The first (<period> -1) of values are <NAN>.
# But they should get trimmed off
i_values_u = upper[-num_results:]
i_values_m = middle[-num_results:]
i_values_l = lower[-num_results:]
ts = ts[-num_results:]
r_data_u = []
r_data_m = []
r_data_l = []
for each in range(len(i_values_u)):
# filter out nan values
if np.isnan(i_values_u[each]):
continue
r_data_u.append({'time': ts[each], 'value': i_values_u[each]})
r_data_m.append({'time': ts[each], 'value': i_values_m[each]})
r_data_l.append({'time': ts[each], 'value': i_values_l[each]})
r_data = [r_data_u, r_data_m, r_data_l]
return {"type": i_type, "data": r_data}
def calculate_simple_indicator(self, i_type, period, num_results=800):
# Valid types of indicators for this function
if i_type not in self.indicator_types['simple_indicators']:
raise ValueError(f'calculate_simple_indicator(): Unknown type: {i_type}')
# These indicators do computations over period number of price data points.
# So we need at least that plus what ever amount of results needed.
num_cv = period + num_results
closing_data = self.candles.get_latest_close_values(num_cv)
if len(closing_data) < num_cv:
print(f'Could not calculate {i_type} for time period of {period}')
print('Not enough data available')
return
# Initialize two arrays to hold a list of closing values and
# a list of timestamps associated with these values
closes = []
ts = []
# Isolate all the closing values and timestamps from
# the dictionary object
for each in closing_data:
closes.append(each['close'])
ts.append(each['time'])
# Convert the list of closes to a numpy array
np_real_data = np.array(closes, dtype=float)
# Pass the closing values and the period parameter to talib
i_values = None
if i_type == 'SMA':
i_values = talib.SMA(np_real_data, period)
if i_type == 'RSI':
i_values = talib.RSI(np_real_data, period)
if i_type == 'EMA':
i_values = talib.EMA(np_real_data, period)
if i_type == 'LREG':
i_values = talib.LINEARREG(np_real_data, period)
# Combine the new data with the timestamps
# Warning: The first <period> of rsi values are <NAN>.
# But they should get trimmed off todo get rid of try except *just debuging info
try:
i_values = i_values[-num_results:]
except Exception:
raise ValueError(f'error: {i_type} {i_values}')
ts = ts[-num_results:]
r_data = []
for each in range(len(i_values)):
r_data.append({'time': ts[each], 'value': i_values[each]})
return {"type": i_type, "data": r_data}
def create_indicator(self, name, itype, properties):
# Indicator type checking before adding to a dictionary of properties
properties['type'] = itype
# Force color and period properties for simple indicators
if itype in self.indicator_types['simple_indicators']:
if 'color' not in properties:
properties['color'] = f"#{random.randrange(0x1000000):06x}"
if 'period' not in properties:
properties['period'] = 20
if itype in self.indicator_types['other']:
ul_col = f"#{random.randrange(0x1000000):06x}"
if itype == 'BOLBands':
if 'period' not in properties:
properties['period'] = 50
if 'color_1' not in properties:
properties['color_1'] = ul_col
if 'color_2' not in properties:
properties['color_2'] = f"#{random.randrange(0x1000000):06x}"
if 'color_3' not in properties:
properties['color_3'] = ul_col
if 'value1' not in properties:
properties['value1'] = 0
if 'value2' not in properties:
properties['value2'] = 0
if 'value3' not in properties:
properties['value3'] = 0
if 'devup' not in properties:
properties['devup'] = 2
if 'devdn' not in properties:
properties['devdn'] = 2
if 'ma' not in properties:
properties['ma'] = 1
if itype == 'MACD':
if 'fast_p' not in properties:
properties['fast_p'] = 12
if 'slow_p' not in properties:
properties['slow_p'] = 26
if 'signal_p' not in properties:
properties['signal_p'] = 9
if 'macd' not in properties:
properties['macd'] = 0
if 'signal' not in properties:
properties['signal'] = 0
if 'hist' not in properties:
properties['hist'] = 0
if 'color_1' not in properties:
properties['color_1'] = f"#{random.randrange(0x1000000):06x}"
if 'color_2' not in properties:
properties['color_2'] = f"#{random.randrange(0x1000000):06x}"
if itype == 'ATR':
if 'period' not in properties:
properties['period'] = 50
if 'color' not in properties:
properties['color'] = f"#{random.randrange(0x1000000):06x}"
# Force value and visibility for all indicators
if 'value' not in properties:
properties['value'] = 0
if 'visible' not in properties:
properties['visible'] = True
# Add the dictionary of properties and values to an instance list
self.indicator_list[name] = properties
return
def update_indicators(self):
enabled_indcrs = self.get_enabled_indicators()
indcrs_list = self.get_indicator_list()
# Updated data is collected in this dictionary object
updates = {}
# Loop through all enabled indicators
for indcr in enabled_indcrs:
# Get the type of the indicator being updated
i_type = indcrs_list[indcr]['type']
# Update the indicator with a function appropriate for its kind
# TODO - Check EMA results i see a bit of a sharp turn in the ema line on
# the interface side when reloading the page. It smooths out after a full reload.
if i_type in self.indicator_types['simple_indicators']:
updates[indcr] = self.calculate_simple_indicator(i_type=i_type,
period=indcrs_list[indcr]['period'],
num_results=1)
if i_type == 'BOLBands':
updates[indcr] = self.calculate_bolbands(i_type=i_type,
period=indcrs_list[indcr]['period'],
devup=indcrs_list[indcr]['devup'],
devdn=indcrs_list[indcr]['devdn'],
ma=indcrs_list[indcr]['ma'],
num_results=1)
if i_type == 'MACD':
updates[indcr] = self.calculate_macd(i_type=i_type,
fast_p=indcrs_list[indcr]['fast_p'],
slow_p=indcrs_list[indcr]['slow_p'],
signal_p=indcrs_list[indcr]['signal_p'],
num_results=1)
if i_type == 'ATR':
updates[indcr] = self.calculate_atr(i_type=i_type,
period=indcrs_list[indcr]['period'],
num_results=1)
if i_type == 'Volume':
updates[indcr] = self.candles.get_volume(i_type=i_type,
num_results=1)
return updates