# Source: brighter-trading/indicators.py (file-viewer listing at capture time: 405 lines, 19 KiB, Python)

import random
import numpy as np
import talib
class Indicators:
    """Calculate chart indicator series (via TA-Lib) from candle data.

    The instance holds a mapping of indicator name -> property dict
    (``indicator_list``) and turns every visible entry into lists of
    ``{'time': ..., 'value': ...}`` points.
    """

    # Returned by _compute() when an indicator's type has no calculator, so
    # callers can tell "unsupported type" apart from a calculator that
    # legitimately returned None (not enough candle data).
    _UNSUPPORTED = object()

    # Simple indicator type -> name of the talib function that computes it.
    _TALIB_SIMPLE = {'SMA': 'SMA', 'RSI': 'RSI', 'EMA': 'EMA', 'LREG': 'LINEARREG'}

    def __init__(self, candles, config):
        """Store the candle source and the configured indicator mapping.

        Args:
            candles: Object providing ``get_latest_close_values``,
                ``get_latest_high_values``, ``get_latest_low_values`` and
                ``get_volume`` (the methods this class calls on it).
            config: Object exposing an ``indicator_list`` mapping of
                indicator name -> property dict. The mapping is shared by
                reference, not copied.
        """
        # Object containing price and candle data.
        self.candles = candles
        # All available indicator types, grouped by how they are calculated.
        # todo: get rid of this use inheritance in classes instead.
        self.indicator_types = {'simple_indicators': ['RSI', 'SMA', 'EMA', 'LREG'],
                                'other': ['Volume', 'BOLBands', 'MACD', 'ATR']}
        # Mapping of every configured indicator name to its properties.
        self.indicator_list = config.indicator_list
        # Moving-average names and the integer codes used for the Bollinger
        # band ``ma`` setting.
        self.bb_ma_val = {'SMA': 0, 'EMA': 1, 'WMA': 2, 'DEMA': 3, 'TEMA': 4,
                          'TRIMA': 5, 'KAMA': 6, 'MAMA': 7, 'T3': 8}

    @staticmethod
    def _random_color():
        """Return a random colour as a ``#rrggbb`` hex string."""
        return f"#{random.randrange(0x1000000):06x}"

    @staticmethod
    def _column(rows, key):
        """Return the ``key`` field of every row as a list."""
        return [row[key] for row in rows]

    @staticmethod
    def _report_insufficient(i_type, period):
        """Print the diagnostic used when too few candles are available."""
        print(f'Could not calculate {i_type} for time period of {period}')
        print('Not enough data available')

    @staticmethod
    def _to_series(timestamps, columns, num_results):
        """Pair the newest ``num_results`` values of each column with times.

        Args:
            timestamps: Sequence of timestamps aligned with every column.
            columns: Iterable of equally long value sequences.
            num_results: Number of trailing rows to keep.

        Returns:
            One list per column of ``{'time': ..., 'value': ...}`` dicts.
            A row is dropped from every column when any of its values is
            NaN, so the returned lists always stay aligned.
        """
        columns = list(columns)
        series = [[] for _ in columns]
        # ``seq[-0:]`` would return the whole sequence, so handle it up front.
        if num_results <= 0:
            return series
        tails = [column[-num_results:] for column in columns]
        for stamp, *row in zip(timestamps[-num_results:], *tails):
            if any(np.isnan(value) for value in row):
                continue
            for points, value in zip(series, row):
                points.append({'time': stamp, 'value': value})
        return series

    @staticmethod
    def get_indicator_defaults():
        """Return the default indicator mapping (name -> property dict).

        Colours of the moving-average and RSI entries are randomised on
        every call.
        """
        def simple(i_type, period):
            return {'type': i_type, 'period': period, 'visible': True,
                    'color': Indicators._random_color(), 'value': 0}

        return {
            'SMA 21': simple('SMA', 21),
            'EMA 50': simple('EMA', 50),
            'EMA 100': simple('EMA', 100),
            'SMA 200': simple('SMA', 200),
            'RSI 14': simple('RSI', 14),
            'RSI 8': simple('RSI', 8),
            # 'visible' is a real bool and the band values start at 0, the
            # same defaults create_indicator() applies.
            'Bolenger': {'color_1': '#5ad858', 'color_2': '#f0f664', 'color_3': '#5ad858',
                         'devdn': 2, 'devup': 2, 'ma': 1, 'period': 20, 'type': 'BOLBands',
                         'value': 0, 'value1': 0, 'value2': 0, 'value3': 0, 'visible': True},
            'vol': {'type': 'Volume', 'visible': True, 'value': 0},
        }

    def get_indicator_list(self):
        """Return the mapping of all configured indicators.

        Raises:
            ValueError: If no indicators are configured.
        """
        if not self.indicator_list:
            raise ValueError('get_indicator_list(): No indicators in the list')
        return self.indicator_list

    def get_enabled_indicators(self):
        """Return the names of all indicators whose 'visible' flag is truthy."""
        return [name for name, props in self.get_indicator_list().items()
                if props['visible']]

    def _compute(self, props, num_results=None):
        """Run the calculator matching ``props['type']``.

        Args:
            props: Property dict of one indicator.
            num_results: Forwarded to the calculator when given; when None
                the calculator's own default is used.

        Returns:
            The calculator's result (which may be None), or
            ``_UNSUPPORTED`` when the type has no calculator.
        """
        i_type = props['type']
        extra = {} if num_results is None else {'num_results': num_results}
        if i_type in self.indicator_types['simple_indicators']:
            return self.calculate_simple_indicator(i_type=i_type,
                                                   period=props['period'],
                                                   **extra)
        if i_type == 'BOLBands':
            return self.calculate_bolbands(i_type=i_type,
                                           period=props['period'],
                                           devup=props['devup'],
                                           devdn=props['devdn'],
                                           ma=props['ma'],
                                           **extra)
        if i_type == 'MACD':
            return self.calculate_macd(i_type=i_type,
                                       fast_p=props['fast_p'],
                                       slow_p=props['slow_p'],
                                       signal_p=props['signal_p'],
                                       **extra)
        if i_type == 'ATR':
            return self.calculate_atr(i_type=i_type,
                                      period=props['period'],
                                      **extra)
        if i_type == 'Volume':
            return self.candles.get_volume(i_type=i_type, **extra)
        return self._UNSUPPORTED

    def _compute_enabled(self, num_results=None):
        """Compute every enabled indicator and collect the results by name."""
        indicators = self.get_indicator_list()
        results = {}
        for name in self.get_enabled_indicators():
            outcome = self._compute(indicators[name], num_results)
            if outcome is not self._UNSUPPORTED:
                results[name] = outcome
        return results

    def get_indicator_data(self, symbol=None, interval=None, num_results=100):
        """Return the full data set of every enabled indicator.

        NOTE(review): ``symbol``, ``interval`` and ``num_results`` are
        accepted but not used; every calculator runs with its own default
        ``num_results``. Kept as-is so existing callers see no change.

        Returns:
            Dict of indicator name -> calculator result. A value is None
            when the calculator had too little candle data.

        Raises:
            ValueError: If no indicators are configured.
        """
        return self._compute_enabled()

    def calculate_macd(self, i_type, fast_p=12, slow_p=26, signal_p=9, num_results=800):
        """Calculate MACD, signal and histogram series from closing prices.

        Returns:
            ``{"type": i_type, "data": [macd, signal, hist]}`` where each
            element is a list of ``{'time', 'value'}`` points, or None when
            fewer candles are available than required.

        Raises:
            ValueError: If ``fast_p`` is greater than ``slow_p``.
        """
        # TODO: slow_p or fast_p which ever is greater should be used in the
        # candle-count calculation below; until then reject the swapped case.
        if fast_p > slow_p:
            raise ValueError(f'calculate_macd(): fast_p ({fast_p}) must not be '
                             f'greater than slow_p ({slow_p})')
        # The calculation consumes leading values before producing output;
        # it seems to need (slow_p - 2) + signal_p extra candles.
        num_cv = (slow_p - 2) + signal_p + num_results
        closing_data = self.candles.get_latest_close_values(num_cv)
        if len(closing_data) < num_cv:
            self._report_insufficient(i_type, slow_p)
            return None
        closes = np.array(self._column(closing_data, 'close'), dtype=float)
        ts = self._column(closing_data, 'time')
        macd, signal, hist = talib.MACD(closes, fast_p, slow_p, signal_p)
        # Keep the newest rows and drop any that still contain NaN.
        r_data = self._to_series(ts, (macd, signal, hist), num_results)
        return {"type": i_type, "data": r_data}

    def calculate_atr(self, i_type, period, num_results=800):
        """Calculate the ATR series from high, low and close prices.

        Returns:
            ``{"type": i_type, "data": [...]}`` with a list of
            ``{'time', 'value'}`` points, or None when any of the three
            price series is shorter than ``period + num_results``.
        """
        # The calculation runs over ``period`` data points, so that many
        # extra candles are needed on top of the requested results.
        num_cv = period + num_results
        high_data = self.candles.get_latest_high_values(num_cv)
        low_data = self.candles.get_latest_low_values(num_cv)
        close_data = self.candles.get_latest_close_values(num_cv)
        # All three inputs are required, so validate each of them.
        if min(len(high_data), len(low_data), len(close_data)) < num_cv:
            self._report_insufficient(i_type, period)
            return None
        np_highs = np.array(self._column(high_data, 'high'), dtype=float)
        np_lows = np.array(self._column(low_data, 'low'), dtype=float)
        np_closes = np.array(self._column(close_data, 'close'), dtype=float)
        # Timestamps are taken from the close series.
        ts = self._column(close_data, 'time')
        atr = talib.ATR(high=np_highs,
                        low=np_lows,
                        close=np_closes,
                        timeperiod=period)
        r_data = self._to_series(ts, (atr,), num_results)[0]
        return {"type": i_type, "data": r_data}

    def calculate_bolbands(self, i_type, period, devup=2, devdn=2, ma=0, num_results=800):
        """Calculate upper, middle and lower Bollinger band series.

        Args:
            i_type: Type label copied into the result.
            period: Number of closes each band value is computed over.
            devup: Deviation multiplier for the upper band.
            devdn: Deviation multiplier for the lower band.
            ma: Moving-average type code passed to ``talib.BBANDS``
                (see ``self.bb_ma_val`` for the name -> code table).
            num_results: Number of trailing points to return.

        Returns:
            ``{"type": i_type, "data": [upper, middle, lower]}`` where each
            element is a list of ``{'time', 'value'}`` points, or None when
            fewer than ``period + num_results`` closes are available.
        """
        num_cv = period + num_results
        closing_data = self.candles.get_latest_close_values(num_cv)
        if len(closing_data) < num_cv:
            self._report_insufficient(i_type, period)
            return None
        closes = np.array(self._column(closing_data, 'close'), dtype=float)
        ts = self._column(closing_data, 'time')
        upper, middle, lower = talib.BBANDS(closes,
                                            timeperiod=period,
                                            nbdevup=devup,
                                            nbdevdn=devdn,
                                            matype=ma)
        r_data = self._to_series(ts, (upper, middle, lower), num_results)
        return {"type": i_type, "data": r_data}

    def calculate_simple_indicator(self, i_type, period, num_results=800):
        """Calculate a single-series indicator (SMA, RSI, EMA or LREG).

        Returns:
            ``{"type": i_type, "data": [...]}`` with a list of
            ``{'time', 'value'}`` points, or None when fewer than
            ``period + num_results`` closes are available.

        Raises:
            ValueError: If ``i_type`` is not a known simple indicator type
                or has no TA-Lib function mapped to it.
        """
        if i_type not in self.indicator_types['simple_indicators']:
            raise ValueError(f'calculate_simple_indicator(): Unknown type: {i_type}')
        func_name = self._TALIB_SIMPLE.get(i_type)
        if func_name is None:
            raise ValueError(f'calculate_simple_indicator(): No TA-Lib function for type: {i_type}')
        # The calculation runs over ``period`` data points, so that many
        # extra candles are needed on top of the requested results.
        num_cv = period + num_results
        closing_data = self.candles.get_latest_close_values(num_cv)
        if len(closing_data) < num_cv:
            self._report_insufficient(i_type, period)
            return None
        closes = np.array(self._column(closing_data, 'close'), dtype=float)
        ts = self._column(closing_data, 'time')
        i_values = getattr(talib, func_name)(closes, period)
        # NaN rows are filtered here too, like in the other calculators.
        r_data = self._to_series(ts, (i_values,), num_results)[0]
        return {"type": i_type, "data": r_data}

    def _default_properties(self, itype):
        """Return the type-specific default properties for ``itype``."""
        if itype in self.indicator_types['simple_indicators']:
            return {'color': self._random_color(), 'period': 20}
        if itype == 'BOLBands':
            # Upper and lower band share one colour by default.
            band_color = self._random_color()
            return {'period': 50,
                    'color_1': band_color,
                    'color_2': self._random_color(),
                    'color_3': band_color,
                    'value1': 0, 'value2': 0, 'value3': 0,
                    'devup': 2, 'devdn': 2, 'ma': 1}
        if itype == 'MACD':
            return {'fast_p': 12, 'slow_p': 26, 'signal_p': 9,
                    'macd': 0, 'signal': 0, 'hist': 0,
                    'color_1': self._random_color(),
                    'color_2': self._random_color()}
        if itype == 'ATR':
            return {'period': 50, 'color': self._random_color()}
        return {}

    def create_indicator(self, name, itype, properties):
        """Register an indicator, filling in any missing default properties.

        Args:
            name: Key under which the indicator is stored.
            itype: Indicator type; always written to the 'type' property.
            properties: Caller-supplied properties. The dict is copied, so
                the caller's object is never modified and can safely be
                reused for several indicators.
        """
        props = dict(properties)
        props['type'] = itype
        for key, default in self._default_properties(itype).items():
            props.setdefault(key, default)
        # Every indicator carries a value and a visibility flag.
        props.setdefault('value', 0)
        props.setdefault('visible', True)
        self.indicator_list[name] = props

    def update_indicators(self):
        """Return the newest single data point of every enabled indicator.

        Returns:
            Dict of indicator name -> calculator result computed with
            ``num_results=1``. A value is None when the calculator had too
            little candle data.

        Raises:
            ValueError: If no indicators are configured.
        """
        # TODO - Check EMA results i see a bit of a sharp turn in the ema line on
        # the interface side when reloading the page. It smooths out after a full reload.
        return self._compute_enabled(num_results=1)