# Companion change shipped with this module (pyproject.toml) registers the
# custom marker so CI can deselect these tests:
#   [tool.pytest.ini_options]
#   markers = [
#       "real_exchange: tests that require real exchange connectivity (deselect with '-m \"not real_exchange\"')",
#   ]
"""
Integration tests against live exchange data.

Real candles are fetched from an exchange to verify that the three-tier
caching stack (memory -> database -> exchange) survives the edge cases
that matter in practice:

- a gap at the beginning of the requested range
- a gap at the end of the requested range
- a gap in the middle of already-cached data
- several discontinuous gaps at once
- freshness checks that detect stale cached data

Run with:    pytest tests/integration/test_real_exchange.py -v -s
Skip in CI:  pytest -m "not real_exchange"
"""

import pytest
import asyncio
import tempfile
import os
import time
from typing import List, Tuple

from exchange_data_manager.cache.manager import CacheManager
from exchange_data_manager.cache.memory import MemoryCache
from exchange_data_manager.cache.database import DatabaseCache
from exchange_data_manager.candles.models import Candle, CandleRequest
from exchange_data_manager.config import CacheConfig, DatabaseConfig
from exchange_data_manager.exchanges.ccxt_connector import CCXTConnector


# Every test in this module talks to a real exchange.
pytestmark = pytest.mark.real_exchange


@pytest.fixture
def timeframe():
    """1m candles keep the test runs short."""
    return "1m"


@pytest.fixture
def symbol():
    """BTC/USDT is listed on virtually every exchange."""
    return "BTC/USDT"


@pytest.fixture
def exchange_name():
    """Binance: dependable API with generous rate limits."""
    return "binance"
@pytest.fixture
async def connector(exchange_name):
    """Yield a live CCXT connector, closing it when the test finishes."""
    conn = CCXTConnector(exchange_name)
    yield conn
    await conn.close()


@pytest.fixture
def cache_manager_factory():
    """Factory producing CacheManagers backed by throwaway SQLite files.

    Each call to the returned factory creates an isolated temp directory
    and database; all directories are removed on fixture teardown.
    """
    import shutil  # hoisted: was previously re-imported on every cleanup iteration

    managers = []
    temp_dirs = []

    def _create(memory_ttl: int = 3600):
        tmpdir = tempfile.mkdtemp()
        temp_dirs.append(tmpdir)
        db_path = os.path.join(tmpdir, "test.db")

        cache_config = CacheConfig(
            fill_gaps=True,
            forward_fill_volume=True,
            time_tolerance_seconds=5,
            count_tolerance=1,
        )
        db_config = DatabaseConfig(path=db_path)

        # Memory tier with a caller-controlled TTL.
        memory_cache = MemoryCache(ttl_seconds=memory_ttl)

        manager = CacheManager(
            cache_config=cache_config,
            database_config=db_config,
            memory_cache=memory_cache,
            use_async_db=False,
        )
        managers.append(manager)
        return manager

    yield _create

    # NOTE(review): managers are collected but never shut down here —
    # confirm whether CacheManager needs an explicit close/teardown.
    for tmpdir in temp_dirs:
        shutil.rmtree(tmpdir, ignore_errors=True)


class TestRealExchangeBasics:
    """Basic tests to verify exchange connectivity."""

    @pytest.mark.asyncio
    async def test_connector_can_fetch_candles(self, connector, symbol, timeframe):
        """Verify we can fetch candles from the exchange."""
        now = int(time.time())
        one_hour_ago = now - 3600

        candles = await connector.fetch_candles(
            symbol=symbol,
            timeframe=timeframe,
            start=one_hour_ago,
            limit=10,
        )

        assert len(candles) > 0
        assert all(isinstance(c, Candle) for c in candles)
        assert all(c.time >= one_hour_ago for c in candles)
        print(f"\nFetched {len(candles)} candles from {connector.exchange_id}")
        print(f"Time range: {candles[0].time} to {candles[-1].time}")

    @pytest.mark.asyncio
    async def test_candles_are_sorted_and_continuous(self, connector, symbol, timeframe):
        """Verify fetched candles are sorted and reasonably continuous."""
        now = int(time.time())
        one_hour_ago = now - 3600

        candles = await connector.fetch_candles(
            symbol=symbol,
            timeframe=timeframe,
            start=one_hour_ago,
            limit=30,
        )

        # Monotonically increasing timestamps are a hard requirement.
        times = [c.time for c in candles]
        assert times == sorted(times), "Candles should be sorted by time"

        # Gaps can legitimately appear during low-volume periods; just report them.
        interval = 60  # 1m = 60 seconds
        gaps = []
        for i in range(1, len(candles)):
            diff = candles[i].time - candles[i - 1].time
            if diff > interval:
                gaps.append((candles[i - 1].time, candles[i].time, diff // interval))

        print(f"\nFound {len(gaps)} gaps in {len(candles)} candles")
        for start, end, missing in gaps:
            print(f"  Gap: {missing} candles missing between {start} and {end}")
class TestThreeTierCaching:
    """Exercise memory -> database -> exchange tiering with real data."""

    @pytest.mark.asyncio
    async def test_cold_cache_fetches_from_exchange(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Cold cache should fetch from exchange and populate all tiers."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)

        request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            limit=50,
        )

        # Nothing cached yet, so this request must hit the exchange.
        result, source = await manager.get_candles_with_source(request)

        assert len(result) > 0
        assert source == "exchange"
        print(f"\nCold cache: fetched {len(result)} candles from {source}")

        # The memory tier must now hold the same candles.
        cache_key = f"{exchange_name}:{symbol}:{timeframe}"
        memory_candles, _ = manager.memory.get(cache_key, limit=50)
        assert len(memory_candles) == len(result)
        print(f"Memory cache now has {len(memory_candles)} candles")

        # ...and so must the database tier.
        db_candles, _ = manager.database.get(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            limit=50,
        )
        assert len(db_candles) == len(result)
        print(f"Database cache now has {len(db_candles)} candles")

    @pytest.mark.asyncio
    async def test_warm_memory_serves_from_memory(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Warm memory cache should serve without hitting exchange."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)

        request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            limit=30,
        )

        # First request warms every tier.
        result1, source1 = await manager.get_candles_with_source(request)
        assert source1 == "exchange"

        # Small delay to ensure different request
        await asyncio.sleep(0.1)

        # Replaying the same request must be served from memory.
        result2, source2 = await manager.get_candles_with_source(request)

        assert source2 == "memory"
        assert len(result2) == len(result1)
        print(f"\nWarm cache: served {len(result2)} candles from {source2}")

    @pytest.mark.asyncio
    async def test_database_fallback_when_memory_cleared(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Should fall back to database when memory is cleared."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)

        # Use range request (not limit-only) to avoid freshness check
        now = int(time.time())
        interval = 60
        aligned = (now // interval) * interval
        start = aligned - 30 * interval
        end = aligned - 5 * interval  # End slightly in past to avoid edge effects

        request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=start,
            end=end,
        )

        # Warm both persistent tiers.
        result1, _ = await manager.get_candles_with_source(request)
        print(f"\nInitial fetch: {len(result1)} candles")

        # Drop the fastest tier so the next lookup must go deeper.
        manager.memory.clear()
        print(f"Cleared memory cache")

        # The database tier should now satisfy the identical request.
        result2, source2 = await manager.get_candles_with_source(request)

        assert source2 == "database"
        assert len(result2) == len(result1)
        print(f"Database fallback: served {len(result2)} candles from {source2}")
class TestGapHandling:
    """Gap detection and filling against real exchange data."""

    @pytest.mark.asyncio
    async def test_gap_at_end_fetches_newer_data(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Request extending beyond cached data should fetch the gap."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)

        now = int(time.time())
        interval = 60  # 1m

        # Snap "now" onto a candle boundary.
        aligned_now = (now // interval) * interval

        # Seed the cache with an older window: 30..15 minutes ago.
        old_start = aligned_now - (30 * interval)
        old_end = aligned_now - (15 * interval)

        request1 = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=old_start,
            end=old_end,
        )

        result1, source1 = await manager.get_candles_with_source(request1)
        print(f"\nInitial fetch: {len(result1)} candles from {source1}")
        print(f"Range: {old_start} to {old_end}")

        # Extend the window up to now, leaving a gap at the end to fill.
        request2 = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=old_start,
            end=aligned_now,
        )

        result2, source2 = await manager.get_candles_with_source(request2)
        print(f"Extended fetch: {len(result2)} candles from {source2}")
        print(f"Range: {old_start} to {aligned_now}")

        # The gap fill must have added candles, never removed any.
        assert len(result2) >= len(result1)

        # Both ends of the widened window should be covered.
        if result2:
            assert result2[0].time <= old_start + interval
            assert result2[-1].time >= old_end

    @pytest.mark.asyncio
    async def test_gap_at_start_fetches_older_data(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Request extending before cached data should fetch the gap."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)

        now = int(time.time())
        interval = 60  # 1m
        aligned_now = (now // interval) * interval

        # Seed the cache with only the last 15 minutes.
        recent_start = aligned_now - (15 * interval)

        request1 = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=recent_start,
            end=aligned_now,
        )

        result1, source1 = await manager.get_candles_with_source(request1)
        print(f"\nInitial fetch: {len(result1)} candles from {source1}")
        print(f"Range: {recent_start} to {aligned_now}")

        # Widen the window backwards, creating a gap before the cache.
        older_start = aligned_now - (45 * interval)

        request2 = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=older_start,
            end=aligned_now,
        )

        result2, source2 = await manager.get_candles_with_source(request2)
        print(f"Extended fetch: {len(result2)} candles from {source2}")
        print(f"Range: {older_start} to {aligned_now}")

        # More history must be present than before.
        assert len(result2) >= len(result1)

        # The earliest candle should sit near the new start.
        if result2:
            assert result2[0].time <= older_start + interval

    @pytest.mark.asyncio
    async def test_gap_in_middle_gets_filled(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Gap in the middle of cached data should be detected and filled."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)

        now = int(time.time())
        interval = 60  # 1m
        aligned_now = (now // interval) * interval

        # Cache two disjoint windows so a hole sits between them.
        # Window 1: 60-45 minutes ago.
        range1_start = aligned_now - (60 * interval)
        range1_end = aligned_now - (45 * interval)

        request1 = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=range1_start,
            end=range1_end,
        )

        result1, _ = await manager.get_candles_with_source(request1)
        print(f"\nRange 1: {len(result1)} candles ({range1_start} to {range1_end})")

        # Window 2: 30-15 minutes ago (15-minute hole between windows).
        range2_start = aligned_now - (30 * interval)
        range2_end = aligned_now - (15 * interval)

        request2 = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=range2_start,
            end=range2_end,
        )

        result2, _ = await manager.get_candles_with_source(request2)
        print(f"Range 2: {len(result2)} candles ({range2_start} to {range2_end})")

        # Spanning both windows forces the hole to be filled.
        full_request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=range1_start,
            end=range2_end,
        )

        full_result, source = await manager.get_candles_with_source(full_request)
        print(f"Full range: {len(full_result)} candles from {source}")
        print(f"Range: {range1_start} to {range2_end}")

        expected_min = len(result1) + len(result2)
        print(f"Expected at least {expected_min} candles, got {len(full_result)}")

        # The merged series must be continuous (small gaps tolerated for
        # low-volume market periods).
        if len(full_result) > 1:
            for i in range(1, len(full_result)):
                gap = full_result[i].time - full_result[i - 1].time
                assert gap <= interval * 2, f"Unexpected gap at index {i}: {gap}s"

    @pytest.mark.asyncio
    async def test_multiple_gaps_all_filled(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Multiple gaps should all be detected and filled."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)

        now = int(time.time())
        interval = 60  # 1m
        aligned_now = (now // interval) * interval

        # Three disjoint cached windows => two holes between them.
        ranges = [
            (aligned_now - 90 * interval, aligned_now - 80 * interval),  # 90-80 min ago
            (aligned_now - 60 * interval, aligned_now - 50 * interval),  # 60-50 min ago
            (aligned_now - 30 * interval, aligned_now - 20 * interval),  # 30-20 min ago
        ]

        total_initial = 0
        for i, (start, end) in enumerate(ranges):
            request = CandleRequest(
                exchange=exchange_name,
                symbol=symbol,
                timeframe=timeframe,
                start=start,
                end=end,
            )
            result, _ = await manager.get_candles_with_source(request)
            total_initial += len(result)
            print(f"\nRange {i+1}: {len(result)} candles ({start} to {end})")

        print(f"Total initial candles: {total_initial}")

        # One request across everything must plug every hole.
        full_start = ranges[0][0]
        full_end = ranges[-1][1]

        full_request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=full_start,
            end=full_end,
        )

        full_result, source = await manager.get_candles_with_source(full_request)
        print(f"Full range: {len(full_result)} candles from {source}")

        assert len(full_result) > total_initial
        print(f"Gap filling added {len(full_result) - total_initial} candles")
class TestFreshnessAndStaleness:
    """Test freshness checking for limit-only requests."""

    @pytest.mark.asyncio
    async def test_stale_data_triggers_refresh(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Stale cached data should trigger a refresh from exchange."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)

        now = int(time.time())
        interval = 60  # 1m

        # Manually insert old data into memory cache (simulating stale data).
        # FIX: build the list in ascending time order — the rest of the suite
        # asserts caches hold candles sorted oldest-first, and the original
        # newest-first insert could confuse the staleness/merge logic.
        # NOTE(review): confirm MemoryCache.put does not sort internally.
        old_time = now - (10 * interval)  # 10 minutes ago
        old_candles = [
            Candle(
                time=old_time - ((4 - i) * interval),
                open=50000.0,
                high=50100.0,
                low=49900.0,
                close=50050.0,
                volume=10.0,
            )
            for i in range(5)
        ]

        cache_key = f"{exchange_name}:{symbol}:{timeframe}"
        manager.memory.put(cache_key, old_candles)
        print(f"\nInserted {len(old_candles)} stale candles (oldest: {old_time - 4*interval})")

        # Request latest candles - should detect staleness and fetch fresh
        request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            limit=10,
        )

        result, source = await manager.get_candles_with_source(request)
        print(f"Got {len(result)} candles from {source}")

        # Should have fetched fresh data (not just returned stale memory data)
        # The most recent candle should be close to now
        if result:
            most_recent = result[-1].time
            staleness = now - most_recent
            print(f"Most recent candle: {most_recent} ({staleness}s ago)")

            # Should be within 2-3 minutes of now (allowing for exchange delay)
            assert staleness < 5 * interval, f"Data still stale: {staleness}s old"

    @pytest.mark.asyncio
    async def test_fresh_data_served_from_cache(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Fresh cached data should be served without exchange call."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)

        # First request to populate cache with fresh data
        request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            limit=20,
        )

        result1, source1 = await manager.get_candles_with_source(request)
        print(f"\nInitial: {len(result1)} candles from {source1}")

        # Immediate second request - should use memory
        result2, source2 = await manager.get_candles_with_source(request)
        print(f"Second: {len(result2)} candles from {source2}")

        assert source2 == "memory"
        assert len(result2) == len(result1)
class TestEdgeCases:
    """Boundary conditions and degenerate requests."""

    @pytest.mark.asyncio
    async def test_single_candle_request(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Single candle request should work correctly."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)

        request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            limit=1,
        )

        result, source = await manager.get_candles_with_source(request)

        assert len(result) == 1
        print(f"\nSingle candle: {result[0].time} from {source}")

    @pytest.mark.asyncio
    async def test_large_range_request(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Large range request should handle pagination correctly."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)

        now = int(time.time())
        interval = 60

        # Two hours of 1m candles (~120) likely spans multiple exchange pages.
        start = now - (120 * interval)

        request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=start,
            end=now,
        )

        result, source = await manager.get_candles_with_source(request)

        print(f"\nLarge range: {len(result)} candles from {source}")
        print(f"Expected ~120 candles, got {len(result)}")

        # Most of the window should be present; small gaps are tolerated.
        assert len(result) >= 100  # Allow for some gaps

    @pytest.mark.asyncio
    async def test_exact_boundary_alignment(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Requests with exact candle boundaries should work correctly."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)

        now = int(time.time())
        interval = 60

        # Both endpoints land exactly on candle boundaries.
        aligned = (now // interval) * interval
        start = aligned - (10 * interval)
        end = aligned

        request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=start,
            end=end,
        )

        result, _ = await manager.get_candles_with_source(request)

        print(f"\nBoundary aligned: {len(result)} candles")
        print(f"Start: {start}, End: {end}")

        # Returned candles must stay inside the requested window.
        if result:
            assert result[0].time >= start
            assert result[-1].time <= end

    @pytest.mark.asyncio
    async def test_overlapping_requests_merge_correctly(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Overlapping requests should merge data correctly without duplicates."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)

        now = int(time.time())
        interval = 60
        aligned = (now // interval) * interval

        # Window 1: 30-10 minutes ago.
        request1 = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=aligned - 30 * interval,
            end=aligned - 10 * interval,
        )

        result1, _ = await manager.get_candles_with_source(request1)
        print(f"\nRequest 1: {len(result1)} candles")

        # Window 2: 20-5 minutes ago, overlapping window 1.
        request2 = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=aligned - 20 * interval,
            end=aligned - 5 * interval,
        )

        result2, _ = await manager.get_candles_with_source(request2)
        print(f"Request 2: {len(result2)} candles")

        # The union of both windows must come back deduplicated and sorted.
        full_request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=aligned - 30 * interval,
            end=aligned - 5 * interval,
        )

        full_result, source = await manager.get_candles_with_source(full_request)
        print(f"Full range: {len(full_result)} candles from {source}")

        times = [c.time for c in full_result]
        assert len(times) == len(set(times)), "Found duplicate candles!"
        assert times == sorted(times), "Candles not sorted!"
class TestDataIntegrity:
    """
    Verify data integrity by comparing cached results against fresh exchange data.

    These tests fetch data through the cache system, then fetch the same range
    directly from the exchange to verify no corruption occurred during caching,
    merging, or gap filling.
    """

    @pytest.mark.asyncio
    async def test_cached_data_matches_exchange_data(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Cached data should exactly match direct exchange fetch."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)

        now = int(time.time())
        interval = 60
        aligned = (now // interval) * interval

        # A fixed, boundary-aligned window in the recent past.
        start = aligned - (30 * interval)
        end = aligned - (5 * interval)

        # Route the request through the full cache stack.
        request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=start,
            end=end,
        )

        cached_result, _ = await manager.get_candles_with_source(request)
        print(f"\nCached fetch: {len(cached_result)} candles")

        # Fetch the same window straight from the exchange as ground truth.
        direct_candles = await connector.fetch_candles(
            symbol=symbol,
            timeframe=timeframe,
            start=start,
            limit=500,
        )
        direct_result = [
            c for c in direct_candles
            if start <= c.time <= end
        ]
        print(f"Direct fetch: {len(direct_result)} candles")

        self._verify_candles_match(cached_result, direct_result)

    @pytest.mark.asyncio
    async def test_gap_filled_data_matches_exchange(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Data after gap filling should match fresh exchange fetch."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)

        now = int(time.time())
        interval = 60
        aligned = (now // interval) * interval

        # Two disjoint cached windows with a hole between them.
        range1_start = aligned - (50 * interval)
        range1_end = aligned - (40 * interval)
        range2_start = aligned - (25 * interval)
        range2_end = aligned - (15 * interval)

        request1 = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=range1_start,
            end=range1_end,
        )
        await manager.get_candles_with_source(request1)
        print(f"\nFetched range 1: {range1_start} to {range1_end}")

        request2 = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=range2_start,
            end=range2_end,
        )
        await manager.get_candles_with_source(request2)
        print(f"Fetched range 2: {range2_start} to {range2_end}")

        # Spanning both windows forces a gap fill before returning.
        full_start = range1_start
        full_end = range2_end

        full_request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=full_start,
            end=full_end,
        )
        cached_result, _ = await manager.get_candles_with_source(full_request)
        print(f"Gap-filled result: {len(cached_result)} candles")

        # Ground truth straight from the exchange.
        direct_candles = await connector.fetch_candles(
            symbol=symbol,
            timeframe=timeframe,
            start=full_start,
            limit=500,
        )
        direct_result = [
            c for c in direct_candles
            if full_start <= c.time <= full_end
        ]
        print(f"Direct fetch: {len(direct_result)} candles")

        self._verify_candles_match(cached_result, direct_result)

    @pytest.mark.asyncio
    async def test_multiple_gap_fills_preserve_data_integrity(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Multiple gap fill operations should not corrupt data."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)

        now = int(time.time())
        interval = 60
        aligned = (now // interval) * interval

        # Four small disjoint windows => three holes to fill.
        ranges = [
            (aligned - 80 * interval, aligned - 75 * interval),
            (aligned - 60 * interval, aligned - 55 * interval),
            (aligned - 40 * interval, aligned - 35 * interval),
            (aligned - 20 * interval, aligned - 15 * interval),
        ]

        for i, (start, end) in enumerate(ranges):
            request = CandleRequest(
                exchange=exchange_name,
                symbol=symbol,
                timeframe=timeframe,
                start=start,
                end=end,
            )
            await manager.get_candles_with_source(request)
            print(f"\nFetched range {i+1}: {start} to {end}")

        # One spanning request fills every hole at once.
        full_start = ranges[0][0]
        full_end = ranges[-1][1]

        full_request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=full_start,
            end=full_end,
        )
        cached_result, _ = await manager.get_candles_with_source(full_request)
        print(f"Multi-gap filled result: {len(cached_result)} candles")

        direct_candles = await connector.fetch_candles(
            symbol=symbol,
            timeframe=timeframe,
            start=full_start,
            limit=500,
        )
        direct_result = [
            c for c in direct_candles
            if full_start <= c.time <= full_end
        ]
        print(f"Direct fetch: {len(direct_result)} candles")

        self._verify_candles_match(cached_result, direct_result)

    @pytest.mark.asyncio
    async def test_overlapping_caches_preserve_integrity(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Overlapping cache operations should not corrupt data."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)

        now = int(time.time())
        interval = 60
        aligned = (now // interval) * interval

        # Progressively shifted, mutually overlapping windows.
        ranges = [
            (aligned - 40 * interval, aligned - 20 * interval),  # 40-20
            (aligned - 30 * interval, aligned - 10 * interval),  # 30-10 (overlaps)
            (aligned - 25 * interval, aligned - 5 * interval),   # 25-5 (overlaps more)
        ]

        for i, (start, end) in enumerate(ranges):
            request = CandleRequest(
                exchange=exchange_name,
                symbol=symbol,
                timeframe=timeframe,
                start=start,
                end=end,
            )
            await manager.get_candles_with_source(request)
            print(f"\nFetched overlapping range {i+1}: {start} to {end}")

        # Fetch the union of all windows through the cache.
        full_start = aligned - 40 * interval
        full_end = aligned - 5 * interval

        full_request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=full_start,
            end=full_end,
        )
        cached_result, _ = await manager.get_candles_with_source(full_request)
        print(f"Merged result: {len(cached_result)} candles")

        direct_candles = await connector.fetch_candles(
            symbol=symbol,
            timeframe=timeframe,
            start=full_start,
            limit=500,
        )
        direct_result = [
            c for c in direct_candles
            if full_start <= c.time <= full_end
        ]
        print(f"Direct fetch: {len(direct_result)} candles")

        self._verify_candles_match(cached_result, direct_result)

    @pytest.mark.asyncio
    async def test_database_cached_data_matches_exchange(
        self, cache_manager_factory, connector, exchange_name, symbol, timeframe
    ):
        """Data retrieved from database tier should match exchange."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)

        now = int(time.time())
        interval = 60
        aligned = (now // interval) * interval
        start = aligned - 30 * interval
        end = aligned - 10 * interval

        # Warm the cache stack.
        request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=timeframe,
            start=start,
            end=end,
        )
        await manager.get_candles_with_source(request)

        # Drop the memory tier so the database must answer.
        manager.memory.clear()

        db_result, source = await manager.get_candles_with_source(request)
        assert source == "database"
        print(f"\nDatabase fetch: {len(db_result)} candles")

        direct_candles = await connector.fetch_candles(
            symbol=symbol,
            timeframe=timeframe,
            start=start,
            limit=500,
        )
        direct_result = [
            c for c in direct_candles
            if start <= c.time <= end
        ]
        print(f"Direct fetch: {len(direct_result)} candles")

        self._verify_candles_match(db_result, direct_result)

    def _verify_candles_match(
        self,
        cached: List[Candle],
        direct: List[Candle],
        tolerance: float = 0.0001,
    ):
        """
        Verify cached candles match direct fetch.

        Args:
            cached: Candles from cache system
            direct: Candles fetched directly from exchange
            tolerance: Floating point comparison tolerance
        """
        # Index both series by candle timestamp.
        direct_by_time = {c.time: c for c in direct}
        cached_by_time = {c.time: c for c in cached}

        # Only timestamps present on both sides can be value-compared.
        common_times = set(direct_by_time.keys()) & set(cached_by_time.keys())
        print(f"Comparing {len(common_times)} common candles")

        if not common_times:
            print("WARNING: No common candles to compare")
            return

        mismatches = []
        for t in sorted(common_times):
            d = direct_by_time[t]
            c = cached_by_time[t]

            # Compare each OHLCV field within the float tolerance.
            for field in ("open", "high", "low", "close", "volume"):
                dv = getattr(d, field)
                cv = getattr(c, field)
                if abs(dv - cv) > tolerance:
                    mismatches.append(f"time={t}: {field} mismatch {dv} vs {cv}")

        if mismatches:
            print(f"CORRUPTION DETECTED! {len(mismatches)} mismatches:")
            for m in mismatches[:10]:  # Show first 10
                print(f"  {m}")
            pytest.fail(f"Data corruption: {len(mismatches)} mismatches found")
        else:
            print("All candle values match - no corruption detected")

        # Candles the cache is missing relative to ground truth.
        missing_in_cache = set(direct_by_time.keys()) - set(cached_by_time.keys())
        if missing_in_cache:
            print(f"Note: {len(missing_in_cache)} candles in direct but not in cache")

        # Extra cached candles may be legitimate forward-filled gap candles.
        extra_in_cache = set(cached_by_time.keys()) - set(direct_by_time.keys())
        if extra_in_cache:
            print(f"Note: {len(extra_in_cache)} extra candles in cache (possibly gap-filled)")
class TestDifferentTimeframes:
    """Test with different timeframes."""

    @pytest.mark.asyncio
    @pytest.mark.parametrize("tf,interval", [
        ("1m", 60),
        ("5m", 300),
        ("15m", 900),
        ("1h", 3600),
    ])
    async def test_various_timeframes(
        self, cache_manager_factory, connector, exchange_name, symbol, tf, interval
    ):
        """Test caching works correctly for various timeframes."""
        manager = cache_manager_factory()
        manager.register_exchange(exchange_name, connector)

        now = int(time.time())
        start = now - (10 * interval)

        request = CandleRequest(
            exchange=exchange_name,
            symbol=symbol,
            timeframe=tf,
            start=start,
            end=now,
        )

        result, source = await manager.get_candles_with_source(request)

        print(f"\n{tf}: {len(result)} candles from {source}")

        if len(result) > 1:
            # Consecutive-candle spacing should equal the timeframe interval
            # for the overwhelming majority of pairs (gaps are tolerated).
            actual_intervals = [
                later.time - earlier.time
                for earlier, later in zip(result, result[1:])
            ]
            matching = sum(1 for delta in actual_intervals if delta == interval)
            match_rate = matching / len(actual_intervals)
            print(f"Interval match rate: {match_rate:.1%}")
            assert match_rate > 0.8, f"Too many interval mismatches for {tf}"