# exchange-data-manager/tests/integration/test_real_exchange.py
"""
Integration tests with real exchange data.
These tests fetch actual data from exchanges to verify the three-tier
caching system handles all edge cases correctly:
- Gaps at the beginning of requested range
- Gaps at the end of requested range
- Gaps in the middle of cached data
- Multiple discontinuous gaps
- Freshness checks for stale data
Run with: pytest tests/integration/test_real_exchange.py -v -s
Skip in CI with: pytest -m "not real_exchange"
"""
import pytest
import asyncio
import tempfile
import os
import time
from typing import List, Tuple
from exchange_data_manager.cache.manager import CacheManager
from exchange_data_manager.cache.memory import MemoryCache
from exchange_data_manager.cache.database import DatabaseCache
from exchange_data_manager.candles.models import Candle, CandleRequest
from exchange_data_manager.config import CacheConfig, DatabaseConfig
from exchange_data_manager.exchanges.ccxt_connector import CCXTConnector
# Mark all tests in this module as requiring real exchange access
pytestmark = pytest.mark.real_exchange
@pytest.fixture
def timeframe():
    """Candle timeframe under test; 1m keeps fetches small and runs fast."""
    return "1m"
@pytest.fixture
def symbol():
    """Trading pair under test; BTC/USDT is listed on virtually every exchange."""
    return "BTC/USDT"
@pytest.fixture
def exchange_name():
    """Exchange under test; Binance is stable and generous with rate limits."""
    return "binance"
@pytest.fixture
async def connector(exchange_name):
    """Provide a live CCXT connector; the connection is closed on teardown."""
    live_connector = CCXTConnector(exchange_name)
    yield live_connector
    await live_connector.close()
@pytest.fixture
def cache_manager_factory():
    """Factory fixture that builds cache managers backed by temp databases.

    Yields:
        A callable ``_create(memory_ttl)`` that wires up a CacheManager with
        a fresh MemoryCache and an isolated SQLite file in its own temp dir.

    All temp directories created through the factory are removed (best-effort)
    during teardown.
    """
    # Hoisted out of the cleanup loop: the original re-imported shutil on
    # every iteration of the teardown loop.
    import shutil

    managers = []
    temp_dirs = []

    def _create(memory_ttl: int = 3600):
        # Each manager gets its own directory so databases never collide.
        tmpdir = tempfile.mkdtemp()
        temp_dirs.append(tmpdir)
        db_path = os.path.join(tmpdir, "test.db")
        cache_config = CacheConfig(
            fill_gaps=True,
            forward_fill_volume=True,
            time_tolerance_seconds=5,
            count_tolerance=1,
        )
        db_config = DatabaseConfig(path=db_path)
        # Create memory cache with specified TTL
        memory_cache = MemoryCache(ttl_seconds=memory_ttl)
        manager = CacheManager(
            cache_config=cache_config,
            database_config=db_config,
            memory_cache=memory_cache,
            use_async_db=False,  # synchronous DB access in tests
        )
        managers.append(manager)
        return manager

    yield _create
    # Cleanup: best-effort removal of every temp dir handed out above.
    for tmpdir in temp_dirs:
        shutil.rmtree(tmpdir, ignore_errors=True)
class TestRealExchangeBasics:
    """Basic tests to verify exchange connectivity."""

    @pytest.mark.asyncio
    async def test_connector_can_fetch_candles(self, connector, symbol, timeframe):
        """Verify we can fetch candles from the exchange."""
        start_ts = int(time.time()) - 3600  # one hour back
        candles = await connector.fetch_candles(
            symbol=symbol,
            timeframe=timeframe,
            start=start_ts,
            limit=10,
        )
        assert len(candles) > 0
        assert all(isinstance(c, Candle) for c in candles)
        assert all(c.time >= start_ts for c in candles)
        print(f"\nFetched {len(candles)} candles from {connector.exchange_id}")
        print(f"Time range: {candles[0].time} to {candles[-1].time}")

    @pytest.mark.asyncio
    async def test_candles_are_sorted_and_continuous(self, connector, symbol, timeframe):
        """Verify fetched candles are sorted and reasonably continuous."""
        start_ts = int(time.time()) - 3600
        candles = await connector.fetch_candles(
            symbol=symbol,
            timeframe=timeframe,
            start=start_ts,
            limit=30,
        )
        # Timestamps must already arrive in ascending order.
        times = [c.time for c in candles]
        assert times == sorted(times), "Candles should be sorted by time"
        # Collect gaps wider than one interval (low-volume periods can
        # legitimately skip candles, so we report rather than assert).
        interval = 60  # 1m = 60 seconds
        gaps = [
            (prev.time, cur.time, (cur.time - prev.time) // interval)
            for prev, cur in zip(candles, candles[1:])
            if cur.time - prev.time > interval
        ]
        print(f"\nFound {len(gaps)} gaps in {len(candles)} candles")
        for start, end, missing in gaps:
            print(f" Gap: {missing} candles missing between {start} and {end}")
class TestThreeTierCaching:
    """Test the three-tier caching system with real data.

    Tier order under test: memory -> database -> exchange. Each test asserts
    the reported source string from get_candles_with_source().
    """

    @pytest.mark.asyncio
    async def test_cold_cache_fetches_from_exchange(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Cold cache should fetch from exchange and populate all tiers."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)
        # Limit-only request (no start/end): "give me the latest 50 candles".
        request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            limit=50,
        )
        # First request - cold cache
        result, source = await manager.get_candles_with_source(request)
        assert len(result) > 0
        assert source == "exchange"
        print(f"\nCold cache: fetched {len(result)} candles from {source}")
        # Verify data is now in memory (write-through to the memory tier).
        cache_key = f"{exchange_name}:{symbol}:{timeframe}"
        memory_candles, _ = manager.memory.get(cache_key, limit=50)
        assert len(memory_candles) == len(result)
        print(f"Memory cache now has {len(memory_candles)} candles")
        # Verify data is also in database (write-through to the DB tier).
        db_candles, _ = manager.database.get(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            limit=50,
        )
        assert len(db_candles) == len(result)
        print(f"Database cache now has {len(db_candles)} candles")

    @pytest.mark.asyncio
    async def test_warm_memory_serves_from_memory(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Warm memory cache should serve without hitting exchange."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)
        request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            limit=30,
        )
        # First request - populates cache
        result1, source1 = await manager.get_candles_with_source(request)
        assert source1 == "exchange"
        # Small delay to ensure different request
        await asyncio.sleep(0.1)
        # Second request - should use memory
        result2, source2 = await manager.get_candles_with_source(request)
        assert source2 == "memory"
        assert len(result2) == len(result1)
        print(f"\nWarm cache: served {len(result2)} candles from {source2}")

    @pytest.mark.asyncio
    async def test_database_fallback_when_memory_cleared(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Should fall back to database when memory is cleared."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)
        # Use range request (not limit-only) to avoid freshness check
        now = int(time.time())
        interval = 60  # 1m candle width in seconds
        aligned = (now // interval) * interval  # snap to candle boundary
        start = aligned - 30 * interval
        end = aligned - 5 * interval  # End slightly in past to avoid edge effects
        request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=start,
            end=end,
        )
        # First request - populates both caches
        result1, _ = await manager.get_candles_with_source(request)
        print(f"\nInitial fetch: {len(result1)} candles")
        # Clear memory cache so only the DB tier can satisfy the re-request.
        manager.memory.clear()
        print(f"Cleared memory cache")
        # Request again - should use database
        result2, source2 = await manager.get_candles_with_source(request)
        assert source2 == "database"
        assert len(result2) == len(result1)
        print(f"Database fallback: served {len(result2)} candles from {source2}")
class TestGapHandling:
    """Test gap detection and filling with real data.

    Each test seeds the cache with partial ranges, then issues a wider
    request and checks that the manager fetched the missing spans.
    """

    @pytest.mark.asyncio
    async def test_gap_at_end_fetches_newer_data(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Request extending beyond cached data should fetch the gap."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)
        now = int(time.time())
        interval = 60  # 1m
        # Align to candle boundary
        aligned_now = (now // interval) * interval
        # First: fetch candles from 30 minutes ago to 15 minutes ago
        old_start = aligned_now - (30 * interval)
        old_end = aligned_now - (15 * interval)
        request1 = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=old_start,
            end=old_end,
        )
        result1, source1 = await manager.get_candles_with_source(request1)
        print(f"\nInitial fetch: {len(result1)} candles from {source1}")
        print(f"Range: {old_start} to {old_end}")
        # Now request a range that extends to now (creating end gap)
        request2 = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=old_start,
            end=aligned_now,
        )
        result2, source2 = await manager.get_candles_with_source(request2)
        print(f"Extended fetch: {len(result2)} candles from {source2}")
        print(f"Range: {old_start} to {aligned_now}")
        # Should have more candles now (the gap was filled)
        assert len(result2) >= len(result1)
        # Verify the range is covered
        if result2:
            # First candle within one interval of the requested start...
            assert result2[0].time <= old_start + interval
            # ...and data now extends past the old cached end.
            assert result2[-1].time >= old_end

    @pytest.mark.asyncio
    async def test_gap_at_start_fetches_older_data(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Request extending before cached data should fetch the gap."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)
        now = int(time.time())
        interval = 60  # 1m
        aligned_now = (now // interval) * interval
        # First: fetch recent candles (last 15 minutes)
        recent_start = aligned_now - (15 * interval)
        request1 = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=recent_start,
            end=aligned_now,
        )
        result1, source1 = await manager.get_candles_with_source(request1)
        print(f"\nInitial fetch: {len(result1)} candles from {source1}")
        print(f"Range: {recent_start} to {aligned_now}")
        # Now request a range that extends further back (creating start gap)
        older_start = aligned_now - (45 * interval)
        request2 = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=older_start,
            end=aligned_now,
        )
        result2, source2 = await manager.get_candles_with_source(request2)
        print(f"Extended fetch: {len(result2)} candles from {source2}")
        print(f"Range: {older_start} to {aligned_now}")
        # Should have more candles now
        assert len(result2) >= len(result1)
        # Verify older data is included
        if result2:
            assert result2[0].time <= older_start + interval

    @pytest.mark.asyncio
    async def test_gap_in_middle_gets_filled(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Gap in the middle of cached data should be detected and filled."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)
        now = int(time.time())
        interval = 60  # 1m
        aligned_now = (now // interval) * interval
        # Fetch two separate ranges, creating a gap in the middle
        # Range 1: 60-45 minutes ago
        range1_start = aligned_now - (60 * interval)
        range1_end = aligned_now - (45 * interval)
        request1 = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=range1_start,
            end=range1_end,
        )
        result1, _ = await manager.get_candles_with_source(request1)
        print(f"\nRange 1: {len(result1)} candles ({range1_start} to {range1_end})")
        # Range 2: 30-15 minutes ago (gap of 15 minutes between ranges)
        range2_start = aligned_now - (30 * interval)
        range2_end = aligned_now - (15 * interval)
        request2 = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=range2_start,
            end=range2_end,
        )
        result2, _ = await manager.get_candles_with_source(request2)
        print(f"Range 2: {len(result2)} candles ({range2_start} to {range2_end})")
        # Now request the full range - should fill the gap
        full_request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=range1_start,
            end=range2_end,
        )
        full_result, source = await manager.get_candles_with_source(full_request)
        print(f"Full range: {len(full_result)} candles from {source}")
        print(f"Range: {range1_start} to {range2_end}")
        # Should have more candles than sum of individual ranges (gap filled)
        # NOTE(review): expected_min is printed but not asserted — presumably
        # to avoid flakiness from genuine market gaps; confirm that this is
        # intentional (the continuity check below is the real assertion).
        expected_min = len(result1) + len(result2)
        print(f"Expected at least {expected_min} candles, got {len(full_result)}")
        # Verify continuity
        if len(full_result) > 1:
            for i in range(1, len(full_result)):
                gap = full_result[i].time - full_result[i-1].time
                # Allow up to 2 intervals gap (market might have low volume periods)
                assert gap <= interval * 2, f"Unexpected gap at index {i}: {gap}s"

    @pytest.mark.asyncio
    async def test_multiple_gaps_all_filled(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Multiple gaps should all be detected and filled."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)
        now = int(time.time())
        interval = 60  # 1m
        aligned_now = (now // interval) * interval
        # Create three separate cached ranges with gaps between them
        ranges = [
            (aligned_now - 90 * interval, aligned_now - 80 * interval),  # 90-80 min ago
            (aligned_now - 60 * interval, aligned_now - 50 * interval),  # 60-50 min ago
            (aligned_now - 30 * interval, aligned_now - 20 * interval),  # 30-20 min ago
        ]
        total_initial = 0
        for i, (start, end) in enumerate(ranges):
            request = CandleRequest(
                exchange=exchange_name,
                symbol=symbol,
                timeframe=timeframe,
                start=start,
                end=end,
            )
            result, _ = await manager.get_candles_with_source(request)
            total_initial += len(result)
            print(f"\nRange {i+1}: {len(result)} candles ({start} to {end})")
        print(f"Total initial candles: {total_initial}")
        # Request full range spanning all gaps
        full_start = ranges[0][0]
        full_end = ranges[-1][1]
        full_request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=full_start,
            end=full_end,
        )
        full_result, source = await manager.get_candles_with_source(full_request)
        print(f"Full range: {len(full_result)} candles from {source}")
        # Should have significantly more candles (gaps filled)
        assert len(full_result) > total_initial
        print(f"Gap filling added {len(full_result) - total_initial} candles")
class TestFreshnessAndStaleness:
    """Test freshness checking for limit-only requests."""

    @pytest.mark.asyncio
    async def test_stale_data_triggers_refresh(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Stale cached data should trigger a refresh from exchange."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)
        now = int(time.time())
        interval = 60  # 1m
        # Manually insert old data into memory cache (simulating stale data).
        # Build the list oldest-first: the suite treats ascending-by-time as
        # the canonical candle ordering (see the sortedness assertions in the
        # basic tests). The previous construction iterated i=0..4 and produced
        # a newest-first list, violating that invariant.
        old_time = now - (10 * interval)  # newest stale candle, 10 minutes ago
        old_candles = [
            Candle(
                time=old_time - (i * interval),
                open=50000.0,
                high=50100.0,
                low=49900.0,
                close=50050.0,
                volume=10.0,
            )
            for i in range(4, -1, -1)  # i = 4..0 -> ascending timestamps
        ]
        cache_key = f"{exchange_name}:{symbol}:{timeframe}"
        manager.memory.put(cache_key, old_candles)
        print(f"\nInserted {len(old_candles)} stale candles (oldest: {old_time - 4*interval})")
        # Request latest candles - should detect staleness and fetch fresh
        request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            limit=10,
        )
        result, source = await manager.get_candles_with_source(request)
        print(f"Got {len(result)} candles from {source}")
        # Should have fetched fresh data (not just returned stale memory data)
        # The most recent candle should be close to now
        if result:
            most_recent = result[-1].time
            staleness = now - most_recent
            print(f"Most recent candle: {most_recent} ({staleness}s ago)")
            # Should be within 2-3 minutes of now (allowing for exchange delay)
            assert staleness < 5 * interval, f"Data still stale: {staleness}s old"

    @pytest.mark.asyncio
    async def test_fresh_data_served_from_cache(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Fresh cached data should be served without exchange call."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)
        # First request to populate cache with fresh data
        request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            limit=20,
        )
        result1, source1 = await manager.get_candles_with_source(request)
        print(f"\nInitial: {len(result1)} candles from {source1}")
        # Immediate second request - should use memory
        result2, source2 = await manager.get_candles_with_source(request)
        print(f"Second: {len(result2)} candles from {source2}")
        assert source2 == "memory"
        assert len(result2) == len(result1)
class TestEdgeCases:
    """Test edge cases and boundary conditions."""

    @pytest.mark.asyncio
    async def test_single_candle_request(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Single candle request should work correctly."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)
        # limit=1 is the smallest possible limit-only request.
        request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            limit=1,
        )
        result, source = await manager.get_candles_with_source(request)
        assert len(result) == 1
        print(f"\nSingle candle: {result[0].time} from {source}")

    @pytest.mark.asyncio
    async def test_large_range_request(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Large range request should handle pagination correctly."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)
        now = int(time.time())
        interval = 60  # 1m candle width in seconds
        # Request 2 hours of data (120 candles for 1m)
        start = now - (120 * interval)
        request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=start,
            end=now,
        )
        result, source = await manager.get_candles_with_source(request)
        print(f"\nLarge range: {len(result)} candles from {source}")
        print(f"Expected ~120 candles, got {len(result)}")
        # Should have most of the expected candles (some gaps possible)
        assert len(result) >= 100  # Allow for some gaps
    @pytest.mark.asyncio
    async def test_exact_boundary_alignment(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Requests with exact candle boundaries should work correctly."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)
        now = int(time.time())
        interval = 60
        # Align to exact candle boundary
        aligned = (now // interval) * interval
        start = aligned - (10 * interval)
        end = aligned
        request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=start,
            end=end,
        )
        result, _ = await manager.get_candles_with_source(request)
        print(f"\nBoundary aligned: {len(result)} candles")
        print(f"Start: {start}, End: {end}")
        if result:
            # First candle should be at or after start
            assert result[0].time >= start
            # Last candle should be at or before end
            assert result[-1].time <= end

    @pytest.mark.asyncio
    async def test_overlapping_requests_merge_correctly(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Overlapping requests should merge data correctly without duplicates."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)
        now = int(time.time())
        interval = 60
        aligned = (now // interval) * interval
        # Request 1: 30-10 minutes ago
        request1 = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=aligned - 30 * interval,
            end=aligned - 10 * interval,
        )
        result1, _ = await manager.get_candles_with_source(request1)
        print(f"\nRequest 1: {len(result1)} candles")
        # Request 2: 20-5 minutes ago (overlaps with request 1)
        request2 = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=aligned - 20 * interval,
            end=aligned - 5 * interval,
        )
        result2, _ = await manager.get_candles_with_source(request2)
        print(f"Request 2: {len(result2)} candles")
        # Request full range - should have no duplicates
        full_request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=aligned - 30 * interval,
            end=aligned - 5 * interval,
        )
        full_result, source = await manager.get_candles_with_source(full_request)
        print(f"Full range: {len(full_result)} candles from {source}")
        # Check for duplicates (a set collapses repeated timestamps)
        times = [c.time for c in full_result]
        assert len(times) == len(set(times)), "Found duplicate candles!"
        # Should be sorted
        assert times == sorted(times), "Candles not sorted!"
class TestDataIntegrity:
    """
    Verify data integrity by comparing cached results against fresh exchange data.
    These tests fetch data through the cache system, then fetch the same range
    directly from the exchange to verify no corruption occurred during caching,
    merging, or gap filling.
    """

    @pytest.mark.asyncio
    async def test_cached_data_matches_exchange_data(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Cached data should exactly match direct exchange fetch."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)
        now = int(time.time())
        interval = 60  # 1m candle width in seconds
        aligned = (now // interval) * interval  # snap to candle boundary
        # Define a fixed range
        start = aligned - (30 * interval)
        end = aligned - (5 * interval)
        # Fetch through cache system
        request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=start,
            end=end,
        )
        cached_result, _ = await manager.get_candles_with_source(request)
        print(f"\nCached fetch: {len(cached_result)} candles")
        # Fetch directly from exchange (bypass cache)
        direct_candles = await connector.fetch_candles(
            symbol=symbol,
            timeframe=timeframe,
            start=start,
            limit=500,
        )
        # Filter to our exact range
        direct_result = [
            c for c in direct_candles
            if start <= c.time <= end
        ]
        print(f"Direct fetch: {len(direct_result)} candles")
        # Compare candle by candle
        self._verify_candles_match(cached_result, direct_result)

    @pytest.mark.asyncio
    async def test_gap_filled_data_matches_exchange(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Data after gap filling should match fresh exchange fetch."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)
        now = int(time.time())
        interval = 60
        aligned = (now // interval) * interval
        # Create two separate cached ranges with a gap
        range1_start = aligned - (50 * interval)
        range1_end = aligned - (40 * interval)
        range2_start = aligned - (25 * interval)
        range2_end = aligned - (15 * interval)
        # Fetch first range
        request1 = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=range1_start,
            end=range1_end,
        )
        await manager.get_candles_with_source(request1)
        print(f"\nFetched range 1: {range1_start} to {range1_end}")
        # Fetch second range
        request2 = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=range2_start,
            end=range2_end,
        )
        await manager.get_candles_with_source(request2)
        print(f"Fetched range 2: {range2_start} to {range2_end}")
        # Now fetch the full range (includes gap filling)
        full_start = range1_start
        full_end = range2_end
        full_request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=full_start,
            end=full_end,
        )
        cached_result, _ = await manager.get_candles_with_source(full_request)
        print(f"Gap-filled result: {len(cached_result)} candles")
        # Fetch the same range directly from exchange
        direct_candles = await connector.fetch_candles(
            symbol=symbol,
            timeframe=timeframe,
            start=full_start,
            limit=500,
        )
        direct_result = [
            c for c in direct_candles
            if full_start <= c.time <= full_end
        ]
        print(f"Direct fetch: {len(direct_result)} candles")
        # Compare
        self._verify_candles_match(cached_result, direct_result)

    @pytest.mark.asyncio
    async def test_multiple_gap_fills_preserve_data_integrity(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Multiple gap fill operations should not corrupt data."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)
        now = int(time.time())
        interval = 60
        aligned = (now // interval) * interval
        # Create multiple small cached ranges (three gaps between them)
        ranges = [
            (aligned - 80 * interval, aligned - 75 * interval),
            (aligned - 60 * interval, aligned - 55 * interval),
            (aligned - 40 * interval, aligned - 35 * interval),
            (aligned - 20 * interval, aligned - 15 * interval),
        ]
        for i, (start, end) in enumerate(ranges):
            request = CandleRequest(
                exchange=exchange_name,
                symbol=symbol,
                timeframe=timeframe,
                start=start,
                end=end,
            )
            await manager.get_candles_with_source(request)
            print(f"\nFetched range {i+1}: {start} to {end}")
        # Fetch full range through cache (many gaps to fill)
        full_start = ranges[0][0]
        full_end = ranges[-1][1]
        full_request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=full_start,
            end=full_end,
        )
        cached_result, _ = await manager.get_candles_with_source(full_request)
        print(f"Multi-gap filled result: {len(cached_result)} candles")
        # Fetch directly from exchange
        direct_candles = await connector.fetch_candles(
            symbol=symbol,
            timeframe=timeframe,
            start=full_start,
            limit=500,
        )
        direct_result = [
            c for c in direct_candles
            if full_start <= c.time <= full_end
        ]
        print(f"Direct fetch: {len(direct_result)} candles")
        # Compare
        self._verify_candles_match(cached_result, direct_result)

    @pytest.mark.asyncio
    async def test_overlapping_caches_preserve_integrity(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Overlapping cache operations should not corrupt data."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)
        now = int(time.time())
        interval = 60
        aligned = (now // interval) * interval
        # Create overlapping ranges (each successive fetch overlaps the last)
        ranges = [
            (aligned - 40 * interval, aligned - 20 * interval),  # 40-20
            (aligned - 30 * interval, aligned - 10 * interval),  # 30-10 (overlaps)
            (aligned - 25 * interval, aligned - 5 * interval),  # 25-5 (overlaps more)
        ]
        for i, (start, end) in enumerate(ranges):
            request = CandleRequest(
                exchange=exchange_name,
                symbol=symbol,
                timeframe=timeframe,
                start=start,
                end=end,
            )
            await manager.get_candles_with_source(request)
            print(f"\nFetched overlapping range {i+1}: {start} to {end}")
        # Fetch the combined range
        full_start = aligned - 40 * interval
        full_end = aligned - 5 * interval
        full_request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=full_start,
            end=full_end,
        )
        cached_result, _ = await manager.get_candles_with_source(full_request)
        print(f"Merged result: {len(cached_result)} candles")
        # Fetch directly
        direct_candles = await connector.fetch_candles(
            symbol=symbol,
            timeframe=timeframe,
            start=full_start,
            limit=500,
        )
        direct_result = [
            c for c in direct_candles
            if full_start <= c.time <= full_end
        ]
        print(f"Direct fetch: {len(direct_result)} candles")
        # Compare
        self._verify_candles_match(cached_result, direct_result)

    @pytest.mark.asyncio
    async def test_database_cached_data_matches_exchange(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Data retrieved from database tier should match exchange."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)
        now = int(time.time())
        interval = 60
        aligned = (now // interval) * interval
        start = aligned - 30 * interval
        end = aligned - 10 * interval
        # Populate cache
        request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=start,
            end=end,
        )
        await manager.get_candles_with_source(request)
        # Clear memory, force database read
        manager.memory.clear()
        # Fetch again (should come from database)
        db_result, source = await manager.get_candles_with_source(request)
        assert source == "database"
        print(f"\nDatabase fetch: {len(db_result)} candles")
        # Compare with direct exchange fetch
        direct_candles = await connector.fetch_candles(
            symbol=symbol,
            timeframe=timeframe,
            start=start,
            limit=500,
        )
        direct_result = [
            c for c in direct_candles
            if start <= c.time <= end
        ]
        print(f"Direct fetch: {len(direct_result)} candles")
        self._verify_candles_match(db_result, direct_result)

    def _verify_candles_match(
        self,
        cached: List[Candle],
        direct: List[Candle],
        tolerance: float = 0.0001,
    ):
        """
        Verify cached candles match direct fetch.

        Only timestamps present in BOTH lists are compared; missing/extra
        candles are reported but do not fail the test (gap filling can
        legitimately add candles the direct fetch lacks).

        Args:
            cached: Candles from cache system
            direct: Candles fetched directly from exchange
            tolerance: Floating point comparison tolerance (absolute)
        """
        # Build lookup by time
        direct_by_time = {c.time: c for c in direct}
        cached_by_time = {c.time: c for c in cached}
        # Find common timestamps
        common_times = set(direct_by_time.keys()) & set(cached_by_time.keys())
        print(f"Comparing {len(common_times)} common candles")
        if not common_times:
            print("WARNING: No common candles to compare")
            return
        mismatches = []
        for t in sorted(common_times):
            d = direct_by_time[t]
            c = cached_by_time[t]
            # Compare OHLCV values field by field so the report names the field.
            if abs(d.open - c.open) > tolerance:
                mismatches.append(f"time={t}: open mismatch {d.open} vs {c.open}")
            if abs(d.high - c.high) > tolerance:
                mismatches.append(f"time={t}: high mismatch {d.high} vs {c.high}")
            if abs(d.low - c.low) > tolerance:
                mismatches.append(f"time={t}: low mismatch {d.low} vs {c.low}")
            if abs(d.close - c.close) > tolerance:
                mismatches.append(f"time={t}: close mismatch {d.close} vs {c.close}")
            if abs(d.volume - c.volume) > tolerance:
                mismatches.append(f"time={t}: volume mismatch {d.volume} vs {c.volume}")
        if mismatches:
            print(f"CORRUPTION DETECTED! {len(mismatches)} mismatches:")
            for m in mismatches[:10]:  # Show first 10
                print(f" {m}")
            pytest.fail(f"Data corruption: {len(mismatches)} mismatches found")
        else:
            print("All candle values match - no corruption detected")
        # Check for missing candles in cache
        missing_in_cache = set(direct_by_time.keys()) - set(cached_by_time.keys())
        if missing_in_cache:
            print(f"Note: {len(missing_in_cache)} candles in direct but not in cache")
        # Check for extra candles in cache (could be gap-filled forward-fill candles)
        extra_in_cache = set(cached_by_time.keys()) - set(direct_by_time.keys())
        if extra_in_cache:
            print(f"Note: {len(extra_in_cache)} extra candles in cache (possibly gap-filled)")
class TestDifferentTimeframes:
    """Test with different timeframes."""

    @pytest.mark.asyncio
    @pytest.mark.parametrize("tf,interval", [
        ("1m", 60),
        ("5m", 300),
        ("15m", 900),
        ("1h", 3600),
    ])
    async def test_various_timeframes(
        self, cache_manager_factory, connector, exchange_name, symbol, tf, interval
    ):
        """Test caching works correctly for various timeframes."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)
        # Ask for the last ten candles of the parametrized timeframe.
        end_ts = int(time.time())
        request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=tf,
            start=end_ts - (10 * interval),
            end=end_ts,
        )
        result, source = await manager.get_candles_with_source(request)
        print(f"\n{tf}: {len(result)} candles from {source}")
        if len(result) > 1:
            # Spacing between consecutive candles should equal the timeframe
            # width; tolerate some mismatches from genuine market gaps.
            spacings = [b.time - a.time for a, b in zip(result, result[1:])]
            match_rate = sum(1 for s in spacings if s == interval) / len(spacings)
            print(f"Interval match rate: {match_rate:.1%}")
            assert match_rate > 0.8, f"Too many interval mismatches for {tf}"