exchange-data-manager/tests/test_assembler.py

268 lines
8.5 KiB
Python

"""Tests for candle assembler."""
import pytest
from exchange_data_manager.candles.assembler import (
CandleAssembler,
timeframe_to_seconds,
get_candle_boundary,
)
from exchange_data_manager.candles.models import Candle
class TestTimeframeToSeconds:
    """Checks the timeframe_to_seconds helper exported by the assembler module."""

    def test_standard_timeframes(self):
        """Each common timeframe label converts to its length in seconds."""
        expected_seconds = {
            "1m": 60,
            "5m": 300,
            "1h": 3600,
            "1d": 86400,
        }
        for label, seconds in expected_seconds.items():
            assert timeframe_to_seconds(label) == seconds
class TestGetCandleBoundary:
    """Checks the period start that get_candle_boundary reports for a timestamp."""

    def test_exact_boundary(self):
        """A timestamp that already sits on a boundary comes back unchanged."""
        hour = 3600
        on_the_hour = 1709337600  # evenly divisible by 3600
        assert get_candle_boundary(on_the_hour, hour) == on_the_hour

    def test_mid_period(self):
        """A timestamp inside a period maps to that period's start."""
        hour = 3600
        hour_start = 1709337600
        half_past = hour_start + 1800  # 30 minutes into the hour
        assert get_candle_boundary(half_past, hour) == hour_start

    def test_5m_boundary(self):
        """Five-minute periods: inside the period vs. the start of the next one."""
        period = 300
        start = 1709337600
        # Two minutes in: still inside the period that began at `start`.
        assert get_candle_boundary(start + 120, period) == start
        # Exactly one period later: this is where the next period begins.
        assert get_candle_boundary(start + 300, period) == start + 300
class TestCandleAssembler:
    """Tests for CandleAssembler class.

    Every test feeds ticks for binance BTC/USDT and anchors its timestamps at
    ``BASE_TIME``. That value is divisible by 60, 300 and 3600, so it lies on
    a period boundary for each timeframe exercised here.
    """

    # 1709337600 == 19784 * 86400, i.e. a whole number of days since the epoch.
    BASE_TIME = 1709337600

    @staticmethod
    def _make_assembler(timeframe="1m", **callbacks):
        """Build a CandleAssembler for binance BTC/USDT.

        Args:
            timeframe: Timeframe label handed to the assembler.
            **callbacks: Extra keyword arguments forwarded untouched, e.g.
                ``on_candle_close`` or ``on_candle_update``.

        Returns:
            The freshly constructed assembler.
        """
        return CandleAssembler(
            exchange="binance",
            symbol="BTC/USDT",
            timeframe=timeframe,
            **callbacks,
        )

    def test_create_assembler(self):
        """Test creating an assembler."""
        assembler = self._make_assembler()

        assert assembler.timeframe == "1m"
        assert assembler._timeframe_seconds == 60
        assert assembler.current_candle is None

    def test_process_first_tick(self):
        """Test processing the first tick."""
        assembler = self._make_assembler()

        result = assembler.process_tick(
            50000.0, volume=1.5, timestamp=self.BASE_TIME
        )

        # First tick doesn't complete a candle
        assert result is None
        assert assembler.current_candle is not None
        # A single tick supplies open, high, low and close alike.
        assert assembler.current_candle.open == 50000.0
        assert assembler.current_candle.high == 50000.0
        assert assembler.current_candle.low == 50000.0
        assert assembler.current_candle.close == 50000.0
        assert assembler.current_candle.volume == 1.5

    def test_process_multiple_ticks_same_period(self):
        """Test processing multiple ticks in the same period."""
        assembler = self._make_assembler()
        base_time = self.BASE_TIME

        # First tick
        assembler.process_tick(50000.0, volume=1.0, timestamp=base_time)
        # Higher price
        assembler.process_tick(50500.0, volume=0.5, timestamp=base_time + 10)
        # Lower price
        assembler.process_tick(49800.0, volume=0.3, timestamp=base_time + 20)
        # Close price
        assembler.process_tick(50200.0, volume=0.2, timestamp=base_time + 30)

        candle = assembler.current_candle
        assert candle.open == 50000.0
        assert candle.high == 50500.0
        assert candle.low == 49800.0
        assert candle.close == 50200.0
        # The volume is a running float sum (1.0 + 0.5 + 0.3 + 0.2); compare
        # with a tolerance so the check does not hinge on summation order.
        assert candle.volume == pytest.approx(2.0)

    def test_period_boundary_completes_candle(self):
        """Test that crossing period boundary completes a candle."""
        assembler = self._make_assembler()
        base_time = self.BASE_TIME

        # Ticks in first minute
        assembler.process_tick(50000.0, volume=1.0, timestamp=base_time)
        assembler.process_tick(50200.0, volume=0.5, timestamp=base_time + 30)

        # Tick in next minute - should complete previous candle
        completed = assembler.process_tick(
            50300.0, volume=0.8, timestamp=base_time + 60
        )

        assert completed is not None
        assert completed.time == base_time
        assert completed.open == 50000.0
        assert completed.close == 50200.0
        assert completed.closed is True

        # New candle should be started
        assert assembler.current_candle.time == base_time + 60
        assert assembler.current_candle.open == 50300.0

    def test_callback_on_close(self):
        """Test that on_candle_close callback is called."""
        closed_candles = []

        def on_close(candle: Candle):
            closed_candles.append(candle)

        assembler = self._make_assembler(on_candle_close=on_close)
        base_time = self.BASE_TIME

        # First minute
        assembler.process_tick(50000.0, timestamp=base_time)
        # Second minute - closes first candle
        assembler.process_tick(50100.0, timestamp=base_time + 60)

        assert len(closed_candles) == 1
        assert closed_candles[0].time == base_time

    def test_callback_on_update(self):
        """Test that on_candle_update callback is called."""
        updates = []

        def on_update(candle: Candle):
            # Record only the close price seen at each update.
            updates.append(candle.close)

        assembler = self._make_assembler(on_candle_update=on_update)
        base_time = self.BASE_TIME

        assembler.process_tick(50000.0, timestamp=base_time)
        assembler.process_tick(50100.0, timestamp=base_time + 10)
        assembler.process_tick(50200.0, timestamp=base_time + 20)

        assert len(updates) == 3
        assert updates == [50000.0, 50100.0, 50200.0]

    def test_current_candle_property(self):
        """Test getting the current in-progress candle."""
        assembler = self._make_assembler()

        # No candle yet
        assert assembler.current_candle is None

        # After first tick
        assembler.process_tick(50000.0, timestamp=self.BASE_TIME)
        candle = assembler.current_candle
        assert candle is not None
        assert candle.closed is False

    def test_reset(self):
        """Test resetting the assembler."""
        assembler = self._make_assembler()

        assembler.process_tick(50000.0, timestamp=self.BASE_TIME)
        assert assembler.current_candle is not None

        assembler.reset()
        assert assembler.current_candle is None

    def test_different_timeframes(self):
        """Test assembler with different timeframes."""
        # 5-minute candles
        assembler_5m = self._make_assembler(timeframe="5m")
        assert assembler_5m._timeframe_seconds == 300

        # 1-hour candles
        assembler_1h = self._make_assembler(timeframe="1h")
        assert assembler_1h._timeframe_seconds == 3600

    def test_period_start_calculation(self):
        """Test that period start is calculated correctly."""
        assembler = self._make_assembler(timeframe="5m")

        # A tick 3 minutes 45 seconds into a 5-minute period must open a
        # candle stamped with the start of that period.
        timestamp = self.BASE_TIME + 225
        assembler.process_tick(50000.0, timestamp=timestamp)

        # Candle should start at period boundary
        expected_start = self.BASE_TIME
        assert assembler.current_candle.time == expected_start

    def test_skip_period_creates_gap(self):
        """Test that skipping a period creates a gap (returns completed candle)."""
        assembler = self._make_assembler()
        base_time = self.BASE_TIME

        # First minute
        assembler.process_tick(50000.0, timestamp=base_time)

        # Skip to third minute
        completed = assembler.process_tick(50500.0, timestamp=base_time + 120)

        # Should complete first candle
        assert completed is not None
        assert completed.time == base_time

        # New candle should be in third minute
        assert assembler.current_candle.time == base_time + 120