# brighter-trading/archived_code/DataCache.py
#
# 431 lines
# 19 KiB
# Python

from typing import Any, List
import pandas as pd
import datetime as dt
from Database import Database
from shared_utilities import query_satisfied, query_uptodate, unix_time_millis, timeframe_to_minutes
import logging
logger = logging.getLogger(__name__)
class DataCache:
    """
    Fetches and manages data limits and optimizes memory storage.
    Handles connections and operations for the given exchanges.

    Cached values live in the ``cached_data`` dictionary, indexed by an access
    key. Candle records are looked up in that cache first, then in the
    database, and finally requested from the exchange.

    Example usage:
    --------------
    db = DataCache(exchanges=some_exchanges_object)
    """

    # Disable during production for improved performance.
    TYPECHECKING_ENABLED = True

    # NaN sentinel. It is not referenced by any method visible in this class.
    # NOTE(review): presumably the value query_satisfied() reports when no
    # records exist - confirm against shared_utilities.
    NO_RECORDS_FOUND = float('nan')

    def __init__(self, exchanges):
        """
        Initializes the DataCache class.

        :param exchanges: The exchanges object handling communication with connected exchanges.
        """
        # Limits are stored here but not enforced by any method visible in this class.
        self.max_tables = 50
        self.max_records = 1000
        # In-memory store: access key -> cached object (DataFrame, dict, ...).
        self.cached_data = {}
        self.db = Database()
        self.exchanges = exchanges

    @staticmethod
    def _utc_now() -> dt.datetime:
        """
        Returns the current UTC time as a naive datetime.

        Produces the same value as the deprecated ``datetime.utcnow()``.
        """
        return dt.datetime.now(dt.timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _utc_from_timestamp(timestamp: float) -> dt.datetime:
        """
        Converts a POSIX timestamp to a naive UTC datetime.

        Produces the same value as the deprecated ``datetime.utcfromtimestamp()``.

        :param timestamp: Seconds since the epoch.
        :return: The corresponding naive UTC datetime.
        """
        return dt.datetime.fromtimestamp(timestamp, dt.timezone.utc).replace(tzinfo=None)

    def cache_exists(self, key: str) -> bool:
        """
        Checks if data exists for the given key.

        :param key: The access key.
        :return: True if data exists, False otherwise.
        """
        return key in self.cached_data

    def update_candle_cache(self, more_records: pd.DataFrame, key: str) -> None:
        """
        Merges records into the data already cached under ``key``.

        The merged frame is de-duplicated on the ``time`` column, sorted by
        ``time`` and re-indexed. ``more_records`` is placed first, so when both
        frames hold a row for the same ``time`` the new row wins.

        :param more_records: The new records to be added.
        :param key: The access key.
        :return: None.
        """
        existing = self.get_cache(key)
        # Nothing cached yet: the new records alone become the cached frame.
        frames = [more_records] if existing is None else [more_records, existing]
        records = pd.concat(frames, axis=0, ignore_index=True)
        records = records.drop_duplicates(subset="time", keep='first')
        records = records.sort_values(by='time').reset_index(drop=True)
        self.set_cache(data=records, key=key)

    def set_cache(self, data: Any, key: str, do_not_overwrite: bool = False) -> None:
        """
        Stores data under the given key.

        :param data: The records to insert into data.
        :param key: The index key for the data.
        :param do_not_overwrite: When True, an existing entry is left untouched.
        :return: None
        """
        if do_not_overwrite and key in self.cached_data:
            return
        self.cached_data[key] = data

    def update_cached_dict(self, cache_key: str, dict_key: str, data: Any) -> None:
        """
        Updates a dictionary stored in data.

        If nothing is cached under ``cache_key`` yet, an empty dictionary is
        created first instead of raising ``KeyError``.

        :param data: The data to insert into data.
        :param cache_key: The data index key for the dictionary.
        :param dict_key: The dictionary key for the data.
        :return: None
        """
        self.cached_data.setdefault(cache_key, {}).update({dict_key: data})

    def get_cache(self, key: str) -> Any:
        """
        Returns data indexed by key.

        :param key: The index key for the data.
        :return: Any|None - The requested data or None (with a logged warning) if the key is unknown.
        """
        if key not in self.cached_data:
            logger.warning(f"The requested data key({key}) doesn't exist!")
            return None
        return self.cached_data[key]

    def improved_get_records_since(self, key: str, start_datetime: dt.datetime, record_length: int,
                                   ex_details: List[str]) -> pd.DataFrame:
        """
        Fetches records since the specified start datetime.

        Delegates to get_or_fetch_from(), starting at the in-memory tier and
        using the current UTC time as the end of the requested range.

        :param key: The data key.
        :param start_datetime: The start datetime to fetch records from.
        :param record_length: Forwarded unchanged to get_or_fetch_from().
        :param ex_details: Exchange details.
        :return: DataFrame containing the records.
        """
        try:
            # The result must be returned; it was previously computed and dropped.
            return self.get_or_fetch_from(
                target='data',
                key=key,
                start_datetime=start_datetime,
                end_datetime=self._utc_now(),
                record_length=record_length,
                ex_details=ex_details,
            )
        except Exception as e:
            logger.error(f"An error occurred: {str(e)}")
            raise

    def get_or_fetch_from(self, target: str, **kwargs) -> pd.DataFrame:
        """
        Returns data from the requested tier, falling through to the next tier
        ('data' -> 'database' -> 'server') when the result is incomplete.

        The per-tier fetchers and the completeness check are placeholders: each
        fetcher returns an empty DataFrame and the check always passes.

        :param target: Tier to start from: 'data', 'database' or 'server'.
        :param kwargs: key (str), start_datetime (datetime), end_datetime
            (datetime, defaults to the current UTC time), record_length (int),
            ex_details (list of str).
        :return: DataFrame produced by the first tier whose result is complete.
        :raises TypeError: If type checking is enabled and an argument has the wrong type.
        :raises ValueError: If a required argument is missing/falsy or target is invalid.
        """
        key = kwargs.get('key')
        start_datetime = kwargs.get('start_datetime')
        end_datetime = kwargs.get('end_datetime')
        if end_datetime is None:
            # Keeps callers that never supplied an end time working.
            end_datetime = self._utc_now()
        record_length = kwargs.get('record_length')
        ex_details = kwargs.get('ex_details')

        if self.TYPECHECKING_ENABLED:
            # Type checking
            if not isinstance(key, str):
                raise TypeError("key must be a string")
            if not isinstance(start_datetime, dt.datetime):
                raise TypeError("start_datetime must be a datetime object")
            if not isinstance(end_datetime, dt.datetime):
                raise TypeError("end_datetime must be a datetime object")
            if not isinstance(record_length, int):
                raise TypeError("record_length must be an integer")
            if not isinstance(ex_details, list) or not all(isinstance(i, str) for i in ex_details):
                raise TypeError("ex_details must be a list of strings")

        # Ensure all required arguments are provided
        if not all([key, start_datetime, record_length, ex_details]):
            raise ValueError("Missing required arguments")

        def get_from_cache() -> pd.DataFrame:
            # Placeholder: no cache lookup implemented yet.
            return pd.DataFrame()

        def get_from_database() -> pd.DataFrame:
            # Placeholder: no database lookup implemented yet.
            return pd.DataFrame()

        def get_from_server() -> pd.DataFrame:
            # Placeholder: no exchange request implemented yet.
            return pd.DataFrame()

        def data_complete(data, **criteria) -> bool:
            """Check if a dataframe completely satisfied a request."""
            # Placeholder: every result is currently treated as complete.
            return True

        request_criteria = {
            'start_datetime': start_datetime,
            'end_datetime': end_datetime,
            'record_length': record_length,
        }
        # Forward the resolved end time so every tier sees the same range.
        forwarded = {**kwargs, 'end_datetime': end_datetime}

        if target == 'data':
            result = get_from_cache()
            if data_complete(result, **request_criteria):
                return result
            return self.get_or_fetch_from('database', **forwarded)

        if target == 'database':
            result = get_from_database()
            if data_complete(result, **request_criteria):
                return result
            return self.get_or_fetch_from('server', **forwarded)

        if target == 'server':
            result = get_from_server()
            if not data_complete(result, **request_criteria):
                logger.error('Unable to fetch the requested data.')
            # Last tier: hand back whatever was obtained, even if incomplete.
            return result

        raise ValueError(f'Not a valid target: {target}')

    def get_records_since(self, key: str, start_datetime: dt.datetime, record_length: int,
                          ex_details: List[str]) -> pd.DataFrame:
        """
        Fetches records since the specified start datetime.

        Uses the cached frame when one exists, otherwise loads it through
        get_records_since_from_db() and caches it. The frame is then extended
        backwards and forwards as indicated by query_satisfied() and
        query_uptodate(), and the cached rows whose ``time`` is at or after
        ``start_datetime`` (in milliseconds) are returned.

        :param key: The data key; also used as the database table name.
        :param start_datetime: The start datetime to fetch records from.
        :param record_length: Passed to the query helpers as ``r_length_min``.
            NOTE(review): that name suggests a candle length in minutes rather
            than a record count - confirm against shared_utilities.
        :param ex_details: Exchange details.
        :return: DataFrame containing the records.
        """
        try:
            end_datetime = self._utc_now()

            if self.cache_exists(key=key):
                logger.debug('Getting records from data.')
                records = self.get_cache(key)
            else:
                logger.debug(
                    f'Records not in data. Requesting from DB: starting at: {start_datetime} to: {end_datetime}')
                records = self.get_records_since_from_db(table_name=key, st=start_datetime, et=end_datetime,
                                                         rl=record_length, ex_details=ex_details)
                logger.debug(f'Got {len(records.index)} records from DB.')
                self.set_cache(data=records, key=key)

            first_timestamp = query_satisfied(start_datetime=start_datetime, records=records,
                                              r_length_min=record_length)

            # Bound up front so the emptiness check below never sees an unset name.
            additional_records = pd.DataFrame()
            # NOTE(review): pd.isna(None) is True, so a None result also takes
            # the first branch - confirm that is intended.
            if pd.isna(first_timestamp):
                logger.debug('No records found to satisfy the query, continuing to fetch more records.')
                additional_records = self.get_records_since_from_db(table_name=key, st=start_datetime,
                                                                    et=end_datetime, rl=record_length,
                                                                    ex_details=ex_details)
            elif first_timestamp is not None:
                end_time = self._utc_from_timestamp(first_timestamp)
                logger.debug(f'Requesting additional records from {start_datetime} to {end_time}')
                additional_records = self.get_records_since_from_db(table_name=key, st=start_datetime,
                                                                    et=end_time, rl=record_length,
                                                                    ex_details=ex_details)

            if not additional_records.empty:
                logger.debug(f'Got {len(additional_records.index)} additional records from DB.')
                self.update_candle_cache(additional_records, key)
                # Re-read so the up-to-date check sees the merged frame, not a stale snapshot.
                records = self.get_cache(key)

            last_timestamp = query_uptodate(records=records, r_length_min=record_length)
            if last_timestamp is not None:
                start_time = self._utc_from_timestamp(last_timestamp)
                logger.debug(f'Requesting additional records from {start_time} to {end_datetime}')
                additional_records = self.get_records_since_from_db(table_name=key, st=start_time,
                                                                    et=end_datetime, rl=record_length,
                                                                    ex_details=ex_details)
                logger.debug(f'Got {len(additional_records.index)} additional records from DB.')
                if not additional_records.empty:
                    self.update_candle_cache(additional_records, key)

            # The name _timestamp is referenced from the query string below.
            _timestamp = unix_time_millis(start_datetime)
            logger.debug(
                f"Start timestamp in human-readable form: {self._utc_from_timestamp(_timestamp / 1000.0)}")
            result = self.get_cache(key).query('time >= @_timestamp')
            return result
        except Exception as e:
            logger.error(f"An error occurred: {str(e)}")
            raise

    def get_records_since_from_db(self, table_name: str, st: dt.datetime, et: dt.datetime, rl: float,
                                  ex_details: List[str]) -> pd.DataFrame:
        """
        Fetches records from the database since the specified start datetime.

        When the table does not exist it is populated from the exchange. The
        result is then topped up from the exchange if query_satisfied() or
        query_uptodate() report that it does not reach far enough back or is
        not current.

        :param table_name: The name of the table in the database.
        :param st: The start datetime to fetch records from.
        :param et: The end datetime to fetch records until.
        :param rl: Passed to the query helpers as ``r_length_min`` and used to
            divide a span expressed in minutes. NOTE(review): this looks like
            the candle length in minutes - confirm.
        :param ex_details: Exchange details; index 2 is used as the exchange name in log output.
        :return: DataFrame containing the records.
        """

        def add_data(data: pd.DataFrame, tn: str, start_t: dt.datetime, end_t: dt.datetime) -> pd.DataFrame:
            """Fetch a range from the exchange and merge it into ``data``."""
            new_records = self._populate_db(table_name=tn, start_time=start_t, end_time=end_t,
                                            ex_details=ex_details)
            logger.debug(f'Got {len(new_records.index)} records from {ex_details[2]}')
            if not new_records.empty:
                data = pd.concat([data, new_records], axis=0, ignore_index=True)
                data = data.drop_duplicates(subset="time", keep='first')
            return data

        if self.db.table_exists(table_name=table_name):
            logger.debug('Table existed retrieving records from DB')
            logger.debug(f'Requesting from {st} to {et}')
            records = self.db.get_timestamped_records(table_name=table_name, timestamp_field='time', st=st, et=et)
            logger.debug(f'Got {len(records.index)} records from db')
        else:
            logger.debug(f"Table didn't exist fetching from {ex_details[2]}")
            span_minutes = ((unix_time_millis(et) - unix_time_millis(st)) / 1000) / 60
            # Only used for the log line; never let a zero length abort the fetch.
            temp = span_minutes / rl if rl else float('nan')
            logger.debug(f'Requesting from {st} to {et}, Should be {temp} records')
            records = self._populate_db(table_name=table_name, start_time=st, end_time=et, ex_details=ex_details)
            logger.debug(f'Got {len(records.index)} records from {ex_details[2]}')

        first_timestamp = query_satisfied(start_datetime=st, records=records, r_length_min=rl)
        # NOTE(review): pd.isna(None) is True, so a None result also takes the first branch.
        if pd.isna(first_timestamp):
            logger.debug('No records found to satisfy the query, continuing to fetch more records.')
            records = add_data(data=records, tn=table_name, start_t=st, end_t=et)
        elif first_timestamp:
            logger.debug(f'Records did not go far enough back. Requesting from {ex_details[2]}')
            logger.debug(f'First ts on record is: {first_timestamp}')
            end_time = self._utc_from_timestamp(first_timestamp)
            logger.debug(f'Requesting from {st} to {end_time}')
            records = add_data(data=records, tn=table_name, start_t=st, end_t=end_time)

        last_timestamp = query_uptodate(records=records, r_length_min=rl)
        if last_timestamp:
            logger.debug(f'Records were not updated. Requesting from {ex_details[2]}.')
            logger.debug(f'The last record on file is: {last_timestamp}')
            start_time = self._utc_from_timestamp(last_timestamp)
            logger.debug(f'Requesting from {start_time} to {et}')
            records = add_data(data=records, tn=table_name, start_t=start_time, end_t=et)

        return records

    def _populate_db(self, table_name: str, start_time: dt.datetime, ex_details: List[str],
                     end_time: dt.datetime = None) -> pd.DataFrame:
        """
        Populates a database table with records from the exchange.

        :param table_name: Name of the table in the database.
        :param start_time: Start time to fetch the records from.
        :param end_time: End time to fetch the records until (defaults to the current UTC time).
        :param ex_details: Exchange details [symbol, interval, exchange_name, user_name].
        :return: DataFrame of the data downloaded.
        :raises ValueError: If ex_details does not hold exactly four items.
        """
        if end_time is None:
            end_time = self._utc_now()
        if len(ex_details) != 4:
            raise ValueError('ex_details must be [symbol, interval, exchange_name, user_name]')
        sym, inter, ex, un = ex_details
        records = self._fetch_candles_from_exchange(symbol=sym, interval=inter, exchange_name=ex, user_name=un,
                                                    start_datetime=start_time, end_datetime=end_time)
        if not records.empty:
            self.db.insert_candles_into_db(records, table_name=table_name, symbol=sym, exchange_name=ex)
        else:
            logger.debug(f'No records inserted {records}')
        return records

    @staticmethod
    def _fill_data_holes(records: pd.DataFrame, interval: str) -> pd.DataFrame:
        """
        Fills gaps in the data by inserting synthetic rows for the missing periods.

        Each synthetic row is a copy of the row that FOLLOWS the gap, with only
        its ``time`` replaced. NOTE(review): the earlier documentation said the
        last known row is replicated; the behaviour is unchanged here - confirm
        which one is intended.

        :param records: DataFrame containing the original records (``time`` in milliseconds).
        :param interval: Interval of the data (e.g., '1m', '5m').
        :return: DataFrame with gaps filled.
        """
        time_span = timeframe_to_minutes(interval)
        last_timestamp = None
        filled_records = []
        logger.info(f"Starting to fill data holes for interval: {interval}")
        for _, row in records.iterrows():
            time_stamp = row['time']
            if last_timestamp is None:
                last_timestamp = time_stamp
                filled_records.append(row)
                logger.debug(f"First timestamp: {time_stamp}")
                continue

            delta_ms = time_stamp - last_timestamp
            delta_minutes = (delta_ms / 1000) / 60
            logger.debug(f"Timestamp: {time_stamp}, Delta minutes: {delta_minutes}")
            if delta_minutes > time_span:
                # Spread the synthetic rows evenly across the gap.
                num_missing_rec = int(delta_minutes / time_span)
                step = int(delta_ms / num_missing_rec)
                logger.debug(f"Gap detected. Filling {num_missing_rec} records with step: {step}")
                for ts in range(int(last_timestamp) + step, int(time_stamp), step):
                    new_row = row.copy()
                    new_row['time'] = ts
                    filled_records.append(new_row)
                    logger.debug(f"Filled timestamp: {ts}")

            filled_records.append(row)
            last_timestamp = time_stamp

        logger.info("Data holes filled successfully.")
        return pd.DataFrame(filled_records)

    def _fetch_candles_from_exchange(self, symbol: str, interval: str, exchange_name: str, user_name: str,
                                     start_datetime: dt.datetime = None,
                                     end_datetime: dt.datetime = None) -> pd.DataFrame:
        """
        Fetches and returns all candles from the specified market, timeframe, and exchange.

        :param symbol: Symbol of the market.
        :param interval: Timeframe in the format '<int><alpha>' (e.g., '15m', '4h').
        :param exchange_name: Name of the exchange.
        :param user_name: Name of the user.
        :param start_datetime: Start datetime for fetching data (defaults to 2017-01-01).
        :param end_datetime: End datetime for fetching data (defaults to the current UTC time).
        :return: DataFrame of candle data, with detected gaps filled.
        :raises ValueError: If start_datetime is after end_datetime, or the
            returned ``time`` values are not in milliseconds.
        """
        if start_datetime is None:
            start_datetime = dt.datetime(year=2017, month=1, day=1)
        if end_datetime is None:
            end_datetime = self._utc_now()
        if start_datetime > end_datetime:
            raise ValueError("Invalid start and end parameters: start_datetime must be before end_datetime.")

        exchange = self.exchanges.get_exchange(ename=exchange_name, uname=user_name)

        expected_records = (((unix_time_millis(end_datetime) - unix_time_millis(
            start_datetime)) / 1000) / 60) / timeframe_to_minutes(interval)
        logger.info(
            f'Fetching historical data from {start_datetime} to {end_datetime}. Expected records: {expected_records}')

        # A zero-length range is sent to the exchange without an end bound.
        if start_datetime == end_datetime:
            end_datetime = None

        candles = exchange.get_historical_klines(symbol=symbol, interval=interval, start_dt=start_datetime,
                                                 end_dt=end_datetime)
        num_rec_records = len(candles.index)
        logger.info(f'{num_rec_records} candles retrieved from the exchange.')

        if candles.empty:
            # Nothing to validate or fill.
            return candles

        open_times = candles.time
        min_open_time = open_times.min()
        max_open_time = open_times.max()
        if min_open_time < 1e10:
            raise ValueError('Records are not in milliseconds')

        max_open_time /= 1000
        min_open_time /= 1000
        # N evenly spaced candles span N - 1 intervals, so add one to compare
        # against the number of records actually received.
        intervals_spanned = ((max_open_time - min_open_time) / 60) / timeframe_to_minutes(interval)
        estimated_num_records = intervals_spanned + 1
        logger.info(f'Estimated number of records: {estimated_num_records}')

        if num_rec_records < estimated_num_records:
            logger.info('Detected gaps in the data, attempting to fill missing records.')
            candles = self._fill_data_holes(candles, interval)
        return candles