800 lines
36 KiB
Python
800 lines
36 KiB
Python
import csv
|
|
import datetime
|
|
import sys
|
|
import random
|
|
|
|
import numpy as np
|
|
import talib
|
|
|
|
import config
|
|
from binance.client import Client
|
|
from binance.enums import *
|
|
import yaml
|
|
|
|
|
|
class BrighterData:
|
|
def __init__(self):
    """Connect to the exchange, restore saved settings and preload candles.

    Construction performs I/O: it creates the exchange client, asks it for
    account and symbol data (``set_exchange_data``), loads or creates the
    YAML file named by ``self.config_FN`` (``config_and_states('load')``)
    and loads the candle history (``set_candle_history``).
    """
    # Connection to the Binance client API.
    self.client = Client(config.API_KEY, config.API_SECRET)

    # Title of the program.
    self.application_title = 'BrighterTrades'

    # Settings for the main chart shown in the UI.
    self.chart_configuration = {
        'chart_interval': KLINE_INTERVAL_15MINUTE,
        'trading_pair': 'BTCUSDT',
    }

    # Maximum number of candles to keep in memory.
    self.max_data_loaded = 1000

    # Available indicator types and the configured indicators; both are
    # populated with defaults by set_indicator_defaults().
    self.indicator_types = {}
    self.indicator_list = None
    self.set_indicator_defaults()

    # Exchange and account data, filled from the exchange.
    self.exchange_data = {}
    self.set_exchange_data()

    # File that stores the saved configuration; loading it may replace the
    # default indicator list and chart configuration set above.
    self.config_FN = 'config.yml'
    self.config_and_states('load')

    # Loaded candle history and the per-candle series derived from it.
    self.candlesticks = []
    self.latest_high_values = []
    self.latest_low_values = []
    self.latest_close_values = []
    # Values of the last candle received.
    self.last_candle = None
    self.latest_vol = []

    # Fill candlesticks, the high/low/close/volume series and last_candle.
    self.set_candle_history()

    # Moving-average names mapped to the numeric codes used for
    # bollinger bands (SMA=0 ... T3=8).
    ma_names = ('SMA', 'EMA', 'WMA', 'DEMA', 'TEMA', 'TRIMA', 'KAMA', 'MAMA', 'T3')
    self.bb_ma_val = {ma_name: code for code, ma_name in enumerate(ma_names)}
|
|
|
|
def get_js_init_data(self):
    """Build the initialization data for the javascript in the rendered HTML.

    Returns:
        A dict holding the indicator types, the indicator list and the
        chart's current interval and trading pair.
    """
    chart = self.chart_configuration
    return {
        'i_types': self.indicator_types,
        'indicators': self.indicator_list,
        'interval': chart['chart_interval'],
        'trading_pair': chart['trading_pair'],
    }
|
|
|
|
def config_and_states(self, cmd):
    """Load or save configurable data to the file set in ``self.config_FN``.

    Args:
        cmd: ``'load'`` reads the indicator list and chart configuration
            from the file; if the file cannot be opened it is created with
            the current values instead. ``'save'`` writes the current
            values to the file.

    Raises:
        ValueError: if ``cmd`` is neither ``'load'`` nor ``'save'``, or if
            the file could not be written while saving.
    """
    # Current application configuration and object state. This is what gets
    # written on 'save' and when 'load' has to create the file.
    saved_data = {
        'indicator_list': self.indicator_list,
        'chart_configuration': self.chart_configuration,
    }

    def load_configuration(filepath):
        """Return the parsed YAML content of <filepath>."""
        with open(filepath, "r") as file_descriptor:
            return yaml.safe_load(file_descriptor)

    def save_configuration(filepath, data):
        """Write <data> to <filepath> as YAML."""
        with open(filepath, "w") as file_descriptor:
            yaml.dump(data, file_descriptor)

    if cmd == 'load':
        try:
            loaded = load_configuration(self.config_FN)
        except IOError:
            # No readable file: create one holding the current defaults.
            save_configuration(self.config_FN, saved_data)
            return
        # An empty file parses to None and a damaged one may parse to a
        # non-mapping; in both cases keep the values already set.
        if not isinstance(loaded, dict):
            return
        # Only override what the file actually provides.
        self.indicator_list = loaded.get('indicator_list', self.indicator_list)
        self.chart_configuration = loaded.get('chart_configuration',
                                              self.chart_configuration)
    elif cmd == 'save':
        try:
            save_configuration(self.config_FN, saved_data)
        except IOError as err:
            # Keep the underlying cause attached for debugging.
            raise ValueError("Couldn't save the file") from err
    else:
        raise ValueError('Invalid command received')
|
|
|
|
def load_candle_history(self, symbol, interval):
    """Load candle history from a CSV file and top it up from the exchange.

    The file is named ``<symbol>_<interval>_<start date>`` (for example
    ``BTCUSDT_15m_2017-01-01``) and lives in the current working directory.
    Rows already on file are read first, then newer rows are requested from
    the exchange client, appended to the file and to the returned list.
    The file is only touched when this method is called for <symbol>, which
    avoids maintaining irrelevant data files.

    Args:
        symbol: trading pair passed to the exchange client.
        interval: candle interval passed to the exchange client.

    Returns:
        The entire candle history as a list of rows. Rows read from the
        file are lists of strings; rows received from the exchange are kept
        as the client returned them. Index 0 of a row is the candle
        timestamp in milliseconds.
    """
    start_datetime = datetime.datetime(2017, 1, 1)
    file_name = f'{symbol}_{interval}_{start_datetime.date()}'

    candlesticks = []
    try:
        # Populate <candlesticks> from <file_name> if it exists.
        print(f'Attempting to open: {file_name}')
        with open(file_name, 'r', newline='') as file:
            # Skip blank lines so the last row always has a timestamp.
            candlesticks = [row for row in csv.reader(file, delimiter=',') if row]
        print('File loaded')
        write_mode = 'a'
    except IOError:
        # If the file doesn't exist it must be created.
        print(f'{file_name} not found: Creating the file')
        write_mode = 'w'

    # Resume from the last candle on file, or from <start_datetime> when
    # nothing was loaded.
    if candlesticks:
        last_candle_stamp = int(candlesticks[-1][0])
    else:
        last_candle_stamp = int(start_datetime.timestamp() * 1000)

    # Request any missing candlestick data from the exchange. No file handle
    # is held open during the request, so a failing request leaks nothing.
    recent_candlesticks = self.client.get_historical_klines(
        symbol, interval, start_str=last_candle_stamp)

    if candlesticks:
        # Drop rows that are already on file. Filtering by timestamp (rather
        # than unconditionally popping the first row) also copes with an
        # empty response and keeps the first candle of a fresh download.
        recent_candlesticks = [row for row in recent_candlesticks
                               if int(row[0]) > last_candle_stamp]
    # NOTE(review): presumably the newest exchange candle can still be open
    # when it is written, which would leave a stale row on file - confirm.

    # Append the new rows to the file (creating it if needed) and the list.
    with open(file_name, write_mode, newline='') as file:
        csv.writer(file, delimiter=',').writerows(recent_candlesticks)
    candlesticks.extend(recent_candlesticks)
    return candlesticks
|
|
|
|
def set_latest_vol(self):
    """Rebuild ``self.latest_vol`` from ``self.candlesticks``.

    Each entry is a dict with the candle time in seconds (``time``), the
    volume truncated to an int (``value``) and a bar ``color``: red when
    the candle closed below the previous candle's close, green otherwise.
    The first candle is compared against 0.
    """
    red = 'rgba(255,82,82, 0.8)'
    green = 'rgba(0, 150, 136, 0.8)'

    latest_vol = []
    previous_close = 0.0
    for data in self.candlesticks:
        # Compare the real closing prices; truncating them to int would
        # hide every move smaller than one unit.
        close = float(data[4])
        latest_vol.append({
            "time": int(data[0]) / 1000,
            "value": int(float(data[5])),
            "color": red if close < previous_close else green,
        })
        previous_close = close
    self.latest_vol = latest_vol
    return
|
|
|
|
def get_latest_vol(self, num_record=500):
    """Return the newest ``num_record`` entries of ``self.latest_vol``.

    If fewer entries exist a warning is printed and all of them are
    returned.

    Raises:
        ValueError: if ``self.latest_vol`` is empty.
    """
    records = self.latest_vol
    if not records:
        raise ValueError('Warning: get_latest_vol(): Values are not set')
    available = len(records)
    if available < num_record:
        print('Warning: get_latest_vol() - Requested too more records then available')
        num_record = available
    return records[-num_record:]
|
|
|
|
def set_latest_high_values(self):
    """Rebuild ``self.latest_high_values`` from ``self.candlesticks``.

    Each entry is a dict holding the candle time in seconds (``time``) and
    the candle's high value (``high``) exactly as stored in the row.
    """
    self.latest_high_values = [
        {"time": int(row[0]) / 1000, "high": row[2]}
        for row in self.candlesticks
    ]
|
|
|
|
def get_latest_high_values(self, num_record=500):
    """Return the newest ``num_record`` entries of ``self.latest_high_values``.

    If fewer entries exist a warning is printed and all of them are
    returned.

    Raises:
        ValueError: if ``self.latest_high_values`` is empty.
    """
    records = self.latest_high_values
    if not records:
        raise ValueError('Warning: latest_high_values(): Values are not set')
    available = len(records)
    if available < num_record:
        print('Warning: latest_high_values() - Requested too more records then available')
        num_record = available
    return records[-num_record:]
|
|
|
|
def set_latest_low_values(self):
    """Rebuild ``self.latest_low_values`` from ``self.candlesticks``.

    Each entry is a dict holding the candle time in seconds (``time``) and
    the candle's low value (``low``) exactly as stored in the row.
    """
    self.latest_low_values = [
        {"time": int(row[0]) / 1000, "low": row[3]}
        for row in self.candlesticks
    ]
|
|
|
|
def get_latest_low_values(self, num_record=500):
    """Return the newest ``num_record`` entries of ``self.latest_low_values``.

    If fewer entries exist a warning is printed and all of them are
    returned.

    Raises:
        ValueError: if ``self.latest_low_values`` is empty.
    """
    records = self.latest_low_values
    if not records:
        raise ValueError('Warning: latest_low_values(): Values are not set')
    available = len(records)
    if available < num_record:
        print('Warning: latest_low_values() - Requested too more records then available')
        num_record = available
    return records[-num_record:]
|
|
|
|
def set_latest_close_values(self):
    """Rebuild ``self.latest_close_values`` from ``self.candlesticks``.

    Each entry is a dict holding the candle time in seconds (``time``) and
    the candle's closing value (``close``) exactly as stored in the row.
    """
    self.latest_close_values = [
        {"time": int(row[0]) / 1000, "close": row[4]}
        for row in self.candlesticks
    ]
|
|
|
|
def get_latest_close_values(self, num_record=500):
    """Return the newest ``num_record`` entries of ``self.latest_close_values``.

    If fewer entries exist a warning is printed and all of them are
    returned.

    Raises:
        ValueError: if ``self.latest_close_values`` is empty.
    """
    records = self.latest_close_values
    if not records:
        raise ValueError('Warning: get_latest_close_values(): Values are not set')
    available = len(records)
    if available < num_record:
        print('Warning: get_latest_close_values() - Requested too more records then available')
        num_record = available
    return records[-num_record:]
|
|
|
|
def set_candle_history(self, symbol=None, interval=None, max_data_loaded=None):
    """Load candle data and refresh every series derived from it.

    Sets ``self.candlesticks`` to the newest ``max_data_loaded`` rows
    returned by ``load_candle_history``, rebuilds the high, low, close and
    volume series and stores the converted last candle in
    ``self.last_candle`` (``None`` when no candles are available).

    Args:
        symbol: trading pair; defaults to the chart configuration's
            ``trading_pair``.
        interval: candle interval; defaults to the chart configuration's
            ``chart_interval``.
        max_data_loaded: maximum number of candles to keep; defaults to
            ``self.max_data_loaded``.
    """
    if not max_data_loaded:
        max_data_loaded = self.max_data_loaded
    if not symbol:
        symbol = self.chart_configuration['trading_pair']
    if not interval:
        interval = self.chart_configuration['chart_interval']

    if self.candlesticks:
        print('set_candle_history(): Reloading candle data')
    else:
        print('set_candle_history(): Loading candle data')

    # Load candles from file (topped up from the exchange).
    cdata = self.load_candle_history(symbol, interval)

    # Keep only the newest <max_data_loaded> rows, or all of them if fewer.
    if len(cdata) < max_data_loaded:
        max_data_loaded = len(cdata)
    self.candlesticks = cdata[-max_data_loaded:]

    # Rebuild the timestamped high, low, close and volume series.
    self.set_latest_high_values()
    self.set_latest_low_values()
    self.set_latest_close_values()
    self.set_latest_vol()

    if not self.candlesticks:
        # Nothing on file and nothing from the exchange: indexing the last
        # row would raise, so record that there is no last candle.
        self.last_candle = None
        print('set_candle_history(): No candle data available')
        return

    # Keep a converted copy of the most recent candle.
    self.last_candle = self.convert_candle(self.candlesticks[-1])
    print('set_candle_history(): Candle data Loaded')
    return
|
|
|
|
def convert_candle(self, candle):
    """Convert a raw candle row into a dict of time/open/high/low/close.

    The row's first item is converted from milliseconds to seconds; the
    four price items are passed through unchanged.
    """
    seconds = int(candle[0]) / 1000
    return {
        "time": seconds,
        "open": candle[1],
        "high": candle[2],
        "low": candle[3],
        "close": candle[4],
    }
|
|
|
|
def get_candle_history(self, symbol, interval, num_records):
    """Return the newest ``num_records`` loaded candles as chart dicts.

    ``symbol`` and ``interval`` are accepted but not used; the data comes
    from ``self.candlesticks``. If fewer candles are loaded than requested
    a warning is printed and all of them are returned.

    Binance stores timestamps in milliseconds but lightweight charts
    doesn't, so each timestamp is divided by 1000.
    """
    available = len(self.candlesticks)
    if available < num_records:
        print('Warning: get_candle_history() Requested more records then available')
        num_records = available

    # Keep only the requested number of rows and reshape each into a dict.
    return [
        {
            "time": int(row[0]) / 1000,
            "open": row[1],
            "high": row[2],
            "low": row[3],
            "close": row[4],
        }
        for row in self.candlesticks[-num_records:]
    ]
|
|
|
|
# list enabled indicators
|
|
def get_enabled_indicators(self):
    """Return the names of all indicators whose ``visible`` entry is truthy."""
    indicators = self.get_indicator_list()
    return [name for name in indicators if indicators[name]['visible']]
|
|
|
|
def set_indicator_defaults(self):
    """Set the default indicator types and the default indicator list.

    ``self.indicator_types`` groups the type names into
    ``simple_indicators`` and ``other``. ``self.indicator_list`` maps an
    indicator name to its settings; the simple indicators each get a random
    ``#rrggbb`` color.
    """
    def random_color():
        """Return a random color as a '#rrggbb' hex string."""
        return f"#{random.randrange(0x1000000):06x}"

    self.indicator_types = {'simple_indicators': ['RSI', 'SMA', 'EMA', 'LREG'],
                            'other': ['Volume', 'BOLBands', 'MACD', 'ATR']}

    # (name, type, period) of the default simple indicators.
    simple_defaults = (
        ('SMA 21', 'SMA', 21),
        ('EMA 50', 'EMA', 50),
        ('EMA 100', 'EMA', 100),
        ('SMA 200', 'SMA', 200),
        ('RSI 14', 'RSI', 14),
        ('RSI 8', 'RSI', 8),
    )
    indicator_list = {
        name: {'type': i_type, 'period': period, 'visible': True,
               'color': random_color(), 'value': 0}
        for name, i_type, period in simple_defaults
    }
    # 'visible' is a real boolean here, like for every other indicator; the
    # string 'True' would stay truthy even if it were changed to 'False'.
    # NOTE(review): value1..value3 look like leftover sample prices - confirm
    # whether they should start at 0 as create_indicator() does.
    indicator_list['Bolenger'] = {
        'color_1': '#5ad858', 'color_2': '#f0f664', 'color_3': '#5ad858',
        'devdn': 2, 'devup': 2, 'ma': 1, 'period': 20, 'type': 'BOLBands',
        'value': 0, 'value1': '38691.58', 'value2': '38552.36',
        'value3': '38413.14', 'visible': True,
    }
    indicator_list['vol'] = {'type': 'Volume', 'visible': True, 'value': 0}
    self.indicator_list = indicator_list
    return
|
|
|
|
def get_indicator_list(self):
    """Return ``self.indicator_list``.

    Raises:
        ValueError: if the indicator list is empty or not set.
    """
    indicators = self.indicator_list
    if indicators:
        return indicators
    raise ValueError('get_indicator_list(): No indicators in the list')
|
|
|
|
def set_exchange_data(self):
    """Fill ``self.exchange_data`` with balances, symbols and intervals.

    ``balances`` holds the entries returned by the client's
    ``futures_coin_account_balance()`` whose balance is above zero,
    ``symbols`` the ``symbols`` entry of ``get_exchange_info()`` and
    ``intervals`` the tuple of available candle intervals.
    """
    # Pull all balances from the client, discarding zero balances.
    non_zero_balances = []
    for asset in self.client.futures_coin_account_balance():
        if float(asset['balance']) > 0:
            non_zero_balances.append(asset)
    self.exchange_data['balances'] = non_zero_balances

    # Pull all available symbols from the client.
    self.exchange_data['symbols'] = self.client.get_exchange_info()['symbols']

    # Available intervals, shortest first.
    self.exchange_data['intervals'] = (
        KLINE_INTERVAL_1MINUTE,
        KLINE_INTERVAL_3MINUTE,
        KLINE_INTERVAL_5MINUTE,
        KLINE_INTERVAL_15MINUTE,
        KLINE_INTERVAL_30MINUTE,
        KLINE_INTERVAL_1HOUR,
        KLINE_INTERVAL_2HOUR,
        KLINE_INTERVAL_4HOUR,
        KLINE_INTERVAL_6HOUR,
        KLINE_INTERVAL_8HOUR,
        KLINE_INTERVAL_12HOUR,
        KLINE_INTERVAL_1DAY,
        KLINE_INTERVAL_3DAY,
        KLINE_INTERVAL_1WEEK,
        KLINE_INTERVAL_1MONTH,
    )
|
|
|
|
def get_rendered_data(self):
    """Collect the data to be rendered in the HTML.

    Returns:
        A dict with the page title, exchange balances, symbols and
        intervals, the chart's current interval, the indicator types, the
        indicator list, the enabled indicator names and the moving-average
        codes usable for bollinger bands.
    """
    exchange = self.exchange_data
    return {
        'title': self.application_title,                            # Title of the page
        'my_balances': exchange['balances'],                        # Balances on the exchange
        'symbols': exchange['symbols'],                             # Symbols information from the exchange
        'intervals': exchange['intervals'],                         # Candle intervals available to stream
        'chart_interval': self.chart_configuration['chart_interval'],  # The chart's current interval
        'indicator_types': self.indicator_types,                    # All indicator types available
        'indicator_list': self.get_indicator_list(),                # Indicators available
        'enabled_indicators': self.get_enabled_indicators(),        # Indicators that are enabled
        'ma_vals': self.bb_ma_val,                                  # Acceptable moving-average values
    }
|
|
|
|
def get_indicator_data(self, symbol=None, interval=None, num_results=100):
    """Calculate every enabled indicator and return the results by name.

    ``symbol`` and ``interval`` fall back to the chart configuration but,
    like ``num_results``, are not passed on to the calculations.

    Returns:
        A dict mapping each enabled indicator's name to whatever its
        calculation method returned.
    """
    if not interval:
        interval = self.chart_configuration['chart_interval']
    if not symbol:
        symbol = self.chart_configuration['trading_pair']

    indicators = self.get_indicator_list()
    enabled = set(self.get_enabled_indicators())
    result = {}
    for name, settings in indicators.items():
        # Only enabled indicators are calculated.
        if name not in enabled:
            continue
        kind = settings['type']

        # Simple indicators only need a type and a period.
        if kind in self.indicator_types['simple_indicators']:
            result[name] = self.calculate_simple_indicator(i_type=kind,
                                                           period=settings['period'])
        if kind not in self.indicator_types['other']:
            continue

        # The remaining types each have their own calculation.
        if kind == 'BOLBands':
            result[name] = self.calculate_bolbands(i_type=kind,
                                                   period=settings['period'],
                                                   devup=settings['devup'],
                                                   devdn=settings['devdn'],
                                                   ma=settings['ma'])
        elif kind == 'MACD':
            result[name] = self.calculate_macd(i_type=kind,
                                               fast_p=settings['fast_p'],
                                               slow_p=settings['slow_p'],
                                               signal_p=settings['signal_p'])
        elif kind == 'Volume':
            result[name] = self.get_volume(i_type=kind)
        elif kind == 'ATR':
            result[name] = self.calculate_atr(i_type=kind,
                                              period=settings['period'])
    return result
|
|
|
|
def get_volume(self, i_type, num_results=800):
    """Return the newest ``num_results`` volume entries tagged with ``i_type``.

    The entries come from ``get_latest_vol()`` called with its default
    record count, so at most that many are available to slice from.
    """
    return {"type": i_type, "data": self.get_latest_vol()[-num_results:]}
|
|
|
|
def calculate_macd(self, i_type, fast_p=12, slow_p=26, signal_p=9, num_results=800):
    """Calculate MACD, signal and histogram series from the closing values.

    Args:
        i_type: indicator type label copied into the result.
        fast_p: fast period passed to ``talib.MACD``.
        slow_p: slow period passed to ``talib.MACD``.
        signal_p: signal period passed to ``talib.MACD``.
        num_results: number of most recent values to return.

    Returns:
        ``{"type": i_type, "data": [macd, signal, hist]}`` where each of
        the three is a list of ``{'time': ..., 'value': ...}`` dicts, or
        ``None`` when not enough closing values are loaded.

    Raises:
        ValueError: if ``fast_p`` is greater than ``slow_p``.
    """
    # TODO: slow_p or fast_p which ever is greater should be used in the
    # calculation below, but this is still being investigated.
    if fast_p > slow_p:
        raise ValueError('Error I think: TODO: calculate_macd()')

    # The calculation produces NaN for its first values; it seems to need
    # (slow_p - 2) + signal_p extra data points on top of the results.
    num_cv = (slow_p - 2) + signal_p + num_results

    closing_data = self.get_latest_close_values(num_cv)
    if len(closing_data) < num_cv:
        # Report the slow period: this method has no single 'period'.
        print(f'Couldn\'t calculate {i_type} for time period of {slow_p}')
        print('Not enough data availiable')
        return

    # Separate the closing values from their timestamps.
    closes = [each['close'] for each in closing_data]
    ts = [each['time'] for each in closing_data]

    # talib works on numpy float arrays.
    np_real_data = np.array(closes, dtype=float)
    macd, signal, hist = talib.MACD(np_real_data, fast_p, slow_p, signal_p)

    # Keep only the requested number of newest values and pair them with
    # their timestamps, skipping entries whose MACD value is NaN.
    macd = macd[-num_results:]
    signal = signal[-num_results:]
    hist = hist[-num_results:]
    ts = ts[-num_results:]
    r_macd = []
    r_signal = []
    r_hist = []
    for stamp, macd_value, signal_value, hist_value in zip(ts, macd, signal, hist):
        if np.isnan(macd_value):
            continue
        r_macd.append({'time': stamp, 'value': macd_value})
        r_signal.append({'time': stamp, 'value': signal_value})
        r_hist.append({'time': stamp, 'value': hist_value})
    r_data = [r_macd, r_signal, r_hist]
    return {"type": i_type, "data": r_data}
|
|
|
|
def calculate_atr(self, i_type, period, num_results=800):
    """Calculate the ATR series from the loaded high, low and close values.

    The calculation works over ``period`` data points, so
    ``period + num_results`` values are requested.

    Returns:
        ``{"type": i_type, "data": [...]}`` with one
        ``{'time': ..., 'value': ...}`` dict per non-NaN result, or
        ``None`` when fewer closing values than needed are loaded.
    """
    num_cv = period + num_results

    high_data = self.get_latest_high_values(num_cv)
    low_data = self.get_latest_low_values(num_cv)
    close_data = self.get_latest_close_values(num_cv)
    if len(close_data) < num_cv:
        print(f'Couldn\'t calculate {i_type} for time period of {period}')
        print('Not enough data availiable')
        return

    # Pull the raw values out of the timestamped dicts; the timestamps are
    # taken from the closing data.
    highs = [entry['high'] for entry in high_data]
    lows = [entry['low'] for entry in low_data]
    closes = [entry['close'] for entry in close_data]
    ts = [entry['time'] for entry in close_data]

    # talib works on numpy float arrays.
    atr = talib.ATR(high=np.array(highs, dtype=float),
                    low=np.array(lows, dtype=float),
                    close=np.array(closes, dtype=float),
                    timeperiod=period)

    # Pair the newest values with their timestamps. The leading results
    # are NaN and normally fall outside the slice; any that remain are
    # filtered out.
    r_data = [
        {'time': stamp, 'value': value}
        for stamp, value in zip(ts[-num_results:], atr[-num_results:])
        if not np.isnan(value)
    ]
    return {"type": i_type, "data": r_data}
|
|
|
|
def calculate_bolbands(self, i_type, period, devup=2, devdn=2, ma=0, num_results=800):
    """Calculate upper, middle and lower bollinger bands from closing values.

    The calculation works over ``period`` data points, so
    ``period + num_results`` closing values are requested.

    Args:
        i_type: indicator type label copied into the result.
        period: time period passed to ``talib.BBANDS``.
        devup: deviations above the mean (``nbdevup``).
        devdn: deviations below the mean (``nbdevdn``).
        ma: moving-average type code (``matype``). Acceptable values:
            {'SMA':0,'EMA':1, 'WMA' : 2, 'DEMA' : 3, 'TEMA' : 4,
            'TRIMA' : 5, 'KAMA' : 6, 'MAMA' : 7, 'T3' : 8}
        num_results: number of most recent values to return.

    Returns:
        ``{"type": i_type, "data": [upper, middle, lower]}`` where each of
        the three is a list of ``{'time': ..., 'value': ...}`` dicts, or
        ``None`` when not enough closing values are loaded.
    """
    num_cv = period + num_results
    closing_data = self.get_latest_close_values(num_cv)
    if len(closing_data) < num_cv:
        print(f'Couldn\'t calculate {i_type} for time period of {period}')
        print('Not enough data availiable')
        return

    # Separate the closing values from their timestamps.
    closes = [entry['close'] for entry in closing_data]
    ts = [entry['time'] for entry in closing_data]

    # talib works on numpy float arrays.
    upper, middle, lower = talib.BBANDS(np.array(closes, dtype=float),
                                        timeperiod=period,
                                        nbdevup=devup,
                                        nbdevdn=devdn,
                                        matype=ma)

    # Pair the newest values with their timestamps, skipping entries whose
    # upper-band value is NaN.
    r_data_u, r_data_m, r_data_l = [], [], []
    newest = zip(ts[-num_results:], upper[-num_results:],
                 middle[-num_results:], lower[-num_results:])
    for stamp, up_value, mid_value, low_value in newest:
        if np.isnan(up_value):
            continue
        r_data_u.append({'time': stamp, 'value': up_value})
        r_data_m.append({'time': stamp, 'value': mid_value})
        r_data_l.append({'time': stamp, 'value': low_value})
    return {"type": i_type, "data": [r_data_u, r_data_m, r_data_l]}
|
|
|
|
def calculate_simple_indicator(self, i_type, period, num_results=800):
    """Calculate an SMA, RSI, EMA or LREG series from the closing values.

    The calculation works over ``period`` data points, so
    ``period + num_results`` closing values are requested.

    Args:
        i_type: one of ``'SMA'``, ``'RSI'``, ``'EMA'`` or ``'LREG'``; it
            must also be listed in
            ``self.indicator_types['simple_indicators']``.
        period: time period passed to the talib function.
        num_results: number of most recent values to return.

    Returns:
        ``{"type": i_type, "data": [...]}`` with one
        ``{'time': ..., 'value': ...}`` dict per result, or ``None`` when
        not enough closing values are loaded.

    Raises:
        ValueError: if ``i_type`` is not a supported simple indicator.
    """
    # Name of the talib function implementing each supported type.
    talib_names = {'SMA': 'SMA', 'RSI': 'RSI', 'EMA': 'EMA', 'LREG': 'LINEARREG'}
    # Reject unknown types up front, including types that are listed as
    # simple indicators but have no calculation here.
    if (i_type not in self.indicator_types['simple_indicators']
            or i_type not in talib_names):
        raise ValueError(f'calculate_simple_indicator(): Unknown type: {i_type}')

    num_cv = period + num_results
    closing_data = self.get_latest_close_values(num_cv)
    if len(closing_data) < num_cv:
        print(f'Couldn\'t calculate {i_type} for time period of {period}')
        print('Not enough data availiable')
        return

    # Separate the closing values from their timestamps.
    closes = [each['close'] for each in closing_data]
    ts = [each['time'] for each in closing_data]

    # talib works on numpy float arrays.
    np_real_data = np.array(closes, dtype=float)
    i_values = getattr(talib, talib_names[i_type])(np_real_data, period)

    # Keep the newest values and pair them with their timestamps. The
    # leading results are NaN, but they fall outside the slice when enough
    # data was requested.
    i_values = i_values[-num_results:]
    ts = ts[-num_results:]
    r_data = [{'time': stamp, 'value': value} for stamp, value in zip(ts, i_values)]
    return {"type": i_type, "data": r_data}
|
|
|
|
def create_indicator(self, name, type, properties):
    """Complete ``properties`` with defaults and store it under ``name``.

    The passed ``properties`` dict is modified in place: its ``type`` is
    set, every setting required for that type which is missing gets a
    default, and the same dict is stored in ``self.indicator_list[name]``.
    Missing colors are random ``#rrggbb`` strings.
    """
    def fill_missing(defaults):
        """Copy each default into properties unless the key already exists."""
        for key, default in defaults.items():
            if key not in properties:
                properties[key] = default

    def random_color():
        """Return a random color as a '#rrggbb' hex string."""
        return f"#{random.randrange(0x1000000):06x}"

    properties['type'] = type

    # Simple indicators need a color and a period.
    if type in self.indicator_types['simple_indicators']:
        if 'color' not in properties:
            properties['color'] = random_color()
        fill_missing({'period': 20})

    if type in self.indicator_types['other']:
        # Shared color for the upper and lower bollinger band lines.
        ul_col = random_color()
        if type == 'BOLBands':
            fill_missing({'period': 50, 'color_1': ul_col})
            if 'color_2' not in properties:
                properties['color_2'] = random_color()
            fill_missing({'color_3': ul_col,
                          'value1': 0, 'value2': 0, 'value3': 0,
                          'devup': 2, 'devdn': 2, 'ma': 1})
        elif type == 'MACD':
            fill_missing({'fast_p': 12, 'slow_p': 26, 'signal_p': 9,
                          'macd': 0, 'signal': 0, 'hist': 0})
            if 'color_1' not in properties:
                properties['color_1'] = random_color()
            if 'color_2' not in properties:
                properties['color_2'] = random_color()
        elif type == 'ATR':
            fill_missing({'period': 50})
            if 'color' not in properties:
                properties['color'] = random_color()

    # Every indicator has a value and a visibility flag.
    fill_missing({'value': 0, 'visible': True})

    # Register the completed settings on this instance.
    self.indicator_list[name] = properties
|
|
|
|
def received_cdata(self, cdata):
    """Handle a candle received from the stream.

    The first candle ever received only becomes ``self.last_candle``. A
    candle with the same (truthy) time as the last candle is ignored. Any
    other candle becomes the last candle, is appended to the close, high,
    low and volume series and triggers an indicator update.

    Returns:
        The result of ``update_indicators()`` for a new candle, otherwise
        ``None``.
    """
    if not self.last_candle:
        self.last_candle = cdata
        return

    # Same candle as before: nothing to do.
    if cdata['time'] and cdata['time'] == self.last_candle['time']:
        return

    # A new candle was received: update the instance data records.
    self.last_candle = cdata
    series = (
        ('latest_close_values', 'close', 'close'),
        ('latest_high_values', 'high', 'high'),
        ('latest_low_values', 'low', 'low'),
        ('latest_vol', 'value', 'vol'),
    )
    for attribute, stored_key, candle_key in series:
        getattr(self, attribute).append({'time': cdata['time'],
                                         stored_key: cdata[candle_key]})

    return self.update_indicators()
|
|
|
|
def update_indicators(self):
    """Recalculate the newest value of every enabled indicator.

    Returns:
        A dict mapping each enabled indicator's name to the result of its
        calculation method called with ``num_results=1``.
    """
    enabled = self.get_enabled_indicators()
    indicators = self.get_indicator_list()
    # Updated data is collected in this dictionary object.
    updates = {}
    for name in enabled:
        settings = indicators[name]
        kind = settings['type']
        # TODO - Check EMA results: there is a bit of a sharp turn in the
        # ema line on the interface side when reloading the page. It
        # smooths out after a full reload.
        if kind in self.indicator_types['simple_indicators']:
            updates[name] = self.calculate_simple_indicator(i_type=kind,
                                                            period=settings['period'],
                                                            num_results=1)
        # The remaining types each have their own update function.
        if kind == 'BOLBands':
            updates[name] = self.calculate_bolbands(i_type=kind,
                                                    period=settings['period'],
                                                    devup=settings['devup'],
                                                    devdn=settings['devdn'],
                                                    ma=settings['ma'],
                                                    num_results=1)
        elif kind == 'MACD':
            updates[name] = self.calculate_macd(i_type=kind,
                                                fast_p=settings['fast_p'],
                                                slow_p=settings['slow_p'],
                                                signal_p=settings['signal_p'],
                                                num_results=1)
        elif kind == 'ATR':
            updates[name] = self.calculate_atr(i_type=kind,
                                               period=settings['period'],
                                               num_results=1)
        elif kind == 'Volume':
            updates[name] = self.get_volume(i_type=kind,
                                            num_results=1)
    return updates
|
|
def received_new_signal(self, data):
    """Check an incoming signal definition and print it.

    Args:
        data: mapping describing the signal; it must contain ``'sigName'``.

    Returns:
        An error message string when ``data`` has no ``'sigName'`` entry,
        otherwise ``None`` after printing ``data``.
    """
    # Check the data.
    if 'sigName' not in data:
        return 'data.py:received_new_signal() - The new signal has no name. '

    # NOTE(review): a bare `Signal` expression used to stand here. No such
    # name is defined in the visible part of this file, so it raised
    # NameError for every named signal; presumably it was a placeholder for
    # creating a signal object - confirm and implement.
    print(data)
|
|
|
|
# Module-level BrighterData instance, created when this module is imported.
# Constructing it runs BrighterData.__init__ and everything that calls.
app_data = BrighterData()
|