brighter-trading/indicators.py

454 lines
19 KiB
Python

import random
import numpy as np
import talib
# Create a List of all available indicator types
indicator_types = []
class Indicator:
def __init__(self, name, indicator_type, properties):
# Initialise all indicators with some default properties.
self.name = name
self.properties = properties
self.properties['type'] = indicator_type
if 'value' not in properties:
self.properties['value'] = 0
if 'visible' not in properties:
self.properties['visible'] = True
def get_records(self, value_name, candles, num_results=1):
# These indicators do computations over a period of price data points.
# We need to give it at least that amount of data plus the results requested.
num_records = self.properties['period'] + num_results
data = candles.get_latest_values(value_name, num_records)
if len(data) < num_records:
print(f'Could not calculate {self.properties["type"]} for time period of {self.properties["period"]}')
print('Not enough data available.')
return
return data
@staticmethod
def isolate_values(value_name, data):
# Isolate the values and timestamps from the dictionary object.
values = []
timestamps = []
for each in data:
values.append(each[value_name])
timestamps.append(each['time'])
return values, timestamps
def calculate(self, candles, num_results=1):
# Get a list of closing values and timestamps associated with these values.
closes, ts = self.isolate_values('close', self.get_records('close', candles, num_results))
# Convert the list of closes to a numpy array
np_real_data = np.array(closes, dtype=float)
# Pass the closing values and the period parameter to talib
i_values = self.process(np_real_data, self.properties['period'])
# The first <period> of values returned from talib are all <NAN>.
# They should get trimmed off.
i_values = i_values[-num_results:]
ts = ts[-num_results:]
# Set the current indicator value to the last value calculated.
self.properties['value'] = round(float(i_values[-1]), 2)
# Combine the new data with the timestamps
r_data = []
for each in range(len(i_values)):
r_data.append({'time': ts[each], 'value': i_values[each]})
# Return the data prefixed with the type of indicator.
return {"type": self.properties['type'], "data": r_data}
def process(self, data, period):
# Abstract function must be overloaded with the appropriate talib call.
raise ValueError(f'Indicators: No talib call implemented : {self.name}')
class Volume(Indicator):
def __init__(self, name, indicator_type, properties):
super().__init__(name, indicator_type, properties)
# Override the calculate function because volume is
# not calculated just fetched from candle data.
def calculate(self, candles, num_results=1):
# Request an array of the last volume values.
r_data = candles.get_latest_vol(num_results)
# Set the current volume to the last value returned.
self.properties['value'] = float(r_data[-1]['value'])
# Return the data prefixed with the type of indicator.
return {"type": self.properties['type'], "data": r_data}
indicator_types.append('Volume')
class SMA(Indicator):
def __init__(self, name, indicator_type, properties):
super().__init__(name, indicator_type, properties)
if 'color' not in properties:
self.properties['color'] = f"#{random.randrange(0x1000000):06x}"
if 'period' not in properties:
self.properties['period'] = 20
def process(self, data, period):
return talib.SMA(data, period)
indicator_types.append('SMA')
class EMA(SMA):
def __init__(self, name, indicator_type, properties):
super().__init__(name, indicator_type, properties)
def process(self, data, period):
return talib.EMA(data, period)
indicator_types.append('EMA')
class RSI(SMA):
def __init__(self, name, indicator_type, properties):
super().__init__(name, indicator_type, properties)
def process(self, data, period):
return talib.RSI(data, period)
indicator_types.append('RSI')
class LREG(SMA):
def __init__(self, name, indicator_type, properties):
super().__init__(name, indicator_type, properties)
def process(self, data, period):
return talib.LINEARREG(data, period)
indicator_types.append('LREG')
class ATR(SMA):
def __init__(self, name, indicator_type, properties):
super().__init__(name, indicator_type, properties)
def calculate(self, candles, num_results=1):
# Initialize 4 arrays to hold a list of h/l/c values and
# Get a list of values and timestamps associated with them.
highs, ts = self.isolate_values('high', self.get_records('high', candles, num_results))
lows, ts = self.isolate_values('low', self.get_records('low', candles, num_results))
closes, ts = self.isolate_values('close', self.get_records('close', candles, num_results))
# Convert the lists to a numpy array
np_highs = np.array(highs, dtype=float)
np_lows = np.array(lows, dtype=float)
np_closes = np.array(closes, dtype=float)
# Pass the closing values and the period parameter to talib
atr = talib.ATR(high=np_highs,
low=np_lows,
close=np_closes,
timeperiod=self.properties.period)
# Combine the new data with the timestamps
# Warning: The first (<period> -1) of values are <NAN>.
# But they should get trimmed off
atr = atr[-num_results:]
ts = ts[-num_results:]
# Set the current indicator value to the last value calculated.
self.properties['value'] = round(float(atr[-1]), 2)
r_data = []
for each in range(len(atr)):
# filter out nan values
if np.isnan(atr[each]):
continue
r_data.append({'time': ts[each], 'value': atr[each]})
return {"type": self.properties.type, "data": r_data}
indicator_types.append('ATR')
class BolBands(Indicator):
def __init__(self, name, indicator_type, properties):
super().__init__(name, indicator_type, properties)
ul_col = f"#{random.randrange(0x1000000):06x}"
if 'period' not in properties:
self.properties['period'] = 50
if 'color_1' not in properties:
self.properties['color_1'] = ul_col
if 'color_2' not in properties:
self.properties['color_2'] = f"#{random.randrange(0x1000000):06x}"
if 'color_3' not in properties:
self.properties['color_3'] = ul_col
if 'value' not in properties:
self.properties['value'] = 0
if 'value2' not in properties:
self.properties['value2'] = 0
if 'value3' not in properties:
self.properties['value3'] = 0
if 'devup' not in properties:
self.properties['devup'] = 2
if 'devdn' not in properties:
self.properties['devdn'] = 2
if 'ma' not in properties:
self.properties['ma'] = 1
def calculate(self, candles, num_results=1):
# Get a list of closing values and timestamps associated with these values.
closes, ts = self.isolate_values('close', self.get_records('close', candles, num_results))
# Convert the list of closes to a numpy array
np_real_data = np.array(closes, dtype=float)
# Pass the closing values and the period parameter to talib
upper, middle, lower = talib.BBANDS(np_real_data,
timeperiod=self.properties['period'],
# number of non-biased standard deviations from the mean
nbdevup=self.properties['devup'],
nbdevdn=self.properties['devdn'],
# Moving average type: simple moving average here
matype=self.properties['ma'])
# Combine the new data with the timestamps
# Warning: The first (<period> -1) of values are <NAN>.
# But they should get trimmed off
i_values_u = upper[-num_results:]
i_values_m = middle[-num_results:]
i_values_l = lower[-num_results:]
ts = ts[-num_results:]
r_data_u = []
r_data_m = []
r_data_l = []
# Set the current indicator values to the last value calculated.
self.properties['value'] = round(float(i_values_u[-1]),2)
self.properties['value2'] = round(float(i_values_m[-1]),2)
self.properties['value3'] = round(float(i_values_l[-1]),2)
for each in range(len(i_values_u)):
# filter out nan values
if np.isnan(i_values_u[each]):
continue
r_data_u.append({'time': ts[each], 'value': i_values_u[each]})
r_data_m.append({'time': ts[each], 'value': i_values_m[each]})
r_data_l.append({'time': ts[each], 'value': i_values_l[each]})
r_data = [r_data_u, r_data_m, r_data_l]
return {"type": self.properties['type'], "data": r_data}
indicator_types.append('BOLBands')
class MACD(Indicator):
def __init__(self, name, indicator_type, properties):
super().__init__(name, indicator_type, properties)
if 'fast_p' not in properties:
self.properties['fast_p'] = 12
if 'slow_p' not in properties:
self.properties['slow_p'] = 26
if 'signal_p' not in properties:
self.properties['signal_p'] = 9
if 'macd' not in properties:
self.properties['macd'] = 0
if 'signal' not in properties:
self.properties['signal'] = 0
if 'hist' not in properties:
self.properties['hist'] = 0
if 'color_1' not in properties:
self.properties['color_1'] = f"#{random.randrange(0x1000000):06x}"
if 'color_2' not in properties:
self.properties['color_2'] = f"#{random.randrange(0x1000000):06x}"
def calculate(self, candles, num_results=800):
# These indicators do computations over a period number of price data points.
# So we need at least that plus what ever amount of results needed.
# It seems it needs num_of_nans = (slow_p) - 2) + signal_p
# Slow_p or fast_p which ever is greater should be used in the calc below.
# TODO Investigating this should it be an error.
if self.properties['fast_p'] > self.properties['slow_p']:
raise ValueError('Possible Error: calculating MACD fast_period needs to higher then slow_period')
# Not sure about the lookback period for macd algorithm below was a result of trial and error.
num_cv = (self.properties['slow_p'] - 2) + self.properties['signal_p'] + num_results
closing_data = candles.get_latest_values(value_name='close', num_record=num_cv)
if len(closing_data) < num_cv:
print(f'Couldn\'t calculate {self.properties["type"]} for time period of {self.properties["slow_p"]}')
print('Not enough data available')
return
# Initialize two arrays to hold a list of closing values and
# a list of timestamps associated with these values
closes = []
ts = []
# Isolate all the closing values and timestamps from
# the dictionary object
for each in closing_data:
closes.append(each['close'])
ts.append(each['time'])
# Convert the list of closes to a numpy array
np_real_data = np.array(closes, dtype=float)
# Pass the closing values and the period parameter to talib
macd, signal, hist = talib.MACD(
np_real_data,
self.properties['fast_p'],
self.properties['slow_p'],
self.properties['signal_p'])
# Combine the new data with the timestamps
# Warning: The first (<period> -1) of values are <NAN>.
# But they should get trimmed off
macd = macd[-num_results:]
signal = signal[-num_results:]
hist = hist[-num_results:]
ts = ts[-num_results:]
r_macd = []
r_signal = []
r_hist = []
# Set the current indicator values to the last value calculated.
self.properties['macd'] = round(float(macd[-1]), 2)
self.properties['signal'] = round(float(signal[-1]), 2)
for each in range(len(macd)):
# filter out nan values
if np.isnan(macd[each]):
continue
r_macd.append({'time': ts[each], 'value': macd[each]})
r_signal.append({'time': ts[each], 'value': signal[each]})
r_hist.append({'time': ts[each], 'value': hist[each]})
r_data = [r_macd, r_signal, r_hist]
return {"type": self.properties['type'], "data": r_data}
indicator_types.append('MACD')
class Indicators:
def __init__(self, candles, config):
# Object containing Price and candle data.
self.candles = candles
# A reference to a dictionary of indicators and properties.
# This is updated when a new indicator is created, so it's state should save.
self.indicator_list = config.indicator_list
# Dictionary of indicators objects
self.indicators = {}
# This fill above two dicts
self.create_loaded_indicators()
# Create a List of all available indicator types
self.indicator_types = indicator_types
# A list of values to use with bolenger bands
self.bb_ma_val = {'SMA': 0, 'EMA': 1, 'WMA': 2, 'DEMA': 3, 'TEMA': 4, 'TRIMA': 5, 'KAMA': 6, 'MAMA': 7, 'T3': 8}
def create_loaded_indicators(self):
for each in self.indicator_list:
self.create_indicator(each, self.indicator_list[each]['type'], self.indicator_list[each])
@staticmethod
def get_indicator_defaults():
"""Set the default settings for each indicator"""
indicator_list = {
'EMA 5': {'type': 'EMA', 'period': 5, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
'value': 0},
'EMA 15': {'type': 'EMA', 'period': 15, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
'value': 0},
'EMA 20': {'type': 'EMA', 'period': 20, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
'value': 0},
'EMA 50': {'type': 'EMA', 'period': 50, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
'value': 0},
'EMA 100': {'type': 'EMA', 'period': 100, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
'value': 0},
'EMA 200': {'type': 'EMA', 'period': 200, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
'value': 0},
'RSI 14': {'type': 'RSI', 'period': 14, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
'value': 0},
'RSI 8': {'type': 'RSI', 'period': 8, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
'value': 0},
'Bolenger': {'color_1': '#5ad858', 'color_2': '#f0f664', 'color_3': '#5ad858', 'devdn': 2, 'devup': 2,
'ma': 1, 'period': 20, 'type': 'BOLBands', 'value': '38691.58',
'value2': '38552.36',
'value3': '38413.14', 'visible': 'True'},
'vol': {'type': 'Volume', 'visible': True, 'value': 0}
}
return indicator_list
def get_indicator_list(self):
# Returns a list of all the indicator object in this class instance
if not self.indicator_list:
raise ValueError('get_indicator_list(): No indicators in the list')
return self.indicator_list
def get_enabled_indicators(self):
""" Loop through all indicators and return a list of indicators marked visible """
enabled_indicators = []
i_list = self.get_indicator_list()
for indctr in i_list:
if i_list[indctr]['visible']:
enabled_indicators.append(indctr)
return enabled_indicators
def get_indicator_data(self, symbol=None, interval=None, start_ts=None, num_results=800):
# Loop through all the indicators. If enabled, run the appropriate
# update function. Return all the results as a dictionary object.
if symbol is not None:
if self.candles.config.trading_pair != symbol:
print(f'get_indicator_data: request candle data for {symbol}')
if interval is not None:
if self.candles.config.chart_interval != interval:
print(f'get_indicator_data: request candle data for {interval}')
if start_ts is not None:
print(f'get_indicator_data: request candle data from: {start_ts}')
# Get a list of indicator objects and a list of enabled indicators names.
i_list = self.indicators
enabled_i = self.get_enabled_indicators()
candles = self.candles
result = {}
# Loop through all indicator objects in i_list
for each_i in i_list:
# If the indicator's not enabled skip to next each_i
if each_i not in enabled_i:
continue
result[each_i] = i_list[each_i].calculate(candles, num_results)
return result
def delete_indicator(self, indicator):
del self.indicator_list[indicator]
del self.indicators[indicator]
def create_indicator(self, name, itype, properties):
if itype == 'SMA':
self.indicators[name] = SMA(name, itype, properties)
if itype == 'EMA':
self.indicators[name] = EMA(name, itype, properties)
if itype == 'RSI':
self.indicators[name] = RSI(name, itype, properties)
if itype == 'LREG':
self.indicators[name] = LREG(name, itype, properties)
if itype == 'ATR':
self.indicators[name] = ATR(name, itype, properties)
if itype == 'BOLBands':
self.indicators[name] = BolBands(name, itype, properties)
if itype == 'MACD':
self.indicators[name] = MACD(name, itype, properties)
if itype == 'Volume':
self.indicators[name] = Volume(name, itype, properties)
if name not in self.indicator_list:
# If we are loading from file it would already exist in here before creation.
self.indicator_list[name] = properties
return
def update_indicators(self):
return self.get_indicator_data(self, num_results=1)