import csv
import datetime
import sys
import random
import numpy as np
import talib
import config
from binance.client import Client
from binance.enums import *
import yaml


class BrighterData:
    """Holds the application state behind the BrighterTrades UI.

    An instance owns the Binance client, the chart configuration, the
    configured indicators, the in-memory candle history and the per-candle
    high/low/close/volume series derived from that history.
    """

    # Sentinel returned by _run_indicator() when an indicator type has no
    # calculation attached. None cannot be used for this because the
    # calculate_* methods return None when there is not enough data.
    _UNHANDLED = object()

    def __init__(self):
        """Connect to the exchange, then load configuration and candle data.

        The statements run in a fixed order: the indicator defaults must
        exist before the saved configuration is loaded over them, and the
        chart configuration must be final before the candle history is
        loaded.
        """
        # Initialise a connection to the Binance client API
        self.client = Client(config.API_KEY, config.API_SECRET)
        # The title of our program
        self.application_title = 'BrighterTrades'
        # Settings for the main chart on our UI
        self.chart_configuration = {
            'chart_interval': KLINE_INTERVAL_15MINUTE,
            'trading_pair': 'BTCUSDT',
        }
        # The maximum number of candles to store in memory
        self.max_data_loaded = 1000
        # Indicator type names grouped by family
        self.indicator_types = {}
        # Dictionary of configured indicators keyed by indicator name
        self.indicator_list = None
        # Add default indicators and their default values to self.indicator_list
        self.set_indicator_defaults()
        # Dictionary of exchange and account data
        self.exchange_data = {}
        # Set the values in self.exchange_data from information retrieved from exchange.
        self.set_exchange_data()
        # The name of the file that stores saved_data
        self.config_FN = 'config.yml'
        # Load any saved data from file
        self.config_and_states('load')
        # The entire loaded candle history
        self.candlesticks = []
        # Lists of dictionaries of timestamped high, low, and closing values
        self.latest_high_values = []
        self.latest_low_values = []
        self.latest_close_values = []
        # Values of the last candle received
        self.last_candle = None
        # List of dictionaries of timestamped volume values
        self.latest_vol = []
        # Populate candlesticks, the high/low/close/volume series and last_candle
        self.set_candle_history()
        # Moving-average names mapped to the integer codes passed as `ma`
        # to calculate_bolbands() (forwarded to talib.BBANDS as matype)
        self.bb_ma_val = {'SMA': 0, 'EMA': 1, 'WMA': 2, 'DEMA': 3, 'TEMA': 4,
                          'TRIMA': 5, 'KAMA': 6, 'MAMA': 7, 'T3': 8}

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _random_color():
        """Return a random colour as a '#rrggbb' hex string."""
        return f"#{random.randrange(0x1000000):06x}"

    @staticmethod
    def _fill_defaults(properties, defaults):
        """Insert each default into properties unless the key is already present."""
        for key, value in defaults.items():
            if key not in properties:
                properties[key] = value

    @staticmethod
    def _tail(records, num_record, label):
        """Return the last num_record entries of records.

        label is the name used in the warning and error messages.
        Raises ValueError when records is empty.
        """
        if not records:
            raise ValueError(f'Warning: {label}(): Values are not set')
        if len(records) < num_record:
            print(f'Warning: {label}() - Requested too more records then available')
            num_record = len(records)
        return records[-num_record:]

    @staticmethod
    def _timestamped(ts, series, gate=None):
        """Pair timestamps with values as {'time', 'value'} dictionaries.

        Entries are skipped wherever the matching gate value is NaN; gate
        defaults to the series itself.
        """
        if gate is None:
            gate = series
        return [{'time': stamp, 'value': value}
                for stamp, value, check in zip(ts, series, gate)
                if not np.isnan(check)]

    @staticmethod
    def _report_insufficient_data(i_type, period):
        """Print the notice emitted when an indicator cannot be calculated."""
        print(f'Couldn\'t calculate {i_type} for time period of {period}')
        print('Not enough data availiable')

    def _write_config(self, data):
        """Dump data as YAML into the file named by self.config_FN."""
        with open(self.config_FN, "w") as file_descriptor:
            yaml.dump(data, file_descriptor)

    def _timestamped_field(self, key, index):
        """Build [{'time': seconds, key: candle[index]}, ...] from self.candlesticks."""
        return [{"time": int(candle[0]) / 1000, key: candle[index]}
                for candle in self.candlesticks]

    def _closing_series(self, num_cv):
        """Return (closes as float ndarray, timestamps) for the last num_cv candles.

        Returns None when fewer than num_cv closing values are available.
        """
        closing_data = self.get_latest_close_values(num_cv)
        if len(closing_data) < num_cv:
            return None
        closes = np.array([each['close'] for each in closing_data], dtype=float)
        ts = [each['time'] for each in closing_data]
        return closes, ts

    def _run_indicator(self, props, num_results=None):
        """Run the calculation that matches props['type'].

        props is one entry of self.indicator_list. When num_results is None
        the called method's own default is used. Returns the calculation
        result, or self._UNHANDLED when the type has no calculation.
        """
        extra = {} if num_results is None else {'num_results': num_results}
        i_type = props['type']
        if i_type in self.indicator_types['simple_indicators']:
            return self.calculate_simple_indicator(i_type=i_type,
                                                   period=props['period'],
                                                   **extra)
        if i_type == 'BOLBands':
            return self.calculate_bolbands(i_type=i_type,
                                           period=props['period'],
                                           devup=props['devup'],
                                           devdn=props['devdn'],
                                           ma=props['ma'],
                                           **extra)
        if i_type == 'MACD':
            return self.calculate_macd(i_type=i_type,
                                       fast_p=props['fast_p'],
                                       slow_p=props['slow_p'],
                                       signal_p=props['signal_p'],
                                       **extra)
        if i_type == 'ATR':
            return self.calculate_atr(i_type=i_type,
                                      period=props['period'],
                                      **extra)
        if i_type == 'Volume':
            return self.get_volume(i_type=i_type, **extra)
        return self._UNHANDLED

    # ------------------------------------------------------------------
    # Configuration
    # ------------------------------------------------------------------
    def get_js_init_data(self):
        """Return the initialization data for the javascript in the rendered HTML."""
        return {
            'i_types': self.indicator_types,
            'indicators': self.indicator_list,
            'interval': self.chart_configuration['chart_interval'],
            'trading_pair': self.chart_configuration['trading_pair'],
        }

    def config_and_states(self, cmd):
        """Load or save configurable data using the file set in self.config_FN.

        cmd is 'load' or 'save'. On 'load', a missing or empty file is
        (re)created from the current values; otherwise 'indicator_list'
        and 'chart_configuration' are replaced by the stored ones.

        Raises ValueError for an unknown cmd or when saving fails.
        """
        # Application configuration and object states
        saved_data = {
            'indicator_list': self.indicator_list,
            'chart_configuration': self.chart_configuration
        }

        if cmd == 'load':
            try:
                with open(self.config_FN, "r") as file_descriptor:
                    loaded = yaml.safe_load(file_descriptor)
            except IOError:
                # No readable file yet: create one holding the current values.
                self._write_config(saved_data)
                return
            if loaded is None:
                # An empty file holds nothing to restore: treat it like a
                # missing file.
                self._write_config(saved_data)
                return
            if not isinstance(loaded, dict):
                # Unexpected content: keep the current values and leave the
                # file untouched so nothing the user wrote is destroyed.
                print(f'Warning: config_and_states() - ignoring unexpected content in {self.config_FN}')
                return
            # Keys absent from the file keep their current values.
            self.indicator_list = loaded.get('indicator_list', self.indicator_list)
            self.chart_configuration = loaded.get('chart_configuration',
                                                  self.chart_configuration)
        elif cmd == 'save':
            try:
                self._write_config(saved_data)
            except IOError as err:
                raise ValueError("Couldn't save the file") from err
        else:
            raise ValueError('Invalid command received')

    # ------------------------------------------------------------------
    # Candle history
    # ------------------------------------------------------------------
    def load_candle_history(self, symbol, interval):
        """Return the candle history for symbol/interval, updating its file.

        History already stored in the file is read first, newer candles are
        requested from the exchange, appended to the file and to the
        returned list. The file is only created when this method is called,
        which avoids maintaining irrelevant data files.

        Rows read from the file are lists of strings; rows received from
        the exchange are returned as received.
        """
        # NOTE: this is a naive datetime, so .timestamp() below interprets
        # it in the local timezone of the machine.
        start_datetime = datetime.datetime(2017, 1, 1)
        # Create a filename from the function parameters.
        # Format is symbol_interval_start_date: example - 'BTCUSDT_15m_2017-01-01'
        file_name = f'{symbol}_{interval}_{start_datetime.date()}'

        # Rows of candle data. As used in this class: index 0 is the
        # timestamp in milliseconds, 1 open, 2 high, 3 low, 4 close, 5 volume.
        candlesticks = []
        try:
            print(f'Attempting to open: {file_name}')
            with open(file_name, 'r') as file:
                # Blank rows are skipped so the last row always has a timestamp.
                candlesticks = [row for row in csv.reader(file, delimiter=',') if row]
            print('File loaded')
            mode = 'a'
        except IOError:
            # If the file doesn't exist it must be created.
            print(f'{file_name} not found: Creating the file')
            mode = 'w'

        if candlesticks:
            # Continue from the timestamp of the last candle on file
            last_candle_stamp = int(candlesticks[-1][0])
        else:
            # Nothing on file: start from the default start date
            last_candle_stamp = int(start_datetime.timestamp() * 1000)

        # Request any missing candlestick data from the exchange
        recent_candlesticks = self.client.get_historical_klines(
            symbol, interval, start_str=last_candle_stamp)

        # The request starts at the last stored timestamp, so the first row
        # may repeat the last candle on file. Drop it only in that case.
        if (candlesticks and recent_candlesticks
                and int(recent_candlesticks[0][0]) == last_candle_stamp):
            recent_candlesticks = recent_candlesticks[1:]

        # Append the new candles to the file and to the list
        with open(file_name, mode, newline='') as file:
            csv.writer(file, delimiter=',').writerows(recent_candlesticks)
        candlesticks.extend(recent_candlesticks)
        return candlesticks

    def set_latest_vol(self):
        """Rebuild self.latest_vol from self.candlesticks.

        Each entry holds the time in seconds, the volume truncated to an
        integer and a colour: red when the integer part of the close is
        lower than that of the previous candle, green otherwise.
        """
        latest_vol = []
        last_clp = 0
        for data in self.candlesticks:
            clp = int(float(data[4]))
            if clp < last_clp:
                color = 'rgba(255,82,82, 0.8)'  # red
            else:
                color = 'rgba(0, 150, 136, 0.8)'  # green
            latest_vol.append({
                "time": int(data[0]) / 1000,
                "value": int(float(data[5])),
                "color": color
            })
            last_clp = clp
        self.latest_vol = latest_vol

    def get_latest_vol(self, num_record=500):
        """Return the latest num_record volume entries.

        Raises ValueError when no volume data is set.
        """
        return self._tail(self.latest_vol, num_record, 'get_latest_vol')

    def set_latest_high_values(self):
        """Rebuild self.latest_high_values ({'time', 'high'}) from self.candlesticks."""
        self.latest_high_values = self._timestamped_field('high', 2)

    def get_latest_high_values(self, num_record=500):
        """Return the latest num_record high entries.

        Raises ValueError when no high values are set.
        """
        return self._tail(self.latest_high_values, num_record, 'latest_high_values')

    def set_latest_low_values(self):
        """Rebuild self.latest_low_values ({'time', 'low'}) from self.candlesticks."""
        self.latest_low_values = self._timestamped_field('low', 3)

    def get_latest_low_values(self, num_record=500):
        """Return the latest num_record low entries.

        Raises ValueError when no low values are set.
        """
        return self._tail(self.latest_low_values, num_record, 'latest_low_values')

    def set_latest_close_values(self):
        """Rebuild self.latest_close_values ({'time', 'close'}) from self.candlesticks."""
        self.latest_close_values = self._timestamped_field('close', 4)

    def get_latest_close_values(self, num_record=500):
        """Return the latest num_record close entries.

        Raises ValueError when no close values are set.
        """
        return self._tail(self.latest_close_values, num_record,
                          'get_latest_close_values')

    def set_candle_history(self, symbol=None, interval=None, max_data_loaded=None):
        """Load candle data and rebuild every series derived from it.

        Arguments that are omitted (or falsy) fall back to
        self.chart_configuration and self.max_data_loaded. Passing a symbol
        or interval does not change self.chart_configuration.
        """
        if not max_data_loaded:
            max_data_loaded = self.max_data_loaded
        if not symbol:
            symbol = self.chart_configuration['trading_pair']
        if not interval:
            interval = self.chart_configuration['chart_interval']

        if self.candlesticks:
            print('set_candle_history(): Reloading candle data')
        else:
            print('set_candle_history(): Loading candle data')

        # Load candles from file
        cdata = self.load_candle_history(symbol, interval)
        # Keep at most max_data_loaded of the most recent candles. Slicing
        # already copes with a list shorter than the requested size.
        self.candlesticks = cdata[-max_data_loaded:]

        # Set the instance lists of timestamped high, low, closing values
        self.set_latest_high_values()
        self.set_latest_low_values()
        self.set_latest_close_values()
        # Extract the volume data from self.candlesticks and store it in self.latest_vol
        self.set_latest_vol()
        # Remember the last candle; there is none when nothing was loaded
        if self.candlesticks:
            self.last_candle = self.convert_candle(self.candlesticks[-1])
        else:
            self.last_candle = None
        print('set_candle_history(): Candle data Loaded')

    def convert_candle(self, candle):
        """Convert one raw candle row into a time/open/high/low/close dictionary.

        The timestamp is stored in milliseconds and is divided by 1000.
        """
        return {
            "time": int(candle[0]) / 1000,
            "open": candle[1],
            "high": candle[2],
            "low": candle[3],
            "close": candle[4]
        }

    def get_candle_history(self, symbol, interval, num_records):
        """Return the latest num_records loaded candles as dictionaries.

        symbol and interval are accepted but not used: the candles always
        come from self.candlesticks.
        """
        if len(self.candlesticks) < num_records:
            print('Warning: get_candle_history() Requested more records then available')
            num_records = len(self.candlesticks)
        # Binance stores timestamps in milliseconds but lightweight charts
        # doesn't, so convert_candle() divides them by 1000.
        return [self.convert_candle(data)
                for data in self.candlesticks[-num_records:]]

    # ------------------------------------------------------------------
    # Indicator configuration
    # ------------------------------------------------------------------
    def get_enabled_indicators(self):
        """Return the names of all indicators whose 'visible' value is truthy."""
        i_list = self.get_indicator_list()
        return [name for name in i_list if i_list[name]['visible']]

    def set_indicator_defaults(self):
        """Set the default indicator types and the default indicators."""
        self.indicator_types = {'simple_indicators': ['RSI', 'SMA', 'EMA', 'LREG'],
                                'other': ['Volume', 'BOLBands', 'MACD', 'ATR']}
        self.indicator_list = {
            'SMA 21': {'type': 'SMA', 'period': 21, 'visible': True,
                       'color': self._random_color(), 'value': 0},
            'EMA 50': {'type': 'EMA', 'period': 50, 'visible': True,
                       'color': self._random_color(), 'value': 0},
            'EMA 100': {'type': 'EMA', 'period': 100, 'visible': True,
                        'color': self._random_color(), 'value': 0},
            'SMA 200': {'type': 'SMA', 'period': 200, 'visible': True,
                        'color': self._random_color(), 'value': 0},
            'RSI 14': {'type': 'RSI', 'period': 14, 'visible': True,
                       'color': self._random_color(), 'value': 0},
            'RSI 8': {'type': 'RSI', 'period': 8, 'visible': True,
                      'color': self._random_color(), 'value': 0},
            # NOTE(review): 'visible' is the string 'True' here while every
            # other entry uses a boolean. Any non-empty string is truthy, so
            # this entry counts as enabled. Left unchanged in case the front
            # end relies on it - confirm.
            'Bolenger': {'color_1': '#5ad858', 'color_2': '#f0f664',
                         'color_3': '#5ad858', 'devdn': 2, 'devup': 2, 'ma': 1,
                         'period': 20, 'type': 'BOLBands', 'value': 0,
                         'value1': '38691.58', 'value2': '38552.36',
                         'value3': '38413.14', 'visible': 'True'},
            'vol': {'type': 'Volume', 'visible': True, 'value': 0}
        }

    def get_indicator_list(self):
        """Return the dictionary of configured indicators.

        Raises ValueError when no indicators are configured.
        """
        if not self.indicator_list:
            raise ValueError('get_indicator_list(): No indicators in the list')
        return self.indicator_list

    def set_exchange_data(self):
        """Fill self.exchange_data with balances, symbols and intervals."""
        # Pull all balances from client while discarding assets with zero balance
        account = self.client.futures_coin_account_balance()
        self.exchange_data['balances'] = [asset for asset in account
                                          if float(asset['balance']) > 0]
        # Pull all available symbols from client
        exchange_info = self.client.get_exchange_info()
        self.exchange_data['symbols'] = exchange_info['symbols']
        # Available intervals
        self.exchange_data['intervals'] = (
            KLINE_INTERVAL_1MINUTE, KLINE_INTERVAL_3MINUTE,
            KLINE_INTERVAL_5MINUTE, KLINE_INTERVAL_15MINUTE,
            KLINE_INTERVAL_30MINUTE, KLINE_INTERVAL_1HOUR,
            KLINE_INTERVAL_2HOUR, KLINE_INTERVAL_4HOUR,
            KLINE_INTERVAL_6HOUR, KLINE_INTERVAL_8HOUR,
            KLINE_INTERVAL_12HOUR, KLINE_INTERVAL_1DAY,
            KLINE_INTERVAL_3DAY, KLINE_INTERVAL_1WEEK,
            KLINE_INTERVAL_1MONTH
        )

    def get_rendered_data(self):
        """Return the data to be rendered in the HTML."""
        return {
            # Title of the page
            'title': self.application_title,
            # Balances on the exchange
            'my_balances': self.exchange_data['balances'],
            # Symbols information from the exchange
            'symbols': self.exchange_data['symbols'],
            # Candle time intervals available to stream
            'intervals': self.exchange_data['intervals'],
            # The chart's current interval setting
            'chart_interval': self.chart_configuration['chart_interval'],
            # All the indicator types available
            'indicator_types': self.indicator_types,
            # Indicators available
            'indicator_list': self.get_indicator_list(),
            # Names of the indicators that are enabled
            'enabled_indicators': self.get_enabled_indicators(),
            # Acceptable moving-average values for Bollinger band creation
            'ma_vals': self.bb_ma_val,
        }

    # ------------------------------------------------------------------
    # Indicator calculation
    # ------------------------------------------------------------------
    def get_indicator_data(self, symbol=None, interval=None, num_results=100):
        """Calculate every enabled indicator and return the results by name.

        NOTE(review): symbol, interval and num_results are accepted but not
        used; each calculation runs on the loaded candles with its own
        default num_results. Passing num_results through would change the
        amount of data returned, so it is left as is - confirm intent.
        """
        i_list = self.get_indicator_list()
        result = {}
        for name in self.get_enabled_indicators():
            outcome = self._run_indicator(i_list[name])
            if outcome is not self._UNHANDLED:
                result[name] = outcome
        return result

    def get_volume(self, i_type, num_results=800):
        """Return up to num_results of the latest volume entries.

        get_latest_vol() is called with its default, so no more than its
        default number of records is ever available here.
        """
        r_data = self.get_latest_vol()[-num_results:]
        return {"type": i_type, "data": r_data}

    def calculate_macd(self, i_type, fast_p=12, slow_p=26, signal_p=9, num_results=800):
        """Calculate MACD, signal and histogram lines from the closing values.

        Returns {"type": i_type, "data": [macd, signal, hist]} where each
        element is a list of {'time', 'value'} dictionaries, or None when
        not enough closing values are loaded.

        Raises ValueError when fast_p is greater than slow_p.
        """
        # TODO: slow_p or fast_p which ever is greater should be used in the
        # calc below; until that is settled fast_p > slow_p is rejected.
        if fast_p > slow_p:
            raise ValueError('Error I think: TODO: calculate_macd()')
        # The calculation needs extra leading values besides the requested
        # results; (slow_p - 2) + signal_p is the amount used here.
        num_cv = (slow_p - 2) + signal_p + num_results
        series = self._closing_series(num_cv)
        if series is None:
            # This method has no `period`; the slow period is reported instead.
            self._report_insufficient_data(i_type, slow_p)
            return None
        closes, ts = series

        # Keep only the requested number of trailing results of each line
        macd, signal, hist = (line[-num_results:]
                              for line in talib.MACD(closes, fast_p, slow_p, signal_p))
        ts = ts[-num_results:]
        # All three lines drop the entries where the macd value is NaN
        r_data = [self._timestamped(ts, line, gate=macd)
                  for line in (macd, signal, hist)]
        return {"type": i_type, "data": r_data}

    def calculate_atr(self, i_type, period, num_results=800):
        """Calculate the ATR from the high, low and closing values.

        Returns {"type": i_type, "data": [...]} with one {'time', 'value'}
        dictionary per non-NaN result, or None when not enough closing
        values are loaded.
        """
        # The calculation runs over `period` data points, so that many
        # values are needed on top of the requested results.
        num_cv = period + num_results
        high_data = self.get_latest_high_values(num_cv)
        low_data = self.get_latest_low_values(num_cv)
        close_data = self.get_latest_close_values(num_cv)
        if len(close_data) < num_cv:
            self._report_insufficient_data(i_type, period)
            return None

        np_highs = np.array([each['high'] for each in high_data], dtype=float)
        np_lows = np.array([each['low'] for each in low_data], dtype=float)
        np_closes = np.array([each['close'] for each in close_data], dtype=float)
        ts = [each['time'] for each in close_data]

        atr = talib.ATR(high=np_highs, low=np_lows, close=np_closes,
                        timeperiod=period)
        r_data = self._timestamped(ts[-num_results:], atr[-num_results:])
        return {"type": i_type, "data": r_data}

    def calculate_bolbands(self, i_type, period, devup=2, devdn=2, ma=0, num_results=800):
        """Calculate Bollinger bands from the closing values.

        ma is the moving-average code forwarded as matype; see
        self.bb_ma_val for the accepted values.

        Returns {"type": i_type, "data": [upper, middle, lower]} where each
        element is a list of {'time', 'value'} dictionaries, or None when
        not enough closing values are loaded.
        """
        # The calculation runs over `period` data points, so that many
        # values are needed on top of the requested results.
        num_cv = period + num_results
        series = self._closing_series(num_cv)
        if series is None:
            self._report_insufficient_data(i_type, period)
            return None
        closes, ts = series

        bands = talib.BBANDS(closes,
                             timeperiod=period,
                             # number of non-biased standard deviations from the mean
                             nbdevup=devup,
                             nbdevdn=devdn,
                             # Moving average type
                             matype=ma)
        upper, middle, lower = (band[-num_results:] for band in bands)
        ts = ts[-num_results:]
        # All three bands drop the entries where the upper value is NaN
        r_data = [self._timestamped(ts, band, gate=upper)
                  for band in (upper, middle, lower)]
        return {"type": i_type, "data": r_data}

    def calculate_simple_indicator(self, i_type, period, num_results=800):
        """Calculate an SMA, RSI, EMA or LREG indicator from the closing values.

        Returns {"type": i_type, "data": [...]} with one {'time', 'value'}
        dictionary per result (NaN values are not filtered here), or None
        when not enough closing values are loaded.

        Raises ValueError when i_type is not a known simple indicator.
        """
        # Valid types of indicators for this function
        if i_type not in self.indicator_types['simple_indicators']:
            raise ValueError(f'calculate_simple_indicator(): Unknown type: {i_type}')
        talib_functions = {
            'SMA': talib.SMA,
            'RSI': talib.RSI,
            'EMA': talib.EMA,
            'LREG': talib.LINEARREG,
        }
        if i_type not in talib_functions:
            # Listed as a simple indicator but no calculation is attached
            raise ValueError(f'calculate_simple_indicator(): Unknown type: {i_type}')

        # The calculation runs over `period` data points, so that many
        # values are needed on top of the requested results.
        num_cv = period + num_results
        series = self._closing_series(num_cv)
        if series is None:
            self._report_insufficient_data(i_type, period)
            return None
        closes, ts = series

        i_values = talib_functions[i_type](closes, period)[-num_results:]
        ts = ts[-num_results:]
        r_data = [{'time': stamp, 'value': value}
                  for stamp, value in zip(ts, i_values)]
        return {"type": i_type, "data": r_data}

    def create_indicator(self, name, type, properties):
        """Add an indicator called name to self.indicator_list.

        properties is modified in place: 'type' is set and every property
        the indicator type requires is filled with a default when missing.
        The same dictionary object is then stored in the list.
        """
        properties['type'] = type

        # Force color and period properties for simple indicators
        if type in self.indicator_types['simple_indicators']:
            self._fill_defaults(properties, {
                'color': self._random_color(),
                'period': 20,
            })

        if type in self.indicator_types['other']:
            if type == 'BOLBands':
                # The upper and lower bands share one colour
                ul_col = self._random_color()
                self._fill_defaults(properties, {
                    'period': 50,
                    'color_1': ul_col,
                    'color_2': self._random_color(),
                    'color_3': ul_col,
                    'value1': 0,
                    'value2': 0,
                    'value3': 0,
                    'devup': 2,
                    'devdn': 2,
                    'ma': 1,
                })
            if type == 'MACD':
                self._fill_defaults(properties, {
                    'fast_p': 12,
                    'slow_p': 26,
                    'signal_p': 9,
                    'macd': 0,
                    'signal': 0,
                    'hist': 0,
                    'color_1': self._random_color(),
                    'color_2': self._random_color(),
                })
            if type == 'ATR':
                self._fill_defaults(properties, {
                    'period': 50,
                    'color': self._random_color(),
                })

        # Force value and visibility for all indicators
        self._fill_defaults(properties, {'value': 0, 'visible': True})
        # Add the dictionary of properties and values to the instance list
        self.indicator_list[name] = properties

    # ------------------------------------------------------------------
    # Live updates
    # ------------------------------------------------------------------
    def received_cdata(self, cdata):
        """Process a candle received from the stream.

        Returns None when this is the first candle or when it carries the
        same time as the last one; otherwise records the candle and returns
        the updated indicator values.
        """
        # If this is the first candle received,
        # then just set last_candle and return.
        if not self.last_candle:
            self.last_candle = cdata
            return None
        # If this candle is the same as last candle there is nothing to do.
        if cdata['time'] and cdata['time'] == self.last_candle['time']:
            return None

        # **** New candle is received ***
        # Update the instance data records.
        stamp = cdata['time']
        self.last_candle = cdata
        self.latest_close_values.append({'time': stamp, 'close': cdata['close']})
        self.latest_high_values.append({'time': stamp, 'high': cdata['high']})
        self.latest_low_values.append({'time': stamp, 'low': cdata['low']})
        self.latest_vol.append({'time': stamp, 'value': cdata['vol']})
        # Update indicators
        return self.update_indicators()

    def update_indicators(self):
        """Return the newest single value of every enabled indicator by name."""
        # TODO - Check EMA results i see a bit of a sharp turn in the ema line on
        # the interface side when reloading the page. It smooths out after a full reload.
        indcrs_list = self.get_indicator_list()
        updates = {}
        for indcr in self.get_enabled_indicators():
            outcome = self._run_indicator(indcrs_list[indcr], num_results=1)
            if outcome is not self._UNHANDLED:
                updates[indcr] = outcome
        return updates

    def received_new_signal(self, data):
        """Check a received signal definition and print it.

        Returns 'No name provided' when data has no 'sigName' key,
        otherwise None.
        """
        # Check the data.
        if 'sigName' not in data:
            return 'No name provided'
        print(data)
        return None


app_data = BrighterData()