168 lines
6.9 KiB
Python
168 lines
6.9 KiB
Python
import time
|
|
import unittest
|
|
import datetime as dt
|
|
import pandas as pd
|
|
from shared_utilities import (
|
|
query_uptodate, ms_to_seconds, unix_time_seconds, unix_time_millis,
|
|
query_satisfied, ts_of_n_minutes_ago, timeframe_to_minutes
|
|
)
|
|
|
|
|
|
def utcnow() -> dt.datetime:
    """Give the current instant as an aware ``datetime`` whose tzinfo is UTC."""
    utc = dt.timezone.utc
    return dt.datetime.now(tz=utc)
|
|
|
|
|
|
class TestSharedUtilities(unittest.TestCase):
    """Exercise the helpers imported from ``shared_utilities``.

    Every test prints a short progress line before asserting, so the console
    output can be read next to the unittest report.
    """

    @staticmethod
    def _report_uptodate(result, ordinal: str) -> None:
        """Print the diagnostic lines for one ``query_uptodate`` scenario.

        :param result: value returned by ``query_uptodate``.
        :param ordinal: scenario label used in the message ('first', ...).
        """
        if result is None:
            print('Records are up-to-date.')
        else:
            print('Records are not up-to-date.')
        print(f'Result for the {ordinal} test case: {result}')

    @staticmethod
    def _report_satisfied(result, ordinal: str) -> None:
        """Print the diagnostic lines for one ``query_satisfied`` scenario.

        :param result: value returned by ``query_satisfied``.
        :param ordinal: scenario label used in the message ('first', ...).
        """
        if result is None:
            print('Query is satisfied: records span back far enough.')
        else:
            print('Query is not satisfied: records do not span back far enough.')
        print(f'Result for the {ordinal} test case: {result}')

    def test_query_uptodate(self):
        """Check ``query_uptodate`` on stale, fresh and two borderline inputs.

        In this test ``None`` is read as "records are up-to-date" and any
        other value as "an update is needed".
        """
        print('Testing query_uptodate()')

        # (Test case 1) Timestamps 1..5 are effectively the epoch when read as
        # Unix milliseconds, so the records are expected to be stale.
        records = pd.DataFrame({'time': [1, 2, 3, 4, 5]})
        result = query_uptodate(records, 1)
        self._report_uptodate(result, 'first')
        self.assertIsNotNone(result)

        # (Test case 2) Newest record is 40 s old with a record length of
        # 1 minute: expected to be up-to-date.
        now = unix_time_millis(utcnow())
        recent_records = pd.DataFrame({
            'time': [now - 70000, now - 60000, now - 40000],
        })
        result = query_uptodate(recent_records, 1)
        self._report_uptodate(result, 'second')
        self.assertIsNone(result)

        # Shared by the two borderline cases below (record length of 1 hour).
        # The test treats (one hour - 10 s) as the staleness cutoff;
        # presumably this mirrors a tolerance inside query_uptodate -- confirm.
        one_hour = 60 * 60 * 1000  # one hour in milliseconds
        tolerance_milliseconds = 10 * 1000  # tolerance in milliseconds

        # (Test case 3) The record is 3 ms OLDER than the cutoff, so it is
        # expected to be reported as not up-to-date.
        recent_time = unix_time_millis(utcnow())
        borderline_records = pd.DataFrame({
            'time': [recent_time - one_hour + (tolerance_milliseconds - 3)],
        })
        result = query_uptodate(borderline_records, 60)
        self._report_uptodate(result, 'third')
        self.assertIsNotNone(result)

        # (Test case 4) The record is 3 ms NEWER than the cutoff, so it is
        # expected to be reported as up-to-date.
        # NOTE(review): the margin is only 3 ms and `recent_time` is sampled
        # before the DataFrame is built; if query_uptodate reads the clock
        # itself, a slow run could age the record past the cutoff. Widen the
        # margin if this case ever flakes.
        recent_time = unix_time_millis(utcnow())
        borderline_records = pd.DataFrame({
            'time': [recent_time - one_hour + (tolerance_milliseconds + 3)],
        })
        result = query_uptodate(borderline_records, 60)
        # Label fixed: this scenario used to be printed as the "third" case.
        self._report_uptodate(result, 'fourth')
        self.assertIsNone(result)

    def test_ms_to_seconds(self):
        """``ms_to_seconds`` maps 1000 ms to 1 s and 0 ms to 0 s."""
        print('Testing ms_to_seconds()')
        self.assertEqual(ms_to_seconds(1000), 1)
        self.assertEqual(ms_to_seconds(0), 0)

    def test_unix_time_seconds(self):
        """2020-01-01T00:00:00Z is 1577836800 seconds after the Unix epoch."""
        print('Testing unix_time_seconds()')
        # Named `moment` rather than `time` so the imported `time` module is
        # not shadowed inside this method.
        moment = dt.datetime(2020, 1, 1, tzinfo=dt.timezone.utc)
        self.assertEqual(unix_time_seconds(moment), 1577836800)

    def test_unix_time_millis(self):
        """2020-01-01T00:00:00Z is 1577836800000 ms after the Unix epoch."""
        print('Testing unix_time_millis()')
        moment = dt.datetime(2020, 1, 1, tzinfo=dt.timezone.utc)
        self.assertEqual(unix_time_millis(moment), 1577836800000.0)

    def test_query_satisfied(self):
        """Check ``query_satisfied`` against three record sets.

        NOTE(review): the printed messages treat ``None`` as "query
        satisfied", while the first two scenario descriptions and their
        assertions read the other way round. ``query_satisfied`` is not
        visible here, so the assertions are kept exactly as they were;
        confirm the intended contract and align the wording.
        """
        print('Testing query_satisfied()')

        # Case 1: three records ending exactly at the requested start time
        # (start - 10 min, start - 5 min, start). Originally described as
        # "records should satisfy the query"; a non-None result is asserted.
        start_datetime = dt.datetime(2020, 1, 1, tzinfo=dt.timezone.utc)
        records = pd.DataFrame({
            'time': [
                1577836800000.0 - 600000,
                1577836800000.0 - 300000,
                1577836800000.0,  # coincides with the start time
            ],
        })
        result = query_satisfied(start_datetime, records, 1)
        self._report_satisfied(result, 'first')
        self.assertIsNotNone(result)

        # Case 2: the same 2020 start time, but the records are only 3-5
        # hours old. Originally described as "should not satisfy the query";
        # a None result is asserted.
        recent_time = unix_time_millis(utcnow())
        records = pd.DataFrame({
            'time': [
                recent_time - 300 * 60 * 1000,
                recent_time - 240 * 60 * 1000,
                recent_time - 180 * 60 * 1000,
            ],
        })
        result = query_satisfied(start_datetime, records, 1)
        self._report_satisfied(result, 'second')
        self.assertIsNone(result)

        # Case 3 (partial coverage): start is 300 min ago, the oldest record
        # is 240 min ago, record length is 60 min. utcnow() is intentionally
        # sampled separately for each value, exactly as before, so the
        # spacing seen by query_satisfied is unchanged.
        start_datetime = utcnow() - dt.timedelta(minutes=300)
        records = pd.DataFrame({
            'time': [
                unix_time_millis(utcnow() - dt.timedelta(minutes=240)),
                unix_time_millis(utcnow() - dt.timedelta(minutes=180)),
                unix_time_millis(utcnow() - dt.timedelta(minutes=120)),
            ],
        })
        result = query_satisfied(start_datetime, records, 60)
        self._report_satisfied(result, 'third')
        self.assertIsNone(result)

    def test_ts_of_n_minutes_ago(self):
        """``ts_of_n_minutes_ago(n, length)`` lands near now - (n + 1) * length.

        The comparison is done in whole seconds with a 60 s allowance.
        """
        print('Testing ts_of_n_minutes_ago()')
        now = utcnow()

        test_cases = [
            (60, 1),      # 60 candles of 1 minute each
            (10, 5),      # 10 candles of 5 minutes each
            (1, 1440),    # 1 candle of 1 day (1440 minutes)
            (7, 10080),   # 7 candles of 1 week (10080 minutes)
            (30, 60),     # 30 candles of 1 hour (60 minutes)
        ]

        for n, candle_length in test_cases:
            with self.subTest(n=n, candle_length=candle_length):
                result = ts_of_n_minutes_ago(n, candle_length)
                expected_time = now - dt.timedelta(minutes=(n + 1) * candle_length)
                self.assertAlmostEqual(
                    unix_time_seconds(result),
                    unix_time_seconds(expected_time),
                    delta=60,
                )

    def test_timeframe_to_minutes(self):
        """Timeframe strings convert to the expected number of minutes."""
        print('Testing timeframe_to_minutes()')
        self.assertEqual(timeframe_to_minutes('15m'), 15)
        self.assertEqual(timeframe_to_minutes('1h'), 60)
        self.assertEqual(timeframe_to_minutes('1d'), 1440)
        self.assertEqual(timeframe_to_minutes('1w'), 10080)
        self.assertEqual(timeframe_to_minutes('1M'), 44640)  # 31 days in a month
        self.assertEqual(timeframe_to_minutes('1Y'), 525600)  # 365 days in a year
|
|
|
|
|
|
# Run the whole suite when this file is executed directly rather than imported.
if __name__ == '__main__':
    unittest.main()
|