brighter-trading/src/shared_utilities.py

174 lines
6.3 KiB
Python

from functools import lru_cache
import datetime as dt
from typing import Union
import pandas as pd
import pytz
# Unix epoch (1970-01-01 00:00:00) as a timezone-aware UTC datetime.
# The unix_time_* helpers subtract this from an aware datetime to get the elapsed time.
epoch = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
def query_uptodate(records: pd.DataFrame, r_length_min: float) -> Union[float, None]:
    """
    Decide whether the newest record is recent enough to count as up-to-date.

    The largest value in ``records.time`` is compared with the current UTC time expressed in
    Unix milliseconds, so the column is expected to hold millisecond timestamps.

    :param records: The dataframe holding results from a query; its ``time`` column is read.
    :param r_length_min: The timespan in minutes of each record in the data.
    :return: None if the records are up-to-date, otherwise the newest ``time`` value on record
             (returned unchanged, i.e. in the same units as the column).
    """
    # Records are reported stale slightly early: 10 seconds, expressed in minutes.
    tolerance_minutes = 10 / 60

    print('\nChecking if the records are up-to-date...')

    # Newest timestamp present in the records.
    newest_ts = float(records.time.max())
    print(f'The last timestamp on record is {newest_ts}')

    # Current UTC time in milliseconds, to match the record timestamps.
    current_ts = unix_time_millis(dt.datetime.now(pytz.UTC))
    print(f'The timestamp now is {current_ts}')

    # Age of the newest record: milliseconds -> seconds -> minutes.
    age_minutes = ms_to_seconds(current_ts - newest_ts) / 60
    print(f'The minutes since last update is {age_minutes}')
    print(f'And the length of each record is {r_length_min}')

    # NOTE(review): if newest_ts is NaN (e.g. no usable rows) this comparison is False and
    # None ("up-to-date") is returned - confirm that is the intended outcome.
    is_stale = age_minutes > (r_length_min - tolerance_minutes)
    return newest_ts if is_stale else None
def ms_to_seconds(timestamp: float) -> float:
    """
    Express a millisecond quantity in seconds.

    :param timestamp: The timestamp (or duration) in milliseconds.
    :return: The same quantity in seconds.
    """
    millis_per_second = 1000
    return timestamp / millis_per_second
def unix_time_seconds(d_time: dt.datetime) -> float:
    """
    Return the number of seconds between the Unix epoch and ``d_time``.

    :param d_time: The timezone-aware datetime object to convert.
    :return: The Unix timestamp in seconds.
    :raises ValueError: If ``d_time`` carries no tzinfo.
    """
    is_naive = d_time.tzinfo is None
    if is_naive:
        raise ValueError("d_time is timezone naive. Please provide a timezone-aware datetime.")

    # Aware-minus-aware subtraction yields the elapsed timedelta since the epoch.
    since_epoch = d_time - epoch
    return since_epoch.total_seconds()
def unix_time_millis(d_time: dt.datetime) -> float:
    """
    Return the number of milliseconds between the Unix epoch and ``d_time``.

    :param d_time: The timezone-aware datetime object to convert.
    :return: The Unix timestamp in milliseconds.
    :raises ValueError: If ``d_time`` carries no tzinfo.
    """
    is_naive = d_time.tzinfo is None
    if is_naive:
        raise ValueError("d_time is timezone naive. Please provide a timezone-aware datetime.")

    # Elapsed seconds since the epoch, scaled up to milliseconds.
    elapsed_seconds = (d_time - epoch).total_seconds()
    return elapsed_seconds * 1000.0
def query_satisfied(start_datetime: dt.datetime, records: pd.DataFrame, r_length_min: float) -> Union[float, None]:
    """
    Check if records span back far enough to satisfy a query.

    The span covered by the records is estimated as ``len(records) * r_length_min`` minutes,
    measured forward from the earliest ``records.time`` value. The query counts as satisfied
    when ``start_datetime`` (in Unix milliseconds) is at or before the end of that span.

    :param start_datetime: The timezone-aware datetime the query starts at.
    :param records: The dataframe holding results from a query; its ``time`` column is read.
    :param r_length_min: The timespan in minutes of each record in the data.
    :return: None if the query is satisfied; NaN if ``records`` is empty; otherwise the
             earliest ``time`` value in the records (unchanged, in the column's own units).
    :raises ValueError: If ``start_datetime`` carries no tzinfo.
    """
    print('\nChecking if the query is satisfied...')

    if start_datetime.tzinfo is None:
        raise ValueError("start_datetime is timezone naive. Please provide a timezone-aware datetime.")

    # Requested start expressed in Unix milliseconds, to match the record timestamps.
    requested_ms = unix_time_millis(start_datetime)
    print(f'Start timestamp: {requested_ms}')

    # Nothing to compare against: signal "not satisfied" with NaN.
    if records.empty:
        print('No records found. Query cannot be satisfied.')
        return float('nan')

    # Oldest timestamp present in the records.
    earliest_ms = float(records.time.min())
    print(f'First timestamp in records: {earliest_ms}')

    # Estimated coverage: one record length (in ms) per row.
    ms_per_record = r_length_min * 60 * 1000
    covered_ms = len(records) * ms_per_record
    print(f'Total duration of records: {covered_ms}')

    # NOTE(review): this also passes when start_datetime is far earlier than the earliest
    # record - confirm that matches the intended meaning of "span back far enough".
    return None if requested_ms <= earliest_ms + covered_ms else earliest_ms
def ts_of_n_minutes_ago(n: int, candle_length: float) -> dt.datetime:
    """
    Returns the approximate datetime for the start of a candle that was 'n' candles ago.

    The result is measured back from the current UTC time on every call. It is deliberately
    NOT memoised: caching on (n, candle_length) would keep returning the time computed by the
    first call, which drifts further from the truth the longer the process runs.

    :param n: The number of candles ago to calculate.
    :param candle_length: The length of each candle in minutes.
    :return: A timezone-aware UTC datetime ``(n + 1) * candle_length`` minutes before now.
    """
    # One extra candle accounts for the time that has passed since the last candle closed.
    minutes_ago = (n + 1) * candle_length

    # dt.timezone.utc is the same tzinfo the module-level epoch uses.
    now = dt.datetime.now(dt.timezone.utc)
    return now - dt.timedelta(minutes=minutes_ago)
@lru_cache(maxsize=20)
def timeframe_to_minutes(timeframe: str) -> int:
    """
    Converts a string representing a timeframe into an integer representing the approximate minutes.

    :param timeframe: Timeframe format is [multiplier:focus]. e.g., '15m', '4h', '1d'.
                      Supported units: 'm', 'h', 'd', 'w', 'M' (month), 'Y' (year).
                      A bare number with no unit is treated as minutes.
    :return: Minutes the timeframe represents, e.g., '2h' -> 120 (minutes).
    :raises ValueError: If the timeframe contains no digits or an unsupported unit.
    """
    minutes_per_unit = {
        '': 1,  # No unit given: kept as minutes for backward compatibility.
        'm': 1,
        'h': 60,
        'd': 60 * 24,
        'w': 60 * 24 * 7,
        'M': 60 * 24 * 31,  # Maximum number of days in a month
        'Y': 60 * 24 * 365,  # 365-day year (leap days ignored)
    }

    # Extract the numerical part of the timeframe param.
    digits = "".join(ch for ch in timeframe if ch.isdigit())
    if not digits:
        raise ValueError(f"Timeframe {timeframe!r} does not contain a multiplier.")
    multiplier = int(digits)

    # Extract the alpha part of the timeframe param.
    unit = "".join(ch for ch in timeframe if ch.isalpha())

    # An unknown unit must not be silently interpreted as minutes.
    if unit not in minutes_per_unit:
        supported = ", ".join(repr(u) for u in minutes_per_unit if u)
        raise ValueError(
            f"Unsupported timeframe unit {unit!r} in {timeframe!r}. Supported units: {supported}."
        )

    return multiplier * minutes_per_unit[unit]