From 439c852cf5331e2f1dbf63d4ab8bdfccaf44a239 Mon Sep 17 00:00:00 2001 From: Rob Date: Fri, 16 Aug 2024 15:51:15 -0300 Subject: [PATCH] Fixed an issue where the client asked for too many records and was not receiving them. Also made much of the code handle timezones proactively by requiring timezone-aware datetimes. --- src/BrighterTrades.py | 4 +- src/DataCache_v2.py | 83 ++++++++++++------- src/Exchange.py | 66 +++++---------- src/app.py | 3 +- src/{ => archived_code}/DataCache.py | 0 .../archived_code}/test_DataCache.py | 0 src/candles.py | 13 ++- src/shared_utilities.py | 15 +++- tests/test_DataCache_v2.py | 59 +++++++++++-- 9 files changed, 150 insertions(+), 93 deletions(-) rename src/{ => archived_code}/DataCache.py (100%) rename {tests => src/archived_code}/test_DataCache.py (100%) diff --git a/src/BrighterTrades.py b/src/BrighterTrades.py index 28ea178..b9c85b8 100644 --- a/src/BrighterTrades.py +++ b/src/BrighterTrades.py @@ -1,7 +1,7 @@ from typing import Any -from DataCache import DataCache +from DataCache_v2 import DataCache from Strategies import Strategies from backtesting import Backtester from candles import Candles @@ -476,7 +476,7 @@ class BrighterTrades: trading_pair = params['symbol'] self.config.users.set_chart_view(values=trading_pair, specific_property='market', user_name=user_name) - elif setting == 'exchange_name': + elif setting == 'exchange': exchange_name = params['exchange_name'] # Get the first result of a list of available symbols from this exchange_name. 
market = self.exchanges.get_exchange(ename=exchange_name, uname=user_name).get_symbols()[0] diff --git a/src/DataCache_v2.py b/src/DataCache_v2.py index 957cbcd..1bbc460 100644 --- a/src/DataCache_v2.py +++ b/src/DataCache_v2.py @@ -40,9 +40,13 @@ def estimate_record_count(start_time, end_time, timeframe: str) -> int: # Convert timestamps from milliseconds to seconds for calculation start_time = int(start_time) / 1000 end_time = int(end_time) / 1000 - start_datetime = dt.datetime.utcfromtimestamp(start_time) - end_datetime = dt.datetime.utcfromtimestamp(end_time) + start_datetime = dt.datetime.utcfromtimestamp(start_time).replace(tzinfo=dt.timezone.utc) + end_datetime = dt.datetime.utcfromtimestamp(end_time).replace(tzinfo=dt.timezone.utc) elif isinstance(start_time, dt.datetime) and isinstance(end_time, dt.datetime): + if start_time.tzinfo is None: + raise ValueError("start_time is timezone naive. Please provide a timezone-aware datetime.") + if end_time.tzinfo is None: + raise ValueError("end_time is timezone naive. Please provide a timezone-aware datetime.") start_datetime = start_time end_datetime = end_time else: @@ -57,6 +61,11 @@ def estimate_record_count(start_time, end_time, timeframe: str) -> int: def generate_expected_timestamps(start_datetime: dt.datetime, end_datetime: dt.datetime, timeframe: str) -> pd.DatetimeIndex: + if start_datetime.tzinfo is None: + raise ValueError("start_datetime is timezone naive. Please provide a timezone-aware datetime.") + if end_datetime.tzinfo is None: + raise ValueError("end_datetime is timezone naive. 
Please provide a timezone-aware datetime.") + delta = timeframe_to_timedelta(timeframe) if isinstance(delta, pd.Timedelta): return pd.date_range(start=start_datetime, end=end_datetime, freq=delta) @@ -87,10 +96,14 @@ class DataCache: if not len(ex_details) == 4: raise TypeError("ex_details must include [asset, timeframe, exchange, user_name]") + if start_datetime.tzinfo is None: + raise ValueError("start_datetime is timezone naive. Please provide a timezone-aware datetime.") + end_datetime = dt.datetime.utcnow().replace(tzinfo=dt.timezone.utc) + try: args = { 'start_datetime': start_datetime, - 'end_datetime': dt.datetime.utcnow(), + 'end_datetime': end_datetime, 'ex_details': ex_details, } return self.get_or_fetch_from('cache', **args) @@ -100,7 +113,13 @@ class DataCache: def get_or_fetch_from(self, target: str, **kwargs) -> pd.DataFrame: start_datetime = kwargs.get('start_datetime') + if start_datetime.tzinfo is None: + raise ValueError("start_datetime is timezone naive. Please provide a timezone-aware datetime.") + end_datetime = kwargs.get('end_datetime') + if end_datetime.tzinfo is None: + raise ValueError("end_datetime is timezone naive. Please provide a timezone-aware datetime.") + ex_details = kwargs.get('ex_details') timeframe = kwargs.get('ex_details')[1] @@ -159,7 +178,13 @@ class DataCache: def get_candles_from_cache(self, **kwargs) -> pd.DataFrame: start_datetime = kwargs.get('start_datetime') + if start_datetime.tzinfo is None: + raise ValueError("start_datetime is timezone naive. Please provide a timezone-aware datetime.") + end_datetime = kwargs.get('end_datetime') + if end_datetime.tzinfo is None: + raise ValueError("end_datetime is timezone naive. 
Please provide a timezone-aware datetime.") + ex_details = kwargs.get('ex_details') if self.TYPECHECKING_ENABLED: @@ -185,7 +210,13 @@ class DataCache: def get_from_database(self, **kwargs) -> pd.DataFrame: start_datetime = kwargs.get('start_datetime') + if start_datetime.tzinfo is None: + raise ValueError("start_datetime is timezone naive. Please provide a timezone-aware datetime.") + end_datetime = kwargs.get('end_datetime') + if end_datetime.tzinfo is None: + raise ValueError("end_datetime is timezone naive. Please provide a timezone-aware datetime.") + ex_details = kwargs.get('ex_details') if self.TYPECHECKING_ENABLED: @@ -200,7 +231,7 @@ class DataCache: table_name = self._make_key(ex_details) if not self.db.table_exists(table_name): - logger.debug('Records not in database.') + logger.debug('Records not in database.') return pd.DataFrame() logger.debug('Getting records from database.') @@ -213,7 +244,12 @@ class DataCache: exchange_name = kwargs.get('ex_details')[2] user_name = kwargs.get('ex_details')[3] start_datetime = kwargs.get('start_datetime') + if start_datetime.tzinfo is None: + raise ValueError("start_datetime is timezone naive. Please provide a timezone-aware datetime.") + end_datetime = kwargs.get('end_datetime') + if end_datetime.tzinfo is None: + raise ValueError("end_datetime is timezone naive. Please provide a timezone-aware datetime.") if self.TYPECHECKING_ENABLED: if not isinstance(symbol, str): @@ -243,21 +279,25 @@ class DataCache: :return: A tuple (is_complete, updated_request_criteria) where is_complete is True if the data is complete, False otherwise, and updated_request_criteria contains adjusted start/end times if data is incomplete. 
""" - start_datetime: dt.datetime = kwargs.get('start_datetime') - end_datetime: dt.datetime = kwargs.get('end_datetime') - timeframe: str = kwargs.get('timeframe') - if data.empty: logger.debug("Data is empty.") return False, kwargs # No data at all, proceed with the full original request + start_datetime: dt.datetime = kwargs.get('start_datetime') + if start_datetime.tzinfo is None: + raise ValueError("start_datetime is timezone naive. Please provide a timezone-aware datetime.") + + end_datetime: dt.datetime = kwargs.get('end_datetime') + if end_datetime.tzinfo is None: + raise ValueError("end_datetime is timezone naive. Please provide a timezone-aware datetime.") + + timeframe: str = kwargs.get('timeframe') + temp_data = data.copy() - # Ensure 'open_time' is in datetime format - if temp_data['open_time'].dtype != ' pd.DataFrame: if start_datetime is None: - start_datetime = dt.datetime(year=2017, month=1, day=1) + start_datetime = dt.datetime(year=2017, month=1, day=1, tzinfo=dt.timezone.utc) if end_datetime is None: - end_datetime = dt.datetime.utcnow() + end_datetime = dt.datetime.utcnow().replace(tzinfo=dt.timezone.utc) if start_datetime > end_datetime: raise ValueError("Invalid start and end parameters: start_datetime must be before end_datetime.") @@ -439,16 +479,3 @@ class DataCache: sym, tf, ex, _ = ex_details key = f'{sym}_{tf}_{ex}' return key - -# Example usage -# args = { -# 'start_datetime': dt.datetime.now() - dt.timedelta(hours=1), # Example start time -# 'ex_details': ['BTCUSDT', '15m', 'Binance', 'user1'], -# } -# -# exchanges = ExchangeHandler() -# data = DataCache(exchanges) -# df = data.get_records_since(**args) -# -# # Disabling type checking for a specific instance -# data.TYPECHECKING_ENABLED = False diff --git a/src/Exchange.py b/src/Exchange.py index 3bde476..592da90 100644 --- a/src/Exchange.py +++ b/src/Exchange.py @@ -4,6 +4,7 @@ from datetime import datetime, timedelta, timezone from typing import Tuple, Dict, List, Union, Any import 
time import logging +from shared_utilities import timeframe_to_minutes logger = logging.getLogger(__name__) @@ -103,27 +104,33 @@ class Exchange: pd.DataFrame: A DataFrame containing the OHLCV data. """ if end_dt is None: - end_dt = datetime.utcnow() + end_dt = datetime.now(timezone.utc) - # Convert start_dt and end_dt to UTC if they are naive + # Ensure start_dt and end_dt are timezone-aware if start_dt.tzinfo is None: start_dt = start_dt.replace(tzinfo=timezone.utc) if end_dt.tzinfo is None: end_dt = end_dt.replace(tzinfo=timezone.utc) - max_interval = timedelta(days=200) + # Maximum number of candles per request (usually 500 for Binance) + max_candles_per_request = 500 data_frames = [] current_start = start_dt while current_start < end_dt: - current_end = min(current_start + max_interval, end_dt) + # Estimate the current_end based on the max_candles_per_request + # and the interval between candles + current_end = min(end_dt, current_start + timedelta( + minutes=max_candles_per_request * timeframe_to_minutes(interval))) + start_str = self.datetime_to_unix_millis(current_start) end_str = self.datetime_to_unix_millis(current_end) try: logger.info(f"Fetching OHLCV data for {symbol} from {current_start} to {current_end}.") candles = self.client.fetch_ohlcv(symbol=symbol, timeframe=interval, - since=start_str, params={'endTime': end_str}) + since=start_str, limit=max_candles_per_request, + params={'endTime': end_str}) if not candles: logger.warning(f"No OHLCV data returned for {symbol} from {current_start} to {current_end}.") break @@ -133,7 +140,11 @@ class Exchange: data_frames.append(candles_df) - current_start = current_end + # Update current_start to the time of the last candle retrieved + last_candle_time = candles_df['open_time'].iloc[-1] / 1000 # Convert from milliseconds to seconds + current_start = datetime.utcfromtimestamp(last_candle_time).replace(tzinfo=timezone.utc) + timedelta( + milliseconds=1) + time.sleep(1) except ccxt.BaseError as e: @@ -141,7 
+152,8 @@ class Exchange: break if data_frames: - result_df = pd.concat(data_frames) + # Combine all chunks and drop duplicates in one step + result_df = pd.concat(data_frames).drop_duplicates(subset=['open_time']).reset_index(drop=True) logger.info(f"Successfully fetched OHLCV data for {symbol}.") return result_df else: @@ -521,43 +533,3 @@ class Exchange: return [] else: return [] - - -# -# # Usage Examples -# -# # Example 1: Initializing the Exchange class -# api_keys = { -# 'key': 'your_api_key', -# 'secret': 'your_api_secret' -# } -# exchange = Exchange(name='Binance', api_keys=api_keys, exchange_id='binance') -# -# # Example 2: Fetching historical data -# start_date = datetime(2022, 1, 1) -# end_date = datetime(2022, 6, 1) -# historical_data = exchange.get_historical_klines(symbol='BTC/USDT', interval='1d', -# start_dt=start_date, end_dt=end_date) -# print(historical_data) -# -# # Example 3: Fetching the current price of a symbol -# current_price = exchange.get_price(symbol='BTC/USDT') -# print(f"Current price of BTC/USDT: {current_price}") -# -# # Example 4: Placing a limit buy order -# order_result, order_details = exchange.place_order(symbol='BTC/USDT', side='buy', type='limit', -# timeInForce='GTC', quantity=0.001, price=30000) -# print(order_result, order_details) -# -# # Example 5: Getting account balances -# balances = exchange.get_balances() -# print(balances) -# -# # Example 6: Fetching open orders -# open_orders = exchange.get_open_orders() -# print(open_orders) -# -# # Example 7: Fetching active trades -# active_trades = exchange.get_active_trades() -# print(active_trades) -# diff --git a/src/app.py b/src/app.py index d49fa1a..66ab56e 100644 --- a/src/app.py +++ b/src/app.py @@ -51,8 +51,7 @@ def index(): Fetches data from brighter_trades and inject it into an HTML template. Renders the html template and serves the web application. """ - # Clear the session to simulate a new visitor - session.clear() + try: # Log the user in. 
user_name = brighter_trades.config.users.load_or_create_user(username=session.get('user')) diff --git a/src/DataCache.py b/src/archived_code/DataCache.py similarity index 100% rename from src/DataCache.py rename to src/archived_code/DataCache.py diff --git a/tests/test_DataCache.py b/src/archived_code/test_DataCache.py similarity index 100% rename from tests/test_DataCache.py rename to src/archived_code/test_DataCache.py diff --git a/src/candles.py b/src/candles.py index 118c26d..2c00ccc 100644 --- a/src/candles.py +++ b/src/candles.py @@ -1,5 +1,8 @@ import datetime as dt import logging as log + +import pytz + from shared_utilities import timeframe_to_minutes, ts_of_n_minutes_ago @@ -26,7 +29,7 @@ class Candles: def get_last_n_candles(self, num_candles: int, asset: str, timeframe: str, exchange: str, user_name: str): """ - Return the last num_candles candles of a specified timeframe, symbol and exchange_name. + Return the last num_candles candles of a specified timeframe, symbol, and exchange_name. :param user_name: The name of the user that owns the exchange. :param num_candles: int - The number of records to return. @@ -40,14 +43,18 @@ class Candles: # Convert the timeframe to candle_length. minutes_per_candle = timeframe_to_minutes(timeframe) - # Calculate the approximate start_datetime the first of n record will have. + # Calculate the approximate start_datetime the first of n records will have. start_datetime = ts_of_n_minutes_ago(n=num_candles, candle_length=minutes_per_candle) + # Ensure the start_datetime is timezone aware + if start_datetime.tzinfo is None: + raise ValueError("start_datetime is timezone naive. Please ensure it is timezone-aware.") + # Fetch records older than start_datetime. 
candles = self.data.get_records_since(start_datetime=start_datetime, ex_details=[asset, timeframe, exchange, user_name]) if len(candles.index) < num_candles: - timesince = dt.datetime.utcnow() - start_datetime + timesince = dt.datetime.now(pytz.UTC) - start_datetime minutes_since = int(timesince.total_seconds() / 60) print(f"""candles[103]: Received {len(candles.index)} candles but requested {num_candles}. At {minutes_per_candle} minutes_per_candle since {start_datetime}. There should diff --git a/src/shared_utilities.py b/src/shared_utilities.py index 1e9c76f..ede448f 100644 --- a/src/shared_utilities.py +++ b/src/shared_utilities.py @@ -3,8 +3,9 @@ import datetime as dt from typing import Union import pandas as pd +import pytz -epoch = dt.datetime.utcfromtimestamp(0) +epoch = dt.datetime.utcfromtimestamp(0).replace(tzinfo=pytz.UTC) def query_uptodate(records: pd.DataFrame, r_length_min: float) -> Union[float, None]: @@ -21,7 +22,7 @@ def query_uptodate(records: pd.DataFrame, r_length_min: float) -> Union[float, N print(f'The last timestamp on record is {last_timestamp}') # Get a timestamp of the UTC time in milliseconds to match the records in the DB - now_timestamp = unix_time_millis(dt.datetime.utcnow()) + now_timestamp = unix_time_millis(dt.datetime.now(pytz.UTC)) print(f'The timestamp now is {now_timestamp}') # Get the seconds since the records have been updated @@ -57,6 +58,9 @@ def unix_time_seconds(d_time: dt.datetime) -> float: :param d_time: The datetime object to convert. :return: The Unix timestamp in seconds. """ + if d_time.tzinfo is None: + raise ValueError("d_time is timezone naive. Please provide a timezone-aware datetime.") + return (d_time - epoch).total_seconds() @@ -67,6 +71,8 @@ def unix_time_millis(d_time: dt.datetime) -> float: :param d_time: The datetime object to convert. :return: The Unix timestamp in milliseconds. """ + if d_time.tzinfo is None: + raise ValueError("d_time is timezone naive. 
Please provide a timezone-aware datetime.") return (d_time - epoch).total_seconds() * 1000.0 @@ -86,6 +92,9 @@ def query_satisfied(start_datetime: dt.datetime, records: pd.DataFrame, r_length """ print('\nChecking if the query is satisfied...') + if start_datetime.tzinfo is None: + raise ValueError("start_datetime is timezone naive. Please provide a timezone-aware datetime.") + # Convert start_datetime to Unix timestamp in milliseconds start_timestamp = unix_time_millis(start_datetime) print(f'Start timestamp: {start_timestamp}') @@ -125,7 +134,7 @@ def ts_of_n_minutes_ago(n: int, candle_length: float) -> dt.datetime: minutes_ago = n * candle_length # Get the current UTC datetime. - now = dt.datetime.utcnow() + now = dt.datetime.now(pytz.UTC) # Calculate the datetime for 'n' candles ago. date_of = now - dt.timedelta(minutes=minutes_ago) diff --git a/tests/test_DataCache_v2.py b/tests/test_DataCache_v2.py index 8d4f302..6c729ab 100644 --- a/tests/test_DataCache_v2.py +++ b/tests/test_DataCache_v2.py @@ -1,3 +1,4 @@ +import pytz from DataCache_v2 import DataCache from ExchangeInterface import ExchangeInterface import unittest @@ -56,13 +57,19 @@ class DataGenerator: Returns: pd.DataFrame: A DataFrame with the simulated data. """ + # Ensure provided datetime parameters are timezone aware + if start and start.tzinfo is None: + raise ValueError('start datetime must be timezone aware.') + if end and end.tzinfo is None: + raise ValueError('end datetime must be timezone aware.') + # If neither start nor end are provided. if start is None and end is None: - end = dt.datetime.utcnow() + end = dt.datetime.now(dt.timezone.utc) if num_rec is None: raise ValueError("num_rec must be provided if both start and end are not specified.") - # If only start is provided. + # If start and end are provided. 
if start is not None and end is not None: total_duration = (end - start).total_seconds() interval_seconds = self.timeframe_amount * self._get_seconds_per_unit(self.timeframe_unit) @@ -74,6 +81,7 @@ class DataGenerator: raise ValueError("num_rec must be provided if both start and end are not specified.") interval_seconds = self.timeframe_amount * self._get_seconds_per_unit(self.timeframe_unit) start = end - dt.timedelta(seconds=(num_rec - 1) * interval_seconds) + start = start.replace(tzinfo=pytz.utc) # Ensure start is aligned to the timeframe interval start = self.round_down_datetime(start, self.timeframe_unit[0], self.timeframe_amount) @@ -135,7 +143,7 @@ class DataGenerator: Returns a datetime object representing the current time minus the offset in the specified units. """ delta_args = {self.timeframe_unit: offset} - return dt.datetime.utcnow() - dt.timedelta(**delta_args) + return dt.datetime.utcnow().replace(tzinfo=pytz.utc) - dt.timedelta(**delta_args) def _delta(self, i): """ @@ -145,15 +153,20 @@ class DataGenerator: return dt.timedelta(**delta_args) @staticmethod - def unix_time_millis(dt_obj): + def unix_time_millis(dt_obj: dt.datetime): """ Convert a datetime object to Unix time in milliseconds. """ - epoch = dt.datetime(1970, 1, 1) + if dt_obj.tzinfo is None: + raise ValueError('dt_obj needs to be timezone aware.') + epoch = dt.datetime(1970, 1, 1).replace(tzinfo=pytz.UTC) return int((dt_obj - epoch).total_seconds() * 1000) @staticmethod def round_down_datetime(dt_obj: dt.datetime, unit: str, interval: int) -> dt.datetime: + if dt_obj.tzinfo is None: + raise ValueError('dt_obj needs to be timezone aware.') + if unit == 's': # Round down to the nearest interval of seconds seconds = (dt_obj.second // interval) * interval dt_obj = dt_obj.replace(second=seconds, microsecond=0) @@ -373,8 +386,13 @@ class TestDataCacheV2(unittest.TestCase): # Calculate the start time for querying the records. 
start_datetime = data_gen.x_time_ago(query_offset) + + # Ensure start_datetime is timezone-aware (UTC). + if start_datetime.tzinfo is None: + start_datetime = start_datetime.replace(tzinfo=dt.timezone.utc) + # Defaults to current time if not provided to get_records_since() - query_end_time = dt.datetime.utcnow() + query_end_time = dt.datetime.utcnow().replace(tzinfo=dt.timezone.utc) print(f'Requesting records from {start_datetime} to {query_end_time}') # Query the records since the calculated start time. @@ -404,7 +422,7 @@ class TestDataCacheV2(unittest.TestCase): print("\nThe DataFrames have the same shape and the 'open_time' columns match.") # Verify that the oldest timestamp in the result is within the allowed time difference. - oldest_timestamp = pd.to_datetime(result['open_time'].min(), unit='ms') + oldest_timestamp = pd.to_datetime(result['open_time'].min(), unit='ms').tz_localize('UTC') time_diff = oldest_timestamp - start_datetime max_allowed_time_diff = dt.timedelta(**{data_gen.timeframe_unit: data_gen.timeframe_amount}) @@ -415,7 +433,7 @@ class TestDataCacheV2(unittest.TestCase): print(f'The first timestamp is {time_diff} from {start_datetime}') # Verify that the newest timestamp in the result is within the allowed time difference. 
- newest_timestamp = pd.to_datetime(result['open_time'].max(), unit='ms') + newest_timestamp = pd.to_datetime(result['open_time'].max(), unit='ms').tz_localize('UTC') time_diff_end = abs(query_end_time - newest_timestamp) assert dt.timedelta(0) <= time_diff_end <= max_allowed_time_diff, \ @@ -448,6 +466,31 @@ class TestDataCacheV2(unittest.TestCase): print('\nTest get_records_since with missing section in data') self._test_get_records_since(simulate_scenarios='missing_section') + def test_other_timeframes(self): + # print('\nTest get_records_since with a different timeframe') + # ex_details = ['BTC/USD', '15m', 'binance', 'test_guy'] + # start_datetime = dt.datetime.now(dt.timezone.utc) - dt.timedelta(hours=2) + # # Query the records since the calculated start time. + # result = self.data.get_records_since(start_datetime=start_datetime, ex_details=ex_details) + # last_record_time = pd.to_datetime(result['open_time'].max(), unit='ms').tz_localize('UTC') + # assert last_record_time > dt.datetime.now(dt.timezone.utc) - dt.timedelta(minutes=15.1) + # + # print('\nTest get_records_since with a different timeframe') + # ex_details = ['BTC/USD', '5m', 'binance', 'test_guy'] + # start_datetime = dt.datetime.now(dt.timezone.utc) - dt.timedelta(hours=1) + # # Query the records since the calculated start time. + # result = self.data.get_records_since(start_datetime=start_datetime, ex_details=ex_details) + # last_record_time = pd.to_datetime(result['open_time'].max(), unit='ms').tz_localize('UTC') + # assert last_record_time > dt.datetime.now(dt.timezone.utc) - dt.timedelta(minutes=5.1) + + print('\nTest get_records_since with a different timeframe') + ex_details = ['BTC/USD', '4h', 'binance', 'test_guy'] + start_datetime = dt.datetime.now(dt.timezone.utc) - dt.timedelta(hours=12) + # Query the records since the calculated start time. 
+ result = self.data.get_records_since(start_datetime=start_datetime, ex_details=ex_details) + last_record_time = pd.to_datetime(result['open_time'].max(), unit='ms').tz_localize('UTC') + assert last_record_time > dt.datetime.now(dt.timezone.utc) - dt.timedelta(hours=4.1) + def test_populate_db(self): print('Testing _populate_db() method:') # Create a table of candle records.