brighter-trading/tests/test_shared_utilities.py

163 lines
6.9 KiB
Python

import time
import unittest
import datetime as dt
import pandas as pd
from shared_utilities import (
query_uptodate, ms_to_seconds, unix_time_seconds, unix_time_millis,
query_satisfied, ts_of_n_minutes_ago, timeframe_to_minutes
)
class TestSharedUtilities(unittest.TestCase):
def test_query_uptodate(self):
print('Testing query_uptodate()')
# (Test case 1) The records should not be up-to-date (very old timestamps)
records = pd.DataFrame({
'open_time': [1, 2, 3, 4, 5]
})
result = query_uptodate(records, 1)
if result is None:
print('Records are up-to-date.')
else:
print('Records are not up-to-date.')
print(f'Result for the first test case: {result}')
self.assertIsNotNone(result)
# (Test case 2) The records should be up-to-date (recent timestamps)
now = unix_time_millis(dt.datetime.utcnow())
recent_records = pd.DataFrame({
'open_time': [now - 70000, now - 60000, now - 40000]
})
result = query_uptodate(recent_records, 1)
if result is None:
print('Records are up-to-date.')
else:
print('Records are not up-to-date.')
print(f'Result for the second test case: {result}')
self.assertIsNone(result)
# (Test case 3) The records just under the tolerance for a record length of 1 hour.
# The records should not be up-to-date (recent timestamps)
one_hour = 60 * 60 * 1000 # one hour in milliseconds
tolerance_milliseconds = 10 * 1000 # tolerance in milliseconds
recent_time = unix_time_millis(dt.datetime.utcnow())
borderline_records = pd.DataFrame({
'open_time': [recent_time - one_hour + (tolerance_milliseconds - 3)] # just within the tolerance
})
result = query_uptodate(borderline_records, 60)
if result is None:
print('Records are up-to-date.')
else:
print('Records are not up-to-date.')
print(f'Result for the third test case: {result}')
self.assertIsNotNone(result)
# (Test case 4) The records just over the tolerance for a record length of 1 hour.
# The records should be up-to-date (recent timestamps)
one_hour = 60 * 60 * 1000 # one hour in milliseconds
tolerance_milliseconds = 10 * 1000 # tolerance in milliseconds
recent_time = unix_time_millis(dt.datetime.utcnow())
borderline_records = pd.DataFrame({
'open_time': [recent_time - one_hour + (tolerance_milliseconds + 3)] # just within the tolerance
})
result = query_uptodate(borderline_records, 60)
if result is None:
print('Records are up-to-date.')
else:
print('Records are not up-to-date.')
print(f'Result for the third test case: {result}')
self.assertIsNone(result)
def test_ms_to_seconds(self):
print('Testing ms_to_seconds()')
self.assertEqual(ms_to_seconds(1000), 1)
self.assertEqual(ms_to_seconds(0), 0)
def test_unix_time_seconds(self):
print('Testing unix_time_seconds()')
time = dt.datetime(2020, 1, 1)
self.assertEqual(unix_time_seconds(time), 1577836800)
def test_unix_time_millis(self):
print('Testing unix_time_millis()')
time = dt.datetime(2020, 1, 1)
self.assertEqual(unix_time_millis(time), 1577836800000.0)
def test_query_satisfied(self):
print('Testing query_satisfied()')
# Test case where the records should satisfy the query (records cover the start time)
start_datetime = dt.datetime(2020, 1, 1)
records = pd.DataFrame({
'open_time': [1577836800000.0 - 600000, 1577836800000.0 - 300000, 1577836800000.0]
# Covering the start time
})
result = query_satisfied(start_datetime, records, 1)
if result is None:
print('Query is satisfied: records span back far enough.')
else:
print('Query is not satisfied: records do not span back far enough.')
print(f'Result for the first test case: {result}')
self.assertIsNotNone(result)
# Test case where the records should not satisfy the query (recent records but not enough)
recent_time = unix_time_millis(dt.datetime.utcnow())
records = pd.DataFrame({
'open_time': [recent_time - 300 * 60 * 1000, recent_time - 240 * 60 * 1000, recent_time - 180 * 60 * 1000]
})
result = query_satisfied(start_datetime, records, 1)
if result is None:
print('Query is satisfied: records span back far enough.')
else:
print('Query is not satisfied: records do not span back far enough.')
print(f'Result for the second test case: {result}')
self.assertIsNone(result)
# Additional test case for partial coverage
start_datetime = dt.datetime.utcnow() - dt.timedelta(minutes=300)
records = pd.DataFrame({
'open_time': [unix_time_millis(dt.datetime.utcnow() - dt.timedelta(minutes=240)),
unix_time_millis(dt.datetime.utcnow() - dt.timedelta(minutes=180)),
unix_time_millis(dt.datetime.utcnow() - dt.timedelta(minutes=120))]
})
result = query_satisfied(start_datetime, records, 60)
if result is None:
print('Query is satisfied: records span back far enough.')
else:
print('Query is not satisfied: records do not span back far enough.')
print(f'Result for the third test case: {result}')
self.assertIsNone(result)
def test_ts_of_n_minutes_ago(self):
print('Testing ts_of_n_minutes_ago()')
now = dt.datetime.utcnow()
test_cases = [
(60, 1), # 60 candles of 1 minute each
(10, 5), # 10 candles of 5 minutes each
(1, 1440), # 1 candle of 1 day (1440 minutes)
(7, 10080), # 7 candles of 1 week (10080 minutes)
(30, 60), # 30 candles of 1 hour (60 minutes)
]
for n, candle_length in test_cases:
with self.subTest(n=n, candle_length=candle_length):
result = ts_of_n_minutes_ago(n, candle_length)
expected_time = now - dt.timedelta(minutes=(n + 1) * candle_length)
self.assertAlmostEqual(unix_time_seconds(result), unix_time_seconds(expected_time), delta=60)
def test_timeframe_to_minutes(self):
print('Testing timeframe_to_minutes()')
self.assertEqual(timeframe_to_minutes('15m'), 15)
self.assertEqual(timeframe_to_minutes('1h'), 60)
self.assertEqual(timeframe_to_minutes('1d'), 1440)
self.assertEqual(timeframe_to_minutes('1w'), 10080)
self.assertEqual(timeframe_to_minutes('1M'), 44640) # 31 days in a month
self.assertEqual(timeframe_to_minutes('1Y'), 525600) # 525600 minutes in a year
if __name__ == '__main__':
unittest.main()