Made a lot of changes to DataCache; indicator data was not being saved to the database.

This commit is contained in:
Rob 2024-09-15 14:05:08 -03:00
parent f2b7621b6d
commit 1ff21b56dd
10 changed files with 1911 additions and 1150 deletions

View File

@ -14,11 +14,14 @@ from trade import Trades
class BrighterTrades:
def __init__(self):
# Object that interacts and maintains exchange_interface and account data
self.exchanges = ExchangeInterface()
# Object that interacts with the persistent data.
self.data = DataCache(self.exchanges)
self.data = DataCache()
# Object that interacts and maintains exchange_interface and account data
self.exchanges = ExchangeInterface(self.data)
# Set the exchange for datacache to use
self.data.set_exchange(self.exchanges)
# Configuration for the app
self.config = Configuration()
@ -34,7 +37,7 @@ class BrighterTrades:
config=self.config)
# Object that interacts with and maintains data from available indicators
self.indicators = Indicators(self.candles, self.users)
self.indicators = Indicators(self.candles, self.users, self.data)
# Object that maintains the trades data
self.trades = Trades(self.users)
@ -186,8 +189,8 @@ class BrighterTrades:
:return: bool - True on success.
"""
active_exchanges = self.users.get_exchanges(user_name, category='active_exchanges')
success = False
success = False
for exchange in active_exchanges:
keys = self.users.get_api_keys(user_name, exchange)
result = self.connect_or_config_exchange(user_name=user_name,
@ -391,7 +394,8 @@ class BrighterTrades:
}
try:
if self.exchanges.exchange_data.query("user == @user_name and name == @exchange_name").empty:
if self.data.get_cache_item().get_cache('exchange_data').query([('user', user_name),
('name', exchange_name)]).empty:
# Exchange is not connected, try to connect
success = self.exchanges.connect_exchange(exchange_name=exchange_name, user_name=user_name,
api_keys=api_keys)

View File

@ -180,7 +180,7 @@ class DataCache:
params.append(additional_filter[1])
# Execute the SQL query to remove the row from the database
self.db.execute_sql(sql, tuple(params))
self.db.execute_sql(sql, params)
logger.info(
f"Row removed from database: table={table}, filter={filter_vals},"
f" additional_filter={additional_filter}")

File diff suppressed because it is too large Load Diff

View File

@ -85,7 +85,7 @@ class Database:
def __init__(self, db_file: str = None):
self.db_file = db_file
def execute_sql(self, sql: str, params: tuple = ()) -> None:
def execute_sql(self, sql: str, params: list = None) -> None:
"""
Executes a raw SQL statement with optional parameters.
@ -115,22 +115,28 @@ class Database:
error = f"Couldn't fetch item {item_name} from {table_name} where {filter_vals[0]} = {filter_vals[1]}"
raise ValueError(error)
def get_rows_where(self, table: str, filter_vals: Tuple[str, Any]) -> pd.DataFrame | None:
def get_rows_where(self, table: str, filter_vals: List[Tuple[str, Any]]) -> pd.DataFrame | None:
"""
Returns a DataFrame containing all rows of a table that meet the filter criteria.
:param table: Name of the table.
:param filter_vals: Tuple of column name and value to filter by.
:param filter_vals: List of tuples containing column names and values to filter by.
:return: DataFrame of the query result or None if empty or column does not exist.
"""
try:
with SQLite(self.db_file) as con:
qry = f"SELECT * FROM {table} WHERE {filter_vals[0]} = ?"
result = pd.read_sql(qry, con, params=(filter_vals[1],))
# Construct the WHERE clause with multiple conditions
where_clause = " AND ".join([f"{col} = ?" for col, _ in filter_vals])
params = [val for _, val in filter_vals]
# Prepare and execute the query with the constructed WHERE clause
qry = f"SELECT * FROM {table} WHERE {where_clause}"
result = pd.read_sql(qry, con, params=params)
return result if not result.empty else None
except (sqlite3.OperationalError, pd.errors.DatabaseError) as e:
# Log the error or handle it appropriately
print(f"Error querying table '{table}' for column '{filter_vals[0]}': {e}")
print(f"Error querying table '{table}' with filters {filter_vals}: {e}")
return None
def insert_dataframe(self, df: pd.DataFrame, table: str) -> int:

View File

@ -1,8 +1,10 @@
import logging
from typing import List, Any, Dict
from typing import List, Any, Dict, TYPE_CHECKING
import pandas as pd
import ccxt
from Exchange import Exchange
from DataCache_v3 import DataCache
logger = logging.getLogger(__name__)
@ -17,23 +19,38 @@ class ExchangeInterface:
Connects, maintains, and routes data requests to/from multiple exchanges.
"""
def __init__(self):
self.exchange_data = pd.DataFrame(columns=['user', 'name', 'reference', 'balances'])
def __init__(self, cache_manager: DataCache):
self.cache_manager = cache_manager
self.cache_manager.create_cache(
name='exchange_data',
cache_type='table',
size_limit=100,
eviction_policy='deny',
columns=['user', 'name', 'reference', 'balances']
)
self.available_exchanges = self.get_ccxt_exchanges()
# Create a default user and exchange for unsigned requests
default_ex_name = 'binance'
self.connect_exchange(exchange_name=default_ex_name, user_name='default')
self.default_exchange = self.get_exchange(ename=default_ex_name, uname='default')
self.default_ex_name = 'binance'
self.default_exchange = None
def get_ccxt_exchanges(self) -> List[str]:
def connect_default_exchange(self):
if self.default_exchange is not None:
return
# Create a default user and exchange for unsigned requests
self.connect_exchange(exchange_name=self.default_ex_name, user_name='default')
self.default_exchange = self.get_exchange(ename=self.default_ex_name, uname='default')
@staticmethod
def get_ccxt_exchanges() -> List[str]:
"""Retrieve the list of available exchanges from CCXT."""
return ccxt.exchanges
def get_public_exchanges(self) -> List[str]:
@staticmethod
def get_public_exchanges() -> List[str]:
"""Return a list of public exchanges available from CCXT."""
public_list = []
file_path = 'src\working_public_exchanges.txt'
file_path = r"src\working_public_exchanges.txt"
try:
with open(file_path, 'r') as file:
@ -70,8 +87,12 @@ class ExchangeInterface:
:param exchange: The Exchange object to add.
"""
try:
row = {'user': user_name, 'name': exchange.name, 'reference': exchange, 'balances': exchange.balances}
self.exchange_data = add_row(self.exchange_data, row)
row = pd.DataFrame([{
'user': user_name, 'name': exchange.name,
'reference': exchange, 'balances': exchange.balances}])
cache = self.cache_manager.get_cache('exchange_data')
cache.add_table(df=row)
except Exception as e:
logger.error(f"Couldn't create an instance of the exchange! {str(e)}")
raise
@ -87,7 +108,9 @@ class ExchangeInterface:
if not ename or not uname:
raise ValueError('Missing argument!')
exchange_data = self.exchange_data.query("name == @ename and user == @uname")
cache = self.cache_manager.get_cache('exchange_data')
exchange_data = cache.query([('name', ename), ('user', uname)])
if exchange_data.empty:
raise ValueError('No matching exchange found.')
@ -100,7 +123,9 @@ class ExchangeInterface:
:param user_name: The name of the user.
:return: A list of connected exchange names.
"""
return self.exchange_data.loc[self.exchange_data['user'] == user_name, 'name'].tolist()
cache = self.cache_manager.get_cache('exchange_data')
exchanges = cache.query([('user', user_name)])
return exchanges['name'].tolist()
def get_available_exchanges(self) -> List[str]:
"""Get a list of available exchanges."""
@ -114,9 +139,10 @@ class ExchangeInterface:
:param name: The name of the exchange.
:return: A Series containing the balances.
"""
filtered_data = self.exchange_data.query("user == @user_name and name == @name")
if not filtered_data.empty:
return filtered_data.iloc[0]['balances']
cache = self.cache_manager.get_cache('exchange_data')
exchange = cache.query([('user', user_name), ('name', name)])
if not exchange.empty:
return exchange.iloc[0]['balances']
else:
return pd.Series(dtype='object') # Return an empty Series if no match is found
@ -127,12 +153,15 @@ class ExchangeInterface:
:param user_name: The name of the user.
:return: A dictionary containing the balances of all connected exchanges.
"""
filtered_data = self.exchange_data.loc[self.exchange_data['user'] == user_name, ['name', 'balances']]
if filtered_data.empty:
return {}
# Query exchange data for the given user
cache = self.cache_manager.get_cache('exchange_data')
exchanges = cache.query([('user', user_name)])
balances_dict = {row['name']: row['balances'] for _, row in filtered_data.iterrows()}
return balances_dict
# Select 'name' and 'balances' columns for all rows
filtered_data = exchanges.loc[:, ['name', 'balances']]
# Return a dictionary where exchange 'name' is the key and 'balances' is the value
return {row['name']: row['balances'] for _, row in filtered_data.iterrows()}
def get_all_activated(self, user_name: str, fetch_type: str = 'trades') -> Dict[str, List[Dict[str, Any]]]:
"""
@ -142,16 +171,24 @@ class ExchangeInterface:
:param fetch_type: The type of data to fetch ('trades' or 'orders').
:return: A dictionary indexed by exchange name with lists of active trades or open orders.
"""
filtered_data = self.exchange_data.loc[self.exchange_data['user'] == user_name, ['name', 'reference']]
cache = self.cache_manager.get_cache('exchange_data')
exchanges = cache.query([('user', user_name)])
# Select the 'name' and 'reference' columns
filtered_data = exchanges.loc[:, ['name', 'reference']]
if filtered_data.empty:
return {}
data_dict = {}
# Iterate over the filtered data
for name, reference in filtered_data.itertuples(index=False):
if pd.isna(reference):
continue
try:
# Fetch active trades or open orders based on the fetch_type
if fetch_type == 'trades':
data = reference.get_active_trades()
elif fetch_type == 'orders':
@ -222,6 +259,7 @@ class ExchangeInterface:
:return: The current price.
"""
if price_source is None:
self.connect_default_exchange()
return self.default_exchange.get_price(symbol=symbol)
else:
raise ValueError(f'No implementation for price source: {price_source}')

View File

@ -2,9 +2,9 @@ import copy
import datetime as dt
import json
import random
from typing import Any
from passlib.hash import bcrypt
import pandas as pd
from typing import Any
from DataCache_v3 import DataCache
@ -21,6 +21,16 @@ class BaseUser:
:param data_cache: Object responsible for managing cached data and database interaction.
"""
self.data = data_cache
# Create a table-based cache with specified columns
self.data.create_cache(name='users',
cache_type='table',
size_limit=100,
eviction_policy='deny',
default_expiration=dt.timedelta(hours=24),
columns=["id", "user_name", "status", "chart_views", "email",
"active_exchanges", "configured_exchanges", "password",
"api_keys", "signin_time", "active_indicators"]
)
def get_id(self, user_name: str) -> int:
"""
@ -29,7 +39,7 @@ class BaseUser:
:param user_name: The name of the user.
:return: The ID of the user as an integer.
"""
return self.data.fetch_item(
return self.data.fetch_datacache_item(
item_name='id',
cache_name='users',
filter_vals=('user_name', user_name)
@ -42,7 +52,7 @@ class BaseUser:
:param id: The id of the user.
:return: The name of the user as a str.
"""
return self.data.fetch_item(
return self.data.fetch_datacache_item(
item_name='user_name',
cache_name='users',
filter_vals=('id', id)
@ -55,10 +65,9 @@ class BaseUser:
:param user_name: The name of the user to remove from the cache.
"""
# Remove the user from the cache only
self.data.remove_row(
cache_name='users',
filter_vals=('user_name', user_name), remove_from_db=False
)
self.data.remove_row_from_datacache(cache_name='users',
filter_vals=[('user_name', user_name)],
remove_from_db=False)
def delete_user(self, user_name: str) -> None:
"""
@ -66,10 +75,8 @@ class BaseUser:
:param user_name: The name of the user to delete.
"""
self.data.remove_row(
filter_vals=('user_name', user_name),
cache_name='users'
)
self.data.remove_row_from_datacache(filter_vals=[('user_name', user_name)],
cache_name='users')
def get_user_data(self, user_name: str) -> pd.DataFrame | None:
"""
@ -81,10 +88,8 @@ class BaseUser:
:raises ValueError: If the user is not found in both the cache and the database.
"""
# Attempt to fetch the user data from the cache or database via DataCache
user = self.data.get_or_fetch_rows(
cache_name='users',
filter_vals=('user_name', user_name)
)
user = self.data.get_rows_from_datacache(
cache_name='users', filter_vals=[('user_name', user_name)])
if user is None or user.empty:
raise ValueError(f"User '{user_name}' not found in database or cache!")
@ -100,9 +105,9 @@ class BaseUser:
:param new_data: The new data to be set.
"""
# Use DataCache to modify the user's data
self.data.modify_item(
self.data.modify_datacache_item(
cache_name='users',
filter_vals=('user_name', username),
filter_vals=[('user_name', username)],
field_name=field_name,
new_data=new_data
)
@ -154,7 +159,7 @@ class UserAccountManagement(BaseUser):
:return: True if the password is correct, False otherwise.
"""
# Retrieve the hashed password using DataCache
user_data = self.data.get_or_fetch_rows(cache_name='users', filter_vals=('user_name', username))
user_data = self.data.get_rows_from_datacache(cache_name='users', filter_vals=[('user_name', username)])
if user_data is None or user_data.empty:
return False
@ -238,13 +243,13 @@ class UserAccountManagement(BaseUser):
def user_attr_is_taken(self, attr: str, val: str) -> bool:
"""
Checks if a specific user attribute (e.g., username, email) is already taken.
:param attr: The attribute to check (e.g., 'user_name', 'email').
:param val: The value of the attribute to check.
:return: True if the attribute is already taken, False otherwise.
"""
# Use DataCache to check if the attribute is taken
return self.data.is_attr_taken(cache_name='users', attr=attr, val=val)
user_cache = self.data.get_rows_from_datacache('users', [(attr, val)])
return True if not user_cache.empty else False
def create_unique_guest_name(self) -> str | None:
"""
@ -262,7 +267,7 @@ class UserAccountManagement(BaseUser):
username = f'guest_{suffix}'
# Check if the username already exists in the database
if not self.data.get_or_fetch_rows(cache_name='users', filter_vals=('user_name', username)):
if not self.data.get_rows_from_datacache(cache_name='users', filter_vals=[('user_name', username)]):
return username
attempts += 1
@ -298,7 +303,7 @@ class UserAccountManagement(BaseUser):
raise ValueError("Attributes must be a tuple of single key-value pair dictionaries.")
# Retrieve the default user template from the database using DataCache
default_user = self.data.get_or_fetch_rows(cache_name='users', filter_vals=('user_name', 'guest'))
default_user = self.data.get_rows_from_datacache(cache_name='users', filter_vals=[('user_name', 'guest')])
if default_user is None or default_user.empty:
raise ValueError("Default user template not found in the database.")
@ -314,8 +319,10 @@ class UserAccountManagement(BaseUser):
# Remove the 'id' column before inserting into the database
new_user = new_user.drop(columns='id')
# Insert the modified user data into the database, skipping cache insertion
self.data.insert_df(df=new_user, cache_name="users", skip_cache=True)
# Insert the modified user as a single row, skipping cache
columns = tuple(new_user.columns)
values = tuple(new_user.iloc[0])
self.data.insert_row_into_datacache(cache_name="users", columns=columns, values=values, skip_cache=True)
def create_new_user(self, username: str, email: str, password: str) -> bool:
"""
@ -464,7 +471,7 @@ class UserIndicatorManagement(UserExchangeManagement):
user_id = int(self.get_id(user_name))
# Fetch the indicators from the database using DataCache
df = self.data.get_or_fetch_rows(cache_name='indicators', filter_vals=('creator', user_id))
df = self.data.get_rows_from_datacache(cache_name='indicators', filter_vals=[('creator', user_id)])
# If indicators are found, process the JSON fields
if df is not None and not df.empty:
@ -492,25 +499,11 @@ class UserIndicatorManagement(UserExchangeManagement):
columns = ('creator', 'name', 'visible', 'kind', 'source', 'properties')
# Insert the row into the database and cache using DataCache
self.data.insert_row(cache_name='indicators', columns=columns, values=values)
self.data.insert_row_into_datacache(cache_name='indicators', columns=columns, values=values)
except Exception as e:
print(f"Error saving indicator {indicator['name']} for creator {indicator['creator']}: {str(e)}")
def remove_indicator(self, indicator_name: str, user_name: str) -> None:
"""
Removes a specific indicator from the database and cache.
:param indicator_name: The name of the indicator to remove.
:param user_name: The name of the user who created the indicator.
"""
user_id = int(self.get_id(user_name))
self.data.remove_row(
filter_vals=('name', indicator_name),
additional_filter=('creator', user_id),
cache_name='indicators'
)
def get_chart_view(self, user_name: str, prop: str | None = None):
"""
Fetches the chart view or one specific property of it for a specific user.

View File

@ -1,10 +1,10 @@
import json
import random
from typing import Any, Optional, Dict
import numpy as np
import pandas as pd
import talib
import datetime as dt
# A dictionary to hold both indicator types and their corresponding classes.
indicators_registry = {}
@ -255,16 +255,35 @@ indicators_registry['MACD'] = MACD
class Indicators:
def __init__(self, candles, users):
def __init__(self, candles, users, cache_manager):
# Object manages and serves price and candle data.
self.candles = candles
# A connection to an object that handles user data.
self.users = users
# Collection of instantiated indicators objects
self.indicators = pd.DataFrame(columns=['creator', 'name', 'visible',
'kind', 'source', 'properties', 'ref'])
# A connection to an object that handles all data.
self.cache_manager = cache_manager
# Cache for storing instantiated indicator objects
cache_manager.create_cache(
name='indicators',
cache_type='table',
size_limit=100,
eviction_policy='deny',
default_expiration=dt.timedelta(days=1),
columns=['creator', 'name', 'visible', 'kind', 'source', 'properties', 'ref']
)
# Cache for storing calculated indicator data
cache_manager.create_cache('indicator_data', cache_type='row', size_limit=100,
default_expiration=dt.timedelta(days=7), eviction_policy='evict')
# Cache for storing display properties indicators
cache_manager.create_cache('user_display_properties', cache_type='row',
size_limit=100,
default_expiration=dt.timedelta(days=1),
eviction_policy='evict')
# Available indicator types and classes from a global indicators_registry.
self.indicator_registry = indicators_registry
@ -341,27 +360,34 @@ class Indicators:
:return: dict - A dictionary of indicator names as keys and their attributes as values.
"""
user_id = self.users.get_id(username)
if not user_id:
raise ValueError(f"Invalid user_name: {username}")
# Fetch indicators based on visibility status
if only_enabled:
indicators_df = self.indicators.query("creator == @user_id and visible == 1")
indicators_df = self.cache_manager.get_rows_from_datacache('indicators', [('creator', user_id), ('visible', 1)])
else:
indicators_df = self.indicators.query('creator == @user_id')
indicators_df = self.cache_manager.get_rows_from_datacache('indicators', [('creator', user_id)])
if indicators_df.empty:
# Attempt to load from storage.
self.load_indicators(user_name=username)
indicators_df = self.indicators.query('creator == @user_id')
# Check if the DataFrame is empty
if indicators_df is None or indicators_df.empty:
return {} # Return an empty dictionary if no indicators are found
# Create the dictionary
result = {}
# Iterate over the rows and construct the result dictionary
for _, row in indicators_df.iterrows():
# Include all properties from the properties dictionary, not just a limited subset.
# Ensure that row['properties'] is a dictionary
properties = row.get('properties', {})
if not isinstance(properties, dict):
properties = {}
# Construct the result dictionary for each indicator
result[row['name']] = {
'type': row['kind'],
'visible': row['visible'],
**row['properties'] # This will include all properties in the dictionary
**properties # Merge in all properties from the properties field
}
return result
@ -374,21 +400,21 @@ class Indicators:
:param indicator_names: List of indicator names to set as visible.
:return: None
"""
indicators = self.cache_manager.get_rows_from_datacache('indicators', [('creator', user_id)])
# Validate inputs
if user_id not in self.indicators['creator'].unique():
# raise ValueError(f"Invalid user_name: {user_name}")
# Nothing may be loaded.
if indicators.empty:
return
# Set visibility for all indicators of the user
self.indicators.loc[self.indicators['creator'] == user_id, 'visible'] = 0
# Set visibility for the specified indicator names
self.indicators.loc[self.indicators['name'].isin(indicator_names), 'visible'] = 1
# Set visibility for all indicators off
self.cache_manager.modify_datacache_item('indicators', [('creator', user_id)], field_name='visible', new_data=0)
# Set visibility for the specified indicators on
self.cache_manager.modify_datacache_item('indicators', [('creator', user_id), ('name', indicator_names)],
field_name='visible', new_data=1)
def edit_indicator(self, user_name: str, params: dict):
"""
Edits an existing indicator's properties.
:param user_name: The name of the user.
:param params: The updated properties of the indicator.
"""
@ -398,33 +424,15 @@ class Indicators:
# Get the indicator from the user's indicator list
user_id = self.users.get_id(user_name)
indicator_row = self.indicators.query('name == @indicator_name and creator == @user_id')
indicator = self.cache_manager.get_rows_from_datacache('indicators', [('name', indicator_name), ('creator', user_id)])
if indicator_row.empty:
if indicator.empty:
raise ValueError(f"Indicator '{indicator_name}' not found for user '{user_name}'.")
# Update the top-level fields
top_level_keys = ['name', 'visible', 'kind'] # Top-level keys, expand this if needed
for key, value in params.items():
if key in top_level_keys and key in indicator_row.columns:
self.indicators.at[indicator_row.index[0], key] = value
# Update 'source' dictionary fields
if 'source' in indicator_row.columns and isinstance(indicator_row['source'].iloc[0], dict):
source_dict = indicator_row['source'].iloc[0] # Direct reference, no need for reassignment later
for key, value in params.items():
if key in source_dict:
source_dict[key] = value
# Update 'properties' dictionary fields
if 'properties' in indicator_row.columns and isinstance(indicator_row['properties'].iloc[0], dict):
properties_dict = indicator_row['properties'].iloc[0] # No copy, modify directly
for key, value in params.items():
if key in properties_dict:
properties_dict[key] = value
# Save the updated indicator for the user in the database.
self.users.save_indicators(indicator_row)
# Modify indicator.
self.cache_manager.modify_datacache_item('indicators',
[('creator', params.get('user_name')), ('name', params.get('name'))],
field_name=params.get('setting'), new_data=params.get('value'))
def new_indicator(self, user_name: str, params) -> None:
"""
@ -457,86 +465,93 @@ class Indicators:
# Create indicator.
self.create_indicator(creator=user_name, name=indcr, kind=indtyp, source=source, properties=properties)
# Update the watch-list in config.
self.save_indicator(self.indicators.loc[self.indicators.name == indcr])
def process_indicator(self, indicator, num_results: int = 1) -> pd.DataFrame | None:
"""
Trigger execution of the indicator's analysis against an updated source.
Trigger execution of the indicator's analysis against an updated source.
:param indicator: A named tuple containing indicator data.
:param num_results: The number of results being requested.
:return: The results of the indicator analysis as a DataFrame.
:param indicator: A named tuple or dict containing indicator data.
:param num_results: The number of results being requested.
:return: The results of the indicator analysis as a DataFrame.
"""
username = self.users.get_username(indicator.creator)
src = indicator.source
symbol, timeframe, exchange_name = src['symbol'], src['timeframe'], src['exchange_name']
# Retrieve necessary details to instantiate the indicator
name = indicator.name
kind = indicator.kind
properties = json.loads(indicator.properties)
# Adjust num_results to account for the lookup period if specified in the indicator properties.
if 'period' in indicator.ref.properties:
num_results += indicator.ref.properties['period']
if 'period' in properties:
num_results += properties['period']
# Request the data from the defined source.
data = self.candles.get_last_n_candles(num_candles=num_results,
asset=symbol, timeframe=timeframe,
exchange=exchange_name, user_name=username)
# Calculate the indicator using the retrieved data.
return indicator.ref.calculate(candles=data, user_name=username, num_results=num_results)
# Instantiate the indicator object based on the kind
indicator_class = self.indicator_registry[kind]
indicator_obj = indicator_class(name=name, indicator_type=kind, properties=properties)
# Run the calculate method of the indicator
return indicator_obj.calculate(candles=data, user_name=username, num_results=num_results)
def get_indicator_data(self, user_name: str, source: dict = None,
visible_only: bool = True, start_ts: float = None,
num_results: int = 1000) -> Optional[Dict[str, Any]]:
"""
Loop through enabled indicators in a user's watch-list. Run the appropriate
update function and return a dictionary containing all the results.
Loop through enabled indicators in a user's watch-list. Run the appropriate
update function and return a dictionary containing all the results.
:param user_name: The name of the user making the request.
:param source: Pass in a source definition to return only results against a particular source.
:param visible_only: Returns only results marked visible.
:param start_ts: The timestamp to begin the analysis at. (Not implemented yet)
:param num_results: The number of results requested.
:return: A dictionary of timestamped data returned from each indicator indexed by the indicator's name,
or None if no indicators matched the query.
:param user_name: The name of the user making the request.
:param source: Pass in a source definition to return only results against a particular source.
:param visible_only: Returns only results marked visible.
:param start_ts: The timestamp to begin the analysis at. (Not implemented yet)
:param num_results: The number of results requested.
:return: A dictionary of timestamped data returned from each indicator indexed by the indicator's name,
or None if no indicators matched the query.
"""
if start_ts:
print("Warning: start_ts has not implemented in get_indicator_data()!")
print("Warning: start_ts has not been implemented in get_indicator_data()!")
user_id = self.users.get_id(user_name=user_name)
# Construct the query based on user_name and visibility.
query = f"creator == {user_id}"
if visible_only:
query += " and visible == 1"
visible = 1 if visible_only else 0
# Filter the indicators based on the query.
indicators = self.indicators.loc[
(self.indicators['creator'] == user_id) & (self.indicators['visible'] == 1)]
indicators = self.cache_manager.get_rows_from_datacache('indicators', [('creator', user_id), ('visible', visible)])
# Return None if no indicators matched the query.
if indicators.empty:
# Attempt to re-load from db
self.load_indicators(user_name=user_name)
# query again.
indicators = self.indicators.loc[
(self.indicators['creator'] == user_id) & (self.indicators['visible'] == 1)]
if indicators.empty:
return None
return None
if source:
# Filter indicators by these source parameters.
if 'market' in source:
symbol = source['market']['market']
timeframe = source['market']['timeframe']
exchange = source['market']['exchange']
indicators = indicators[indicators.source.apply(lambda x: x['symbol'] == symbol and
x['timeframe'] == timeframe and
x['exchange_name'] == exchange)]
else:
raise ValueError(f'No implementation for source: {source}')
# Convert 'source' column to dictionaries if they are strings
indicators['source'] = indicators['source'].apply(lambda x: json.loads(x) if isinstance(x, str) else x)
# Extract relevant fields from the source's market
source_timeframe = source.get('market', {}).get('timeframe')
source_exchange = source.get('market', {}).get('exchange')
source_symbol = source.get('market', {}).get('market')
# Extract fields from indicators['source'] and compare directly
mask = (indicators['source'].apply(lambda s: s.get('timeframe')) == source_timeframe) & \
(indicators['source'].apply(lambda s: s.get('exchange_name')) == source_exchange) & \
(indicators['source'].apply(lambda s: s.get('symbol')) == source_symbol)
# Filter the DataFrame using the mask
filtered_indicators = indicators[mask]
# If no indicators match the filtered source, return None.
if indicators.empty:
return None
# Process each indicator, convert DataFrame to JSON-serializable format, and collect the results
json_ready_results = {}
for indicator in indicators.itertuples(index=False):
indicator_results = self.process_indicator(indicator=indicator, num_results=num_results)
@ -561,12 +576,8 @@ class Indicators:
# Get the user ID to filter the indicators belonging to the user
user_id = self.users.get_id(user_name)
# Remove the indicator from the DataFrame where the name matches and the creator is the user
self.indicators = self.indicators[
~((self.indicators['name'] == indicator_name) & (self.indicators['creator'] == user_id))
]
self.users.remove_indicator(indicator_name=indicator_name, user_name=user_name)
identifying_values = [('name', indicator_name), ('creator', user_id)]
self.cache_manager.remove_row_from_datacache(cache_name='indicators', filter_vals=identifying_values)
def create_indicator(self, creator: str, name: str, kind: str,
source: dict, properties: dict, visible: bool = True):
@ -583,36 +594,29 @@ class Indicators:
:param visible: Whether to display it in the chart view.
:return: None
"""
# Todo: Possible refactor to save without storing the indicator instance
self.indicators = self.indicators.reset_index(drop=True)
creator_id = self.users.get_id(creator)
# Check if an indicator with the same name already exists
existing_indicator = self.indicators.query('name == @name and creator == @creator_id')
indicators = self.cache_manager.get_rows_from_datacache('indicators', [('name', name), ('creator', creator_id)])
if not existing_indicator.empty:
if not indicators.empty:
print(f"Indicator '{name}' already exists for user '{creator}'. Skipping creation.")
return # Exit the method to prevent duplicate creation
if kind not in self.indicator_registry:
raise ValueError(f"Requested an unsupported type of indicator: ({kind})")
indicator_class = self.indicator_registry[kind]
# Create an instance of the indicator.
indicator = indicator_class(name, kind, properties)
# Add the new indicator to a pandas dataframe.
creator_id = self.users.get_id(creator)
row_data = {
row_data = pd.DataFrame([{
'creator': creator_id,
'name': name,
'kind': kind,
'visible': visible,
'source': source,
'properties': properties,
'ref': indicator
}
self.indicators = pd.concat([self.indicators, pd.DataFrame([row_data])], ignore_index=True)
'properties': properties
}])
self.cache_manager.insert_df_into_datacache(df=row_data, cache_name="users", skip_cache=False)
# def update_indicators(self, user_name):
# """

File diff suppressed because it is too large Load Diff