import time import unittest import datetime as dt import pandas as pd from shared_utilities import ( query_uptodate, ms_to_seconds, unix_time_seconds, unix_time_millis, query_satisfied, ts_of_n_minutes_ago, timeframe_to_minutes ) def utcnow() -> dt.datetime: """Return timezone-aware UTC datetime.""" return dt.datetime.now(dt.timezone.utc) class TestSharedUtilities(unittest.TestCase): def test_query_uptodate(self): print('Testing query_uptodate()') # (Test case 1) The records should not be up-to-date (very old timestamps) records = pd.DataFrame({ 'time': [1, 2, 3, 4, 5] }) result = query_uptodate(records, 1) if result is None: print('Records are up-to-date.') else: print('Records are not up-to-date.') print(f'Result for the first test case: {result}') self.assertIsNotNone(result) # (Test case 2) The records should be up-to-date (recent timestamps) now = unix_time_millis(utcnow()) recent_records = pd.DataFrame({ 'time': [now - 70000, now - 60000, now - 40000] }) result = query_uptodate(recent_records, 1) if result is None: print('Records are up-to-date.') else: print('Records are not up-to-date.') print(f'Result for the second test case: {result}') self.assertIsNone(result) # (Test case 3) The records just under the tolerance for a record length of 1 hour. # The records should not be up-to-date (recent timestamps) one_hour = 60 * 60 * 1000 # one hour in milliseconds tolerance_milliseconds = 10 * 1000 # tolerance in milliseconds recent_time = unix_time_millis(utcnow()) borderline_records = pd.DataFrame({ 'time': [recent_time - one_hour + (tolerance_milliseconds - 3)] # just within the tolerance }) result = query_uptodate(borderline_records, 60) if result is None: print('Records are up-to-date.') else: print('Records are not up-to-date.') print(f'Result for the third test case: {result}') self.assertIsNotNone(result) # (Test case 4) The records just over the tolerance for a record length of 1 hour. # The records should be up-to-date (recent timestamps) one_hour = 60 * 60 * 1000 # one hour in milliseconds tolerance_milliseconds = 10 * 1000 # tolerance in milliseconds recent_time = unix_time_millis(utcnow()) borderline_records = pd.DataFrame({ 'time': [recent_time - one_hour + (tolerance_milliseconds + 3)] # just within the tolerance }) result = query_uptodate(borderline_records, 60) if result is None: print('Records are up-to-date.') else: print('Records are not up-to-date.') print(f'Result for the third test case: {result}') self.assertIsNone(result) def test_ms_to_seconds(self): print('Testing ms_to_seconds()') self.assertEqual(ms_to_seconds(1000), 1) self.assertEqual(ms_to_seconds(0), 0) def test_unix_time_seconds(self): print('Testing unix_time_seconds()') time = dt.datetime(2020, 1, 1, tzinfo=dt.timezone.utc) self.assertEqual(unix_time_seconds(time), 1577836800) def test_unix_time_millis(self): print('Testing unix_time_millis()') time = dt.datetime(2020, 1, 1, tzinfo=dt.timezone.utc) self.assertEqual(unix_time_millis(time), 1577836800000.0) def test_query_satisfied(self): print('Testing query_satisfied()') # Test case where the records should satisfy the query (records cover the start time) start_datetime = dt.datetime(2020, 1, 1, tzinfo=dt.timezone.utc) records = pd.DataFrame({ 'time': [1577836800000.0 - 600000, 1577836800000.0 - 300000, 1577836800000.0] # Covering the start time }) result = query_satisfied(start_datetime, records, 1) if result is None: print('Query is satisfied: records span back far enough.') else: print('Query is not satisfied: records do not span back far enough.') print(f'Result for the first test case: {result}') self.assertIsNotNone(result) # Test case where the records should not satisfy the query (recent records but not enough) recent_time = unix_time_millis(utcnow()) records = pd.DataFrame({ 'time': [recent_time - 300 * 60 * 1000, recent_time - 240 * 60 * 1000, recent_time - 180 * 60 * 1000] }) result = query_satisfied(start_datetime, records, 1) if result is None: print('Query is satisfied: records span back far enough.') else: print('Query is not satisfied: records do not span back far enough.') print(f'Result for the second test case: {result}') self.assertIsNone(result) # Additional test case for partial coverage start_datetime = utcnow() - dt.timedelta(minutes=300) records = pd.DataFrame({ 'time': [unix_time_millis(utcnow() - dt.timedelta(minutes=240)), unix_time_millis(utcnow() - dt.timedelta(minutes=180)), unix_time_millis(utcnow() - dt.timedelta(minutes=120))] }) result = query_satisfied(start_datetime, records, 60) if result is None: print('Query is satisfied: records span back far enough.') else: print('Query is not satisfied: records do not span back far enough.') print(f'Result for the third test case: {result}') self.assertIsNone(result) def test_ts_of_n_minutes_ago(self): print('Testing ts_of_n_minutes_ago()') now = utcnow() test_cases = [ (60, 1), # 60 candles of 1 minute each (10, 5), # 10 candles of 5 minutes each (1, 1440), # 1 candle of 1 day (1440 minutes) (7, 10080), # 7 candles of 1 week (10080 minutes) (30, 60), # 30 candles of 1 hour (60 minutes) ] for n, candle_length in test_cases: with self.subTest(n=n, candle_length=candle_length): result = ts_of_n_minutes_ago(n, candle_length) expected_time = now - dt.timedelta(minutes=(n + 1) * candle_length) self.assertAlmostEqual(unix_time_seconds(result), unix_time_seconds(expected_time), delta=60) def test_timeframe_to_minutes(self): print('Testing timeframe_to_minutes()') self.assertEqual(timeframe_to_minutes('15m'), 15) self.assertEqual(timeframe_to_minutes('1h'), 60) self.assertEqual(timeframe_to_minutes('1d'), 1440) self.assertEqual(timeframe_to_minutes('1w'), 10080) self.assertEqual(timeframe_to_minutes('1M'), 44640) # 31 days in a month self.assertEqual(timeframe_to_minutes('1Y'), 525600) # 525600 minutes in a year if __name__ == '__main__': unittest.main()