diff --git a/CLAUDE.md b/CLAUDE.md index c056f15..dfaf702 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -84,24 +84,28 @@ Three-tier caching system with REST API and WebSocket interfaces: | `main.py` | Entry point, starts REST server | | `config.py` | Configuration loading from YAML | | `api/rest.py` | FastAPI REST endpoints | +| `api/websocket.py` | WebSocket server for real-time updates | | `cache/manager.py` | Core cache logic (memory → db → exchange) | | `cache/memory.py` | In-memory LRU cache with TTL | | `cache/database.py` | Synchronous SQLite operations | | `cache/async_database.py` | Async SQLite operations for candle persistence | +| `cache/connection_pool.py` | Database connection pooling | | `cache/gaps.py` | Gap detection in candle data | | `cache/gap_filler.py` | Gap filling logic | | `cache/completeness.py` | Completeness tracking for data ranges | | `exchanges/base.py` | Abstract exchange connector interface | -| `exchanges/ccxt_connector.py` | CCXT-based connector implementation | -| `exchanges/binance.py` | Binance connector | +| `exchanges/ccxt_connector.py` | CCXT-based connector (supports 100+ exchanges) | +| `exchanges/binance.py` | Binance-specific connector (legacy, WebSocket support) | | `candles/assembler.py` | Builds candles from trade ticks | | `candles/models.py` | Candle data models and validation | | `sessions/manager.py` | Session management for connections | | `sessions/models.py` | Session data models | +| `monitoring/health.py` | Health check endpoints and system status | +| `monitoring/metrics.py` | Performance metrics tracking | | `utils/timeframes.py` | Timeframe utilities and conversions | | `utils/timestamps.py` | Timestamp utilities | -**Planned (M2):** `api/websocket.py`, `exchanges/kucoin.py`, `exchanges/kraken.py` +**Note:** KuCoin, Kraken, and 100+ other exchanges are supported via `CCXTConnector` - use `create_connector("exchange_name")` from `exchanges/__init__.py`. ### Key Paths diff --git a/config.yaml b/config.yaml index 2153b38..924ecfb 100644 --- a/config.yaml +++ b/config.yaml @@ -12,15 +12,54 @@ cache: memory_ttl_seconds: 432000 # 5 days (matches BrighterTrading) exchanges: + # Major exchanges with good public API support binance: enabled: true rate_limit_ms: 100 - # Disabled until connectors are implemented: + binanceus: + enabled: true + rate_limit_ms: 100 + bybit: + enabled: true + rate_limit_ms: 100 kucoin: - enabled: false + enabled: true rate_limit_ms: 100 kraken: - enabled: false + enabled: true + rate_limit_ms: 100 + okx: + enabled: true + rate_limit_ms: 100 + coinbase: + enabled: true + rate_limit_ms: 100 + bitget: + enabled: true + rate_limit_ms: 100 + mexc: + enabled: true + rate_limit_ms: 100 + gate: + enabled: true + rate_limit_ms: 100 + htx: + enabled: true + rate_limit_ms: 100 + bitfinex: + enabled: true + rate_limit_ms: 100 + bitstamp: + enabled: true + rate_limit_ms: 100 + gemini: + enabled: true + rate_limit_ms: 100 + poloniex: + enabled: true + rate_limit_ms: 100 + cryptocom: + enabled: true rate_limit_ms: 100 logging: diff --git a/src/exchange_data_manager/cache/manager.py b/src/exchange_data_manager/cache/manager.py index ecb0fae..f2e399a 100644 --- a/src/exchange_data_manager/cache/manager.py +++ b/src/exchange_data_manager/cache/manager.py @@ -139,9 +139,26 @@ class CacheManager: ) if memory_candles and not memory_gaps: - # Complete data in memory - logger.debug(f"Memory cache hit: {len(memory_candles)} candles") - return memory_candles, "memory" + # For limit-only requests, ensure we have enough candles + # (e.g., WebSocket may have accumulated fewer candles than requested) + if ( + request.limit is not None + and len(memory_candles) < request.limit + and request.start is None + and request.end is None + ): + # Insufficient data for limit-only request - fall through to fetch more + logger.debug( + f"Memory has {len(memory_candles)} candles but {request.limit} requested, " + "fetching more from exchange" + ) + # Clear memory_candles so we don't return partial data + # We'll fetch fresh from exchange + memory_candles = [] + else: + # Complete data in memory + logger.debug(f"Memory cache hit: {len(memory_candles)} candles") + return memory_candles, "memory" all_candles.extend(memory_candles) gaps_to_fill = memory_gaps if memory_gaps else [(request.start, request.end)] diff --git a/tests/benchmarks/test_performance.py b/tests/benchmarks/test_performance.py index 8deb225..9ead203 100644 --- a/tests/benchmarks/test_performance.py +++ b/tests/benchmarks/test_performance.py @@ -170,72 +170,88 @@ class TestDatabaseCacheBenchmarks: with tempfile.TemporaryDirectory() as tmpdir: yield os.path.join(tmpdir, "test.db") - @pytest.mark.asyncio - async def test_database_put_100_candles(self, benchmark, temp_db_path): + def test_database_put_100_candles(self, benchmark, temp_db_path): """Benchmark storing 100 candles in database.""" - db = AsyncDatabaseCache(db_path=temp_db_path) - await db.initialize() - candles = generate_candles(100) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + db = AsyncDatabaseCache(db_path=temp_db_path) + loop.run_until_complete(db.initialize()) + candles = generate_candles(100) - def put_candles(): - asyncio.get_event_loop().run_until_complete( - db.put("binance", "BTC/USDT", "5m", candles) - ) + def put_candles(): + loop.run_until_complete( + db.put("binance", "BTC/USDT", "5m", candles) + ) - benchmark(put_candles) - await db.close() + benchmark(put_candles) + loop.run_until_complete(db.close()) + finally: + loop.close() - @pytest.mark.asyncio - async def test_database_put_1000_candles(self, benchmark, temp_db_path): + def test_database_put_1000_candles(self, benchmark, temp_db_path): """Benchmark storing 1000 candles in database.""" - db = AsyncDatabaseCache(db_path=temp_db_path) - await db.initialize() - candles = generate_candles(1000) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + db = AsyncDatabaseCache(db_path=temp_db_path) + loop.run_until_complete(db.initialize()) + candles = generate_candles(1000) - def put_candles(): - asyncio.get_event_loop().run_until_complete( - db.put("binance", "BTC/USDT", "5m", candles) - ) + def put_candles(): + loop.run_until_complete( + db.put("binance", "BTC/USDT", "5m", candles) + ) - benchmark(put_candles) - await db.close() + benchmark(put_candles) + loop.run_until_complete(db.close()) + finally: + loop.close() - @pytest.mark.asyncio - async def test_database_get_1000_candles(self, benchmark, temp_db_path): + def test_database_get_1000_candles(self, benchmark, temp_db_path): """Benchmark retrieving 1000 candles from database.""" - db = AsyncDatabaseCache(db_path=temp_db_path) - await db.initialize() - candles = generate_candles(1000) - await db.put("binance", "BTC/USDT", "5m", candles) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + db = AsyncDatabaseCache(db_path=temp_db_path) + loop.run_until_complete(db.initialize()) + candles = generate_candles(1000) + loop.run_until_complete(db.put("binance", "BTC/USDT", "5m", candles)) - def get_candles(): - return asyncio.get_event_loop().run_until_complete( - db.get("binance", "BTC/USDT", "5m") - ) + def get_candles(): + return loop.run_until_complete( + db.get("binance", "BTC/USDT", "5m") + ) - result = benchmark(get_candles) - assert len(result[0]) == 1000 - await db.close() + result = benchmark(get_candles) + assert len(result[0]) == 1000 + loop.run_until_complete(db.close()) + finally: + loop.close() - @pytest.mark.asyncio - async def test_database_get_with_time_range(self, benchmark, temp_db_path): + def test_database_get_with_time_range(self, benchmark, temp_db_path): """Benchmark retrieving candles with time range filter.""" - db = AsyncDatabaseCache(db_path=temp_db_path) - await db.initialize() - candles = generate_candles(10000) - await db.put("binance", "BTC/USDT", "5m", candles) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + db = AsyncDatabaseCache(db_path=temp_db_path) + loop.run_until_complete(db.initialize()) + candles = generate_candles(10000) + loop.run_until_complete(db.put("binance", "BTC/USDT", "5m", candles)) - start_time = candles[2500].time - end_time = candles[7500].time + start_time = candles[2500].time + end_time = candles[7500].time - def get_candles(): - return asyncio.get_event_loop().run_until_complete( - db.get("binance", "BTC/USDT", "5m", start=start_time, end=end_time) - ) + def get_candles(): + return loop.run_until_complete( + db.get("binance", "BTC/USDT", "5m", start=start_time, end=end_time) + ) - result = benchmark(get_candles) - assert len(result[0]) > 0 - await db.close() + result = benchmark(get_candles) + assert len(result[0]) > 0 + loop.run_until_complete(db.close()) + finally: + loop.close() # ============================================================================= @@ -292,64 +308,72 @@ class TestCacheFlowBenchmarks: with tempfile.TemporaryDirectory() as tmpdir: yield os.path.join(tmpdir, "test.db") - @pytest.mark.asyncio - async def test_cache_manager_memory_hit(self, benchmark, temp_db_path): + def test_cache_manager_memory_hit(self, benchmark, temp_db_path): """Benchmark cache manager with memory cache hit.""" - manager = CacheManager( - cache_config=CacheConfig(), - database_config=DatabaseConfig(path=temp_db_path), - ) - await manager.initialize() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + manager = CacheManager( + cache_config=CacheConfig(), + database_config=DatabaseConfig(path=temp_db_path), + ) + loop.run_until_complete(manager.initialize()) - # Pre-populate memory cache - candles = generate_candles(1000) - manager.memory.put("binance:BTC/USDT:5m", candles) + # Pre-populate memory cache + candles = generate_candles(1000) + manager.memory.put("binance:BTC/USDT:5m", candles) - request = CandleRequest( - exchange="binance", - symbol="BTC/USDT", - timeframe="5m", - limit=100, - ) - - def get_candles(): - return asyncio.get_event_loop().run_until_complete( - manager.get_candles(request) + request = CandleRequest( + exchange="binance", + symbol="BTC/USDT", + timeframe="5m", + limit=100, ) - result = benchmark(get_candles) - assert len(result) == 100 + def get_candles(): + return loop.run_until_complete( + manager.get_candles(request) + ) - @pytest.mark.asyncio - async def test_cache_manager_database_hit(self, benchmark, temp_db_path): + result = benchmark(get_candles) + assert len(result) == 100 + finally: + loop.close() + + def test_cache_manager_database_hit(self, benchmark, temp_db_path): """Benchmark cache manager with database hit (cold memory cache).""" - manager = CacheManager( - cache_config=CacheConfig(), - database_config=DatabaseConfig(path=temp_db_path), - ) - await manager.initialize() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + manager = CacheManager( + cache_config=CacheConfig(), + database_config=DatabaseConfig(path=temp_db_path), + ) + loop.run_until_complete(manager.initialize()) - # Pre-populate database only - candles = generate_candles(1000) - await manager.database.put("binance", "BTC/USDT", "5m", candles) + # Pre-populate database only + candles = generate_candles(1000) + loop.run_until_complete(manager.database.put("binance", "BTC/USDT", "5m", candles)) - request = CandleRequest( - exchange="binance", - symbol="BTC/USDT", - timeframe="5m", - start=candles[0].time, - end=candles[-1].time, - ) - - def get_candles(): - # Clear memory cache to force database read - manager.clear_memory() - return asyncio.get_event_loop().run_until_complete( - manager.get_candles(request) + request = CandleRequest( + exchange="binance", + symbol="BTC/USDT", + timeframe="5m", + start=candles[0].time, + end=candles[-1].time, ) - result = benchmark(get_candles) - assert len(result) > 0 + def get_candles(): + # Clear memory cache to force database read + manager.clear_memory() + return loop.run_until_complete( + manager.get_candles(request) + ) + + result = benchmark(get_candles) + assert len(result) > 0 + finally: + loop.close() # ============================================================================= @@ -366,45 +390,53 @@ class TestConnectionPoolBenchmarks: with tempfile.TemporaryDirectory() as tmpdir: yield os.path.join(tmpdir, "test.db") - @pytest.mark.asyncio - async def test_concurrent_reads(self, benchmark, temp_db_path): + def test_concurrent_reads(self, benchmark, temp_db_path): """Benchmark concurrent database reads with connection pool.""" - db = AsyncDatabaseCache(db_path=temp_db_path, pool_size=5) - await db.initialize() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + db = AsyncDatabaseCache(db_path=temp_db_path, pool_size=5) + loop.run_until_complete(db.initialize()) - # Pre-populate - candles = generate_candles(1000) - await db.put("binance", "BTC/USDT", "5m", candles) + # Pre-populate + candles = generate_candles(1000) + loop.run_until_complete(db.put("binance", "BTC/USDT", "5m", candles)) - async def concurrent_reads(): - tasks = [ - db.get("binance", "BTC/USDT", "5m", limit=100) - for _ in range(10) - ] - return await asyncio.gather(*tasks) + async def concurrent_reads(): + tasks = [ + db.get("binance", "BTC/USDT", "5m", limit=100) + for _ in range(10) + ] + return await asyncio.gather(*tasks) - def run_concurrent(): - return asyncio.get_event_loop().run_until_complete(concurrent_reads()) + def run_concurrent(): + return loop.run_until_complete(concurrent_reads()) - results = benchmark(run_concurrent) - assert len(results) == 10 - await db.close() + results = benchmark(run_concurrent) + assert len(results) == 10 + loop.run_until_complete(db.close()) + finally: + loop.close() - @pytest.mark.asyncio - async def test_concurrent_writes(self, benchmark, temp_db_path): + def test_concurrent_writes(self, benchmark, temp_db_path): """Benchmark concurrent database writes with connection pool.""" - db = AsyncDatabaseCache(db_path=temp_db_path, pool_size=5) - await db.initialize() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + db = AsyncDatabaseCache(db_path=temp_db_path, pool_size=5) + loop.run_until_complete(db.initialize()) - async def concurrent_writes(): - tasks = [] - for i in range(10): - candles = generate_candles(100, start_time=1700000000 + i * 1000000) - tasks.append(db.put("binance", f"SYM{i}/USDT", "5m", candles)) - await asyncio.gather(*tasks) + async def concurrent_writes(): + tasks = [] + for i in range(10): + candles = generate_candles(100, start_time=1700000000 + i * 1000000) + tasks.append(db.put("binance", f"SYM{i}/USDT", "5m", candles)) + await asyncio.gather(*tasks) - def run_concurrent(): - asyncio.get_event_loop().run_until_complete(concurrent_writes()) + def run_concurrent(): + loop.run_until_complete(concurrent_writes()) - benchmark(run_concurrent) - await db.close() + benchmark(run_concurrent) + loop.run_until_complete(db.close()) + finally: + loop.close() diff --git a/tests/test_cache_manager.py b/tests/test_cache_manager.py index 715fd25..c36ba07 100644 --- a/tests/test_cache_manager.py +++ b/tests/test_cache_manager.py @@ -76,7 +76,7 @@ class TestCacheManagerColdCache: @pytest.mark.asyncio @patch("exchange_data_manager.cache.memory.time.time") async def test_limit_only_caches_result(self, mock_time, cache_manager): - """Test that limit-only results are cached in memory.""" + """Test that limit-only results are cached in memory when request is satisfied.""" # Mock time to be close to test candle timestamps (prevents freshness check) mock_time.return_value = 1709337660 # Just after the candle @@ -87,13 +87,15 @@ class TestCacheManagerColdCache: mock_connector.fetch_candles.return_value = expected_candles cache_manager.register_exchange("binance", mock_connector) + # Request exactly 1 candle - matches what memory will have after first call + # (If we requested more, the cache manager would refetch since memory has fewer) request = CandleRequest( exchange="binance", symbol="BTC/USDT", timeframe="1m", start=None, end=None, - limit=100, + limit=1, ) # First call - fetches from exchange @@ -101,7 +103,7 @@ class TestCacheManagerColdCache: assert mock_connector.fetch_candles.call_count == 1 assert len(result1) == 1 - # Second call - memory cache has data, so doesn't need to fetch again + # Second call - memory cache has enough data (1 >= 1), uses cache result2 = await cache_manager.get_candles(request) assert mock_connector.fetch_candles.call_count == 1 # Still 1 - used cache assert len(result2) == 1 diff --git a/tests/test_config.py b/tests/test_config.py index 03c89d1..5af282e 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -118,11 +118,10 @@ class TestConfigLoad: enabled = config.get_enabled_exchanges() assert isinstance(enabled, list) - # Binance should be enabled per config.yaml + # Major exchanges should be enabled per config.yaml assert "binance" in enabled - # kucoin/kraken disabled in config.yaml - assert "kucoin" not in enabled - assert "kraken" not in enabled + assert "kucoin" in enabled + assert "kraken" in enabled class TestConnectorRegistry: diff --git a/tests/test_integration.py b/tests/test_integration.py index cf798d7..1aec092 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -86,7 +86,7 @@ class TestCacheManagerIntegration: @pytest.mark.asyncio @patch("exchange_data_manager.cache.memory.time.time") async def test_warm_cache_uses_memory(self, mock_time, cache_manager): - """Test that warm cache uses memory instead of exchange.""" + """Test that warm cache uses memory instead of exchange when limit is satisfied.""" # Mock time to be close to test candle timestamps (prevents freshness check) mock_time.return_value = 1709337720 # Just after the second candle @@ -97,18 +97,20 @@ class TestCacheManagerIntegration: ] cache_manager.register_exchange("binance", mock_connector) + # Request exactly 2 candles - matches what memory will have after first call + # (If we requested more, the cache manager would refetch since memory has fewer) request = CandleRequest( exchange="binance", symbol="BTC/USDT", timeframe="1m", - limit=100, + limit=2, ) # First call - cold cache await cache_manager.get_candles(request) assert mock_connector.fetch_candles.call_count == 1 - # Second call - should use memory + # Second call - should use memory (has 2 candles, requested 2) result = await cache_manager.get_candles(request) assert len(result) == 2 assert mock_connector.fetch_candles.call_count == 1 # No additional call