brighter-trading/src/indicators.py

639 lines
27 KiB
Python

import json
import random
from typing import Any, List, Optional, Dict
import numpy as np
import pandas as pd
import talib
# A list container to hold all available indicator types. This list is
# appended everytime an indicator class is defined below.
indicator_types = []
class Indicator:
def __init__(self, name: str, indicator_type: str, properties: dict):
self.name = name
self.properties = properties
self.properties.setdefault('type', indicator_type)
self.properties.setdefault('value', 0)
def calculate(self, candles: pd.DataFrame, user_name: str, num_results: int = 1) -> dict:
"""
Calculates the indicator values over a span of price data.
:param candles: The candlestick data.
:param user_name: The user_name.
:param num_results: The number of requested results.
:return: A dictionary of indicator records.
"""
closes = candles.close.to_numpy(dtype='float')
i_values = self.process(closes, self.properties['period'])
self.properties['value'] = round(float(i_values[-1]), 2)
df = pd.DataFrame({'time': candles.open_time, 'value': i_values.tolist()})
r_data = df.iloc[self.properties['period']:]
return {"type": self.properties['type'], "data": r_data.to_dict('records')}
def process(self, data, period):
"""
Abstract method that should be overridden in the subclass.
"""
raise NotImplementedError("Subclasses must implement the 'process' method.")
class Volume(Indicator):
def __init__(self, name: str, indicator_type: str, properties: dict):
super().__init__(name, indicator_type, properties)
def calculate(self, candles: pd.DataFrame, user_name: str, num_results: int = 1) -> dict:
"""
Fetches the volume data and combines it with red or green
color info representing higher or lower volume changes.
:param candles: The price data to analyze.
:param user_name: The name of the user executing this function.
:param num_results: The number of results requested.
:return: A dictionary of volume records.
"""
def get_color(row):
row_index = row.name
if (row_index - 1) not in volumes.index:
return 'rgba(0, 150, 136, 0.8)' # Green
if cndls.close.iloc[row_index - 1] < cndls.close.iloc[row_index]:
return 'rgba(0, 150, 136, 0.8)' # Green
else:
return 'rgba(255, 82, 82, 0.8)' # Red
cndls = candles.copy().reset_index(drop=True)
# Extract the open_time and volume columns
volumes = cndls.loc[:, ['open_time', 'volume']]
# Add the color field using apply() and get_color() function
volumes['color'] = volumes.apply(get_color, axis=1)
# Rename the volume column to 'value'
volumes = volumes.rename(columns={'volume': 'value'})
# Rename the open_time column to 'time'
volumes = volumes.rename(columns={'open_time': 'time'})
# Get the last volume value as the current volume
current_volume = volumes['value'].iloc[-1]
self.properties['value'] = float(current_volume)
# Prepare the result data with the required structure
r_data = volumes.to_dict('records')
return {"type": self.properties['type'], "data": r_data}
indicator_types.append('Volume')
class SMA(Indicator):
def __init__(self, name: str, indicator_type: str, properties: dict):
super().__init__(name, indicator_type, properties)
self.properties.setdefault('color', f"#{random.randrange(0x1000000):06x}")
self.properties.setdefault('period', 20)
def process(self, data: np.ndarray, period: int) -> np.ndarray:
"""
Calculate the Simple Moving Average (SMA) of the given data.
:param data: A numpy array of data points.
:param period: The period over which to calculate the SMA.
:return: A numpy array containing the SMA values.
"""
return talib.SMA(data, period)
indicator_types.append('SMA')
class EMA(SMA):
def __init__(self, name: str, indicator_type: str, properties: dict):
super().__init__(name, indicator_type, properties)
def process(self, data: np.ndarray, period: int) -> np.ndarray:
"""
Calculate the Exponential Moving Average (EMA) of the given data.
:param data: A numpy array of data points.
:param period: The period over which to calculate the EMA.
:return: A numpy array containing the EMA values.
"""
return talib.EMA(data, period)
indicator_types.append('EMA')
class RSI(SMA):
def __init__(self, name: str, indicator_type: str, properties: dict):
super().__init__(name, indicator_type, properties)
def process(self, data: np.ndarray, period: int) -> np.ndarray:
"""
Calculate the Relative Strength Index (RSI) of the given data.
:param data: A numpy array of data points.
:param period: The period over which to calculate the RSI.
:return: A numpy array containing the RSI values.
"""
return talib.RSI(data, period)
indicator_types.append('RSI')
class LREG(SMA):
def __init__(self, name: str, indicator_type: str, properties: dict):
super().__init__(name, indicator_type, properties)
def process(self, data: np.ndarray, period: int) -> np.ndarray:
"""
Calculate the Linear Regression (LREG) of the given data.
:param data: A numpy array of data points.
:param period: The period over which to calculate the LREG.
:return: A numpy array containing the LREG values.
"""
return talib.LINEARREG(data, period)
indicator_types.append('LREG')
class ATR(SMA):
def __init__(self, name: str, indicator_type: str, properties: dict):
super().__init__(name, indicator_type, properties)
def calculate(self, candles: pd.DataFrame, user_name: str, num_results: int = 1) -> dict:
"""
Calculate the Average True Range (ATR) indicator.
:param candles: A DataFrame containing candlestick data.
:param user_name: The name of the user executing this function.
:param num_results: The number of results requested.
:return: A dictionary with the type and data of the indicator.
"""
highs = candles.high.to_numpy(dtype='float')
lows = candles.low.to_numpy(dtype='float')
closes = candles.close.to_numpy(dtype='float')
atr = talib.ATR(high=highs,
low=lows,
close=closes,
timeperiod=self.properties['period'])
df = pd.DataFrame({'time': candles.open_time, 'value': atr})
r_data = df.iloc[self.properties['period']:].to_dict('records')
self.properties['value'] = round(float(atr[-1]), 2)
return {"type": self.properties['type'], "data": r_data}
indicator_types.append('ATR')
class BolBands(Indicator):
def __init__(self, name: str, indicator_type: str, properties: dict):
super().__init__(name, indicator_type, properties)
ul_col = f"#{random.randrange(0x1000000):06x}"
self.properties.setdefault('period', 50)
self.properties.setdefault('color_1', ul_col)
self.properties.setdefault('color_2', f"#{random.randrange(0x1000000):06x}")
self.properties.setdefault('color_3', ul_col)
self.properties.setdefault('value', 0)
self.properties.setdefault('value2', 0)
self.properties.setdefault('value3', 0)
self.properties.setdefault('devup', 2)
self.properties.setdefault('devdn', 2)
self.properties.setdefault('ma', 1)
def calculate(self, candles: pd.DataFrame, user_name: str, num_results: int = 1) -> dict:
"""
Calculate the Bollinger Bands indicator for the given candles.
:param candles: The candlestick data.
:param user_name: The user_name.
:param num_results: The number of results requested.
:return: A dictionary containing the calculated Bollinger Bands values.
"""
np_real_data = candles.close.to_numpy(dtype='float')
upper, middle, lower = talib.BBANDS(np_real_data,
timeperiod=self.properties['period'],
nbdevup=self.properties['devup'],
nbdevdn=self.properties['devdn'],
matype=self.properties['ma'])
self.properties['value'] = round(float(upper[-1]), 2)
self.properties['value2'] = round(float(middle[-1]), 2)
self.properties['value3'] = round(float(lower[-1]), 2)
df1 = pd.DataFrame({'time': candles.open_time, 'value': upper}).dropna()
df2 = pd.DataFrame({'time': candles.open_time, 'value': middle}).dropna()
df3 = pd.DataFrame({'time': candles.open_time, 'value': lower}).dropna()
r_data = [df1.to_dict('records'), df2.to_dict('records'), df3.to_dict('records')]
return {"type": self.properties['type'], "data": r_data}
indicator_types.append('BOLBands')
class MACD(Indicator):
def __init__(self, name, indicator_type, properties):
super().__init__(name, indicator_type, properties)
self.properties.setdefault('fast_p', 12)
self.properties.setdefault('slow_p', 26)
self.properties.setdefault('signal_p', 9)
self.properties.setdefault('macd', 0)
self.properties.setdefault('signal', 0)
self.properties.setdefault('hist', 0)
self.properties.setdefault('color_1', f"#{random.randrange(0x1000000):06x}")
self.properties.setdefault('color_2', f"#{random.randrange(0x1000000):06x}")
self.properties['period'] = self.properties['slow_p'] + self.properties['signal_p'] - 2
# Adjusting the period
# Not sure about the lookback period for macd algorithm below was a result of trial and error.
num = self.properties['slow_p'] + self.properties['signal_p'] - 2
self.properties['period'] = num
def calculate(self, candles: pd.DataFrame, user_name: str, num_results: int = 800) -> dict:
"""
Calculate the MACD indicator for the given candles.
:param candles: The candlestick data.
:param user_name: The user_name.
:param num_results: The number of results requested.
:return: A dictionary containing the calculated MACD values.
"""
if self.properties['fast_p'] >= self.properties['slow_p']:
raise ValueError('The fast_period should be less than the slow_period.')
closing_data = candles.close
if len(closing_data) < num_results:
print(f"Not enough data available to calculate {self.properties['type']} for the given time period.")
return {}
closes = closing_data.to_numpy(dtype='float')
macd, signal, hist = talib.MACD(closes, self.properties['fast_p'], self.properties['slow_p'],
self.properties['signal_p'])
self.properties['macd'] = round(float(macd[-1]), 2)
self.properties['signal'] = round(float(signal[-1]), 2)
df1 = pd.DataFrame({'time': closing_data.time, 'value': macd}).dropna()
df2 = pd.DataFrame({'time': closing_data.time, 'value': signal}).dropna()
df3 = pd.DataFrame({'time': closing_data.time, 'value': hist}).dropna()
r_data = [df1.to_dict('records'), df2.to_dict('records'), df3.to_dict('records')]
return {"type": self.properties['type'], "data": r_data}
indicator_types.append('MACD')
class Indicators:
def __init__(self, candles, config):
# Object manages and serves price and candle data.
self.candles = candles
# A connection to an object that handles user configuration and persistent data.
self.config = config
# Collection of instantiated indicators objects
self.indicators = pd.DataFrame(columns=['creator', 'name', 'visible',
'kind', 'source', 'properties', 'ref'])
# Create an instance reference of all available indicator types in the global list.
self.indicator_types = indicator_types
# Enums values to use with Bolenger-bands.
self.MV_AVERAGE_ENUM = {'SMA': 0, 'EMA': 1, 'WMA': 2, 'DEMA': 3, 'TEMA': 4,
'TRIMA': 5, 'KAMA': 6, 'MAMA': 7, 'T3': 8}
def load_indicators(self, user_name):
"""
Get the users watch-list from the database and load the indicators into a dataframe.
:return: None
"""
active_indicators: pd.DataFrame = self.config.users.get_indicators(user_name)
if active_indicators is not None:
# Create an instance for each indicator.
for i in active_indicators.itertuples():
self.create_indicator(
creator=user_name, name=i.name,
kind=i.kind, source=i.source,
visible=i.visible, properties=i.properties
)
def save_indicator(self, indicator):
"""
Saves the indicators in the database indexed by the user id.
:return: None
"""
self.config.users.save_indicators(indicator)
# @staticmethod
# def get_indicator_defaults():
# """Set the default settings for each indicator"""
#
# indicator_list = {
# 'EMA 5': {'type': 'EMA', 'period': 5, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
# 'value': 0, 'market': 'BTC/USD', 'time_frame': '5m', 'exchange_name': 'alpaca'},
# 'EMA 15': {'type': 'EMA', 'period': 15, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
# 'value': 0, 'market': 'BTC/USD', 'time_frame': '5m', 'exchange_name': 'alpaca'},
# 'EMA 20': {'type': 'EMA', 'period': 20, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
# 'value': 0, 'market': 'BTC/USD', 'time_frame': '5m', 'exchange_name': 'alpaca'},
# 'EMA 50': {'type': 'EMA', 'period': 50, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
# 'value': 0, 'market': 'BTC/USD', 'time_frame': '5m', 'exchange_name': 'alpaca'},
# 'EMA 100': {'type': 'EMA', 'period': 100, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
# 'value': 0, 'market': 'BTC/USD', 'time_frame': '5m', 'exchange_name': 'alpaca'},
# 'EMA 200': {'type': 'EMA', 'period': 200, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
# 'value': 0, 'market': 'BTC/USD', 'time_frame': '5m', 'exchange_name': 'alpaca'},
# 'RSI 14': {'type': 'RSI', 'period': 14, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
# 'value': 0, 'market': 'BTC/USD', 'time_frame': '5m', 'exchange_name': 'alpaca'},
# 'RSI 8': {'type': 'RSI', 'period': 8, 'visible': True, 'color': f"#{random.randrange(0x1000000):06x}",
# 'value': 0, 'market': 'BTC/USD', 'time_frame': '5m', 'exchange_name': 'alpaca'},
# 'Bolenger': {'color_1': '#5ad858', 'color_2': '#f0f664', 'color_3': '#5ad858', 'devdn': 2, 'devup': 2,
# 'ma': 1, 'period': 20, 'type': 'BOLBands', 'value': '38691.58',
# 'value2': '38552.36',
# 'value3': '38413.14', 'visible': True, 'market': 'BTC/USD', 'time_frame': '5m',
# 'exchange_name': 'alpaca'},
# 'vol': {'type': 'Volume', 'visible': True, 'value': 0, 'market': 'BTC/USD', 'time_frame': '5m',
# 'exchange_name': 'alpaca'}
# }
# return indicator_list
def get_indicator_list(self, username: str, only_enabled: bool = False) -> Dict[str, Dict[str, Any]]:
"""
Returns a dictionary of all indicators available to this user.
If only_enabled is True, returns only indicators marked as visible.
:param username: str - The user_name of the user.
:param only_enabled: bool - If True, return only indicators marked as visible.
:return: dict - A dictionary of indicator names as keys and their attributes as values.
"""
user_id = self.config.users.get_id(username)
if not user_id:
raise ValueError(f"Invalid user_name: {username}")
if only_enabled:
indicators_df = self.indicators.query("creator == @user_id and visible=='True'")
else:
indicators_df = self.indicators.query('creator == @user_id')
if indicators_df.empty:
# Attempt to load from storage.
self.load_indicators(user_name=username)
indicators_df = self.indicators.query('creator == @user_id')
# Create the dictionary
result = {}
for _, row in indicators_df.iterrows():
result[row['name']] = {
'type': row['kind'],
'visible': row['visible'],
'value': row['properties'].get('value', ''),
'color': row['properties'].get('color', '')
}
return result
def toggle_indicators(self, user_id: int, indicator_names: list) -> None:
"""
Set the visibility of indicators for a user.
:param user_id: The id of the user.
:param indicator_names: List of indicator names to set as visible.
:return: None
"""
# Validate inputs
if user_id not in self.indicators['creator'].unique():
# raise ValueError(f"Invalid user_id: {user_id}")
# Nothing may be loaded.
return
# Set visibility for all indicators of the user
self.indicators.loc[self.indicators['creator'] == user_id, 'visible'] = False
# Set visibility for the specified indicator names
self.indicators.loc[self.indicators['name'].isin(indicator_names), 'visible'] = True
def edit_indicator(self, user_name: str, params: Any):
# if 'submit' in request.form:
# # Get the name of the indicator
# indicator = request.form['submit']
# # Drop the name and action from our received data
# attributes = dict(list(request.form.items())[2:])
# # All the numbers are string now so turn them back to (int)
# for a in attributes:
# if attributes[a].isdigit():
# attributes[a] = int(attributes[a])
# # if visible is unchecked it doesn't get sent by the form
# if 'visible' not in attributes:
# attributes.update({'visible': False})
# # Set the data in indicators according to <attributes>
# brighter_trades.indicators.indicator_list[indicator] = attributes
#
if 'delete' in params:
indicator = params['delete']
# This will delete in both indicators and config.
self.delete_indicator(indicator)
def new_indicator(self, user_name: str, params) -> None:
"""
Appends a new indicator to a user-specific collection of Indicator definitions.
:param user_name: The name of the user triggering the action.
:param params: The request parameters containing indicator information.
:return: None
"""
indcr = params['newi_name']
indtyp = params['newi_type']
# Validate indicator name and type
if not indcr:
raise ValueError("Indicator name is required.")
if indtyp not in ['SMA', 'EMA', 'RSI', 'LREG', 'ATR', 'BOLBands', 'MACD', 'Volume']:
raise ValueError("Unsupported indicator type.")
# Create a dictionary of properties from the values in request form.
source = {
'source': 'price_data',
'market': params['ei_symbol'],
'time_frame': params['ei_timeframe'],
'exchange_name': params['ei_exchange_name']
}
# Validate properties (assuming properties is a dictionary)
properties = {}
if params['new_prop_obj']:
properties = json.loads(params['new_prop_obj'])
# Create indicator.
self.create_indicator(creator=user_name, name=indcr, kind=indtyp, source=source, properties=properties)
# Update the watch-list in config.
self.save_indicator(self.indicators.loc[self.indicators.name == indcr])
def process_indicator(self, indicator, num_results: int = 1) -> pd.DataFrame | None:
"""
Trigger execution of the indicator's analysis against an updated source.
:param indicator: A named tuple containing indicator data.
:param num_results: The number of results being requested.
:return: The results of the indicator analysis as a DataFrame.
"""
username = self.config.users.get_username(indicator.creator)
src = indicator.source
symbol, timeframe, exchange_name = src['symbol'], src['timeframe'], src['exchange_name']
# Adjust num_results to account for the lookup period if specified in the indicator properties.
if 'period' in indicator.ref.properties:
num_results += indicator.ref.properties['period']
# Request the data from the defined source.
data = self.candles.get_last_n_candles(num_candles=num_results,
asset=symbol, timeframe=timeframe,
exchange=exchange_name, user_name=username)
# Calculate the indicator using the retrieved data.
return indicator.ref.calculate(candles=data, user_name=username, num_results=num_results)
def get_indicator_data(self, user_name: str, source: dict = None,
visible_only: bool = True, start_ts: float = None,
num_results: int = 1000) -> Optional[Dict[str, Any]]:
"""
Loop through enabled indicators in a user's watch-list. Run the appropriate
update function and return a dictionary containing all the results.
:param user_name: The name of the user making the request.
:param source: Pass in a source definition to return only results against a particular source.
:param visible_only: Returns only results marked visible.
:param start_ts: The timestamp to begin the analysis at. (Not implemented yet)
:param num_results: The number of results requested.
:return: A dictionary of timestamped data returned from each indicator indexed by the indicator's name,
or None if no indicators matched the query.
"""
if start_ts:
print("Warning: start_ts has not implemented in get_indicator_data()!")
user_id = self.config.users.get_id(user_name=user_name)
# Construct the query based on user_id and visibility.
query = f"creator == {user_id}"
if visible_only:
query += " and visible == True"
# Filter the indicators based on the query.
indicators = self.indicators.loc[
(self.indicators['creator'] == user_id) & (self.indicators['visible'] == 'True')]
# Return None if no indicators matched the query.
if indicators.empty:
# Attempt to re-load from db
self.load_indicators(user_name=user_name)
# query again.
indicators = self.indicators.loc[
(self.indicators['creator'] == user_id) & (self.indicators['visible'] == 'True')]
if indicators.empty:
return None
if source:
# Filter indicators by these source parameters.
if 'market' in source:
symbol = source['market']['market']
timeframe = source['market']['timeframe']
exchange = source['market']['exchange']
indicators = indicators[indicators.source.apply(lambda x: x['symbol'] == symbol and
x['timeframe'] == timeframe and
x['exchange_name'] == exchange)]
else:
raise ValueError(f'No implementation for source: {source}')
# Process each indicator and collect the results in a dictionary.
results = {}
for indicator in indicators.itertuples(index=False):
indicator_results = self.process_indicator(indicator=indicator, num_results=num_results)
results[indicator.name] = indicator_results
return results
def delete_indicator(self, indicator_name: str) -> None:
"""
Remove the indicator by name
:param indicator_name: The name of the indicator to remove.
:return: None
"""
if not indicator_name:
raise ValueError("No indicator name provided.")
self.indicators = self.indicators.query("name != @indicator_name").reset_index(drop=True)
self.config.users.save_indicators()
def create_indicator(self, creator: str, name: str, kind: str,
source: dict, properties: dict, visible: bool = True):
"""
Created an instance of the indicator specified in the args.
Initialise it with properties and store its reference in a dataframe.
For convenience, the indicator is indexed by name, type, and source info.
:param creator: The name of the user creating the indicator.
:param name: The name of the indicator being created.
:param kind: The kind of indicator being created.
:param source: The source of data that the indicator analyzes.
:param properties: Properties and values to pass into the initializer.
:param visible: Whether to display it in the chart view.
:return: None
"""
indicator_classes = {
'SMA': SMA,
'EMA': EMA,
'RSI': RSI,
'LREG': LREG,
'ATR': ATR,
'BOLBands': BolBands,
'MACD': MACD,
'Volume': Volume
} # todo define this instead of indicator_types as a global
if kind not in indicator_classes:
raise ValueError(f"[INDICATORS.PY]: Requested an unsupported type of indicator: ({kind})")
indicator_class = indicator_classes[kind]
# Create an instance of the indicator.
indicator = indicator_class(name, kind, properties)
# Add the new indicator to a pandas dataframe.
creator_id = self.config.users.get_id(creator)
row_data = {
'creator': creator_id,
'name': name,
'kind': kind,
'visible': visible,
'source': source,
'properties': properties,
'ref': indicator
}
self.indicators = pd.concat([self.indicators, pd.DataFrame([row_data])], ignore_index=True)
# def update_indicators(self, user_name):
# """
# This does nothing except add a default value.
# :return: None
# """
# return self.get_indicator_data(user_name=user_name, num_results=1)