""" Performance benchmark tests. Run with: pytest tests/benchmarks/ --benchmark-only """ import pytest import asyncio import tempfile import os from pathlib import Path from exchange_data_manager.cache.memory import MemoryCache from exchange_data_manager.cache.async_database import AsyncDatabaseCache from exchange_data_manager.cache.manager import CacheManager from exchange_data_manager.cache.gap_filler import fill_gaps from exchange_data_manager.candles.models import Candle, CandleRequest from exchange_data_manager.config import CacheConfig, DatabaseConfig def generate_candles(count: int, start_time: int = 1700000000, interval: int = 300) -> list[Candle]: """Generate test candles.""" return [ Candle( time=start_time + i * interval, open=100.0 + i * 0.1, high=101.0 + i * 0.1, low=99.0 + i * 0.1, close=100.5 + i * 0.1, volume=1000.0 + i, closed=True, ) for i in range(count) ] def generate_candles_with_gaps(count: int, gap_frequency: int = 10) -> list[Candle]: """Generate candles with periodic gaps.""" candles = [] time = 1700000000 interval = 300 for i in range(count): # Skip every gap_frequency-th candle to create gaps if i % gap_frequency != 0: candles.append(Candle( time=time, open=100.0 + i * 0.1, high=101.0 + i * 0.1, low=99.0 + i * 0.1, close=100.5 + i * 0.1, volume=1000.0 + i, closed=True, )) time += interval return candles # ============================================================================= # Memory Cache Benchmarks # ============================================================================= class TestMemoryCacheBenchmarks: """Benchmarks for memory cache operations.""" def test_memory_cache_put_100_candles(self, benchmark): """Benchmark storing 100 candles in memory cache.""" cache = MemoryCache(max_candles=100000) candles = generate_candles(100) def put_candles(): cache.put("binance:BTC/USDT:5m", candles) benchmark(put_candles) def test_memory_cache_put_1000_candles(self, benchmark): """Benchmark storing 1000 candles in memory cache.""" cache 
= MemoryCache(max_candles=100000) candles = generate_candles(1000) def put_candles(): cache.put("binance:BTC/USDT:5m", candles) benchmark(put_candles) def test_memory_cache_put_10000_candles(self, benchmark): """Benchmark storing 10000 candles in memory cache.""" cache = MemoryCache(max_candles=100000) candles = generate_candles(10000) def put_candles(): cache.put("binance:BTC/USDT:5m", candles) benchmark(put_candles) def test_memory_cache_get_full_range(self, benchmark): """Benchmark retrieving full range from memory cache.""" cache = MemoryCache(max_candles=100000) candles = generate_candles(10000) cache.put("binance:BTC/USDT:5m", candles) def get_candles(): return cache.get("binance:BTC/USDT:5m") result = benchmark(get_candles) assert len(result[0]) == 10000 def test_memory_cache_get_time_range(self, benchmark): """Benchmark retrieving time range from memory cache.""" cache = MemoryCache(max_candles=100000) candles = generate_candles(10000) cache.put("binance:BTC/USDT:5m", candles) start_time = candles[2500].time end_time = candles[7500].time def get_candles(): return cache.get("binance:BTC/USDT:5m", start=start_time, end=end_time) result = benchmark(get_candles) assert len(result[0]) > 0 def test_memory_cache_get_with_limit(self, benchmark): """Benchmark retrieving with limit from memory cache.""" cache = MemoryCache(max_candles=100000) candles = generate_candles(10000) cache.put("binance:BTC/USDT:5m", candles) def get_candles(): return cache.get("binance:BTC/USDT:5m", limit=100) result = benchmark(get_candles) assert len(result[0]) == 100 def test_memory_cache_update_single_candle(self, benchmark): """Benchmark updating a single candle (real-time update scenario).""" cache = MemoryCache(max_candles=100000) candles = generate_candles(10000) cache.put("binance:BTC/USDT:5m", candles) update_candle = Candle( time=candles[5000].time, open=150.0, high=151.0, low=149.0, close=150.5, volume=5000.0, closed=False, ) def update(): 
cache.update_candle("binance:BTC/USDT:5m", update_candle) benchmark(update) # ============================================================================= # Database Cache Benchmarks # ============================================================================= class TestDatabaseCacheBenchmarks: """Benchmarks for async database cache operations.""" @pytest.fixture def temp_db_path(self): """Create a temporary database path.""" with tempfile.TemporaryDirectory() as tmpdir: yield os.path.join(tmpdir, "test.db") def test_database_put_100_candles(self, benchmark, temp_db_path): """Benchmark storing 100 candles in database.""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: db = AsyncDatabaseCache(db_path=temp_db_path) loop.run_until_complete(db.initialize()) candles = generate_candles(100) def put_candles(): loop.run_until_complete( db.put("binance", "BTC/USDT", "5m", candles) ) benchmark(put_candles) loop.run_until_complete(db.close()) finally: loop.close() def test_database_put_1000_candles(self, benchmark, temp_db_path): """Benchmark storing 1000 candles in database.""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: db = AsyncDatabaseCache(db_path=temp_db_path) loop.run_until_complete(db.initialize()) candles = generate_candles(1000) def put_candles(): loop.run_until_complete( db.put("binance", "BTC/USDT", "5m", candles) ) benchmark(put_candles) loop.run_until_complete(db.close()) finally: loop.close() def test_database_get_1000_candles(self, benchmark, temp_db_path): """Benchmark retrieving 1000 candles from database.""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: db = AsyncDatabaseCache(db_path=temp_db_path) loop.run_until_complete(db.initialize()) candles = generate_candles(1000) loop.run_until_complete(db.put("binance", "BTC/USDT", "5m", candles)) def get_candles(): return loop.run_until_complete( db.get("binance", "BTC/USDT", "5m") ) result = benchmark(get_candles) assert len(result[0]) == 1000 
loop.run_until_complete(db.close()) finally: loop.close() def test_database_get_with_time_range(self, benchmark, temp_db_path): """Benchmark retrieving candles with time range filter.""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: db = AsyncDatabaseCache(db_path=temp_db_path) loop.run_until_complete(db.initialize()) candles = generate_candles(10000) loop.run_until_complete(db.put("binance", "BTC/USDT", "5m", candles)) start_time = candles[2500].time end_time = candles[7500].time def get_candles(): return loop.run_until_complete( db.get("binance", "BTC/USDT", "5m", start=start_time, end=end_time) ) result = benchmark(get_candles) assert len(result[0]) > 0 loop.run_until_complete(db.close()) finally: loop.close() # ============================================================================= # Gap Filling Benchmarks # ============================================================================= class TestGapFillingBenchmarks: """Benchmarks for gap detection and filling.""" def test_gap_fill_1000_candles_no_gaps(self, benchmark): """Benchmark gap filling on continuous data (no gaps).""" candles = generate_candles(1000) def fill(): return fill_gaps(candles, "5m") result = benchmark(fill) assert len(result) == 1000 def test_gap_fill_1000_candles_with_gaps(self, benchmark): """Benchmark gap filling on data with 10% gaps.""" candles = generate_candles_with_gaps(1000, gap_frequency=10) def fill(): return fill_gaps(candles, "5m") result = benchmark(fill) # Should have more candles after filling assert len(result) >= len(candles) def test_gap_fill_10000_candles_with_gaps(self, benchmark): """Benchmark gap filling on large dataset with gaps.""" candles = generate_candles_with_gaps(10000, gap_frequency=10) def fill(): return fill_gaps(candles, "5m") result = benchmark(fill) assert len(result) >= len(candles) # ============================================================================= # Full Cache Flow Benchmarks # 
class TestCacheFlowBenchmarks:
    """Benchmarks for full cache manager flow."""

    @pytest.fixture
    def temp_db_path(self):
        """Create a temporary database path."""
        with tempfile.TemporaryDirectory() as tmpdir:
            yield os.path.join(tmpdir, "test.db")

    def test_cache_manager_memory_hit(self, benchmark, temp_db_path):
        """Benchmark cache manager with memory cache hit."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            manager = CacheManager(
                cache_config=CacheConfig(),
                database_config=DatabaseConfig(path=temp_db_path),
            )
            loop.run_until_complete(manager.initialize())
            try:
                # Pre-populate memory cache so every benchmarked call is a hot hit.
                candles = generate_candles(1000)
                manager.memory.put("binance:BTC/USDT:5m", candles)

                request = CandleRequest(
                    exchange="binance",
                    symbol="BTC/USDT",
                    timeframe="5m",
                    limit=100,
                )

                def get_candles():
                    return loop.run_until_complete(
                        manager.get_candles(request)
                    )

                result = benchmark(get_candles)
                assert len(result) == 100
            finally:
                # Release the database pool opened by initialize(); previously
                # it was never closed and leaked connections per test.
                loop.run_until_complete(manager.database.close())
        finally:
            loop.close()
            # Don't leave a closed loop installed as the current event loop.
            asyncio.set_event_loop(None)

    def test_cache_manager_database_hit(self, benchmark, temp_db_path):
        """Benchmark cache manager with database hit (cold memory cache)."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            manager = CacheManager(
                cache_config=CacheConfig(),
                database_config=DatabaseConfig(path=temp_db_path),
            )
            loop.run_until_complete(manager.initialize())
            try:
                # Pre-populate database only
                candles = generate_candles(1000)
                loop.run_until_complete(manager.database.put("binance", "BTC/USDT", "5m", candles))

                request = CandleRequest(
                    exchange="binance",
                    symbol="BTC/USDT",
                    timeframe="5m",
                    start=candles[0].time,
                    end=candles[-1].time,
                )

                def get_candles():
                    # Clear memory cache to force database read
                    manager.clear_memory()
                    return loop.run_until_complete(
                        manager.get_candles(request)
                    )

                result = benchmark(get_candles)
                assert len(result) > 0
            finally:
                loop.run_until_complete(manager.database.close())
        finally:
            loop.close()
            asyncio.set_event_loop(None)


# =============================================================================
# Connection Pool Benchmarks
# =============================================================================
class TestConnectionPoolBenchmarks:
    """Benchmarks for connection pool performance."""

    @pytest.fixture
    def temp_db_path(self):
        """Create a temporary database path."""
        with tempfile.TemporaryDirectory() as tmpdir:
            yield os.path.join(tmpdir, "test.db")

    def test_concurrent_reads(self, benchmark, temp_db_path):
        """Benchmark concurrent database reads with connection pool."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        db = AsyncDatabaseCache(db_path=temp_db_path, pool_size=5)
        try:
            loop.run_until_complete(db.initialize())

            # Pre-populate
            candles = generate_candles(1000)
            loop.run_until_complete(db.put("binance", "BTC/USDT", "5m", candles))

            async def concurrent_reads():
                # Ten overlapping reads exercise the pool's 5 connections.
                tasks = [
                    db.get("binance", "BTC/USDT", "5m", limit=100)
                    for _ in range(10)
                ]
                return await asyncio.gather(*tasks)

            def run_concurrent():
                return loop.run_until_complete(concurrent_reads())

            results = benchmark(run_concurrent)
            assert len(results) == 10
        finally:
            # Close the pool even if the benchmark raised (previously this was
            # skipped on failure, leaking connections), then tear down the loop.
            loop.run_until_complete(db.close())
            loop.close()
            asyncio.set_event_loop(None)

    def test_concurrent_writes(self, benchmark, temp_db_path):
        """Benchmark concurrent database writes with connection pool."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        db = AsyncDatabaseCache(db_path=temp_db_path, pool_size=5)
        try:
            loop.run_until_complete(db.initialize())

            async def concurrent_writes():
                # Disjoint symbols and time ranges so writes don't collide.
                tasks = []
                for i in range(10):
                    candles = generate_candles(100, start_time=1700000000 + i * 1000000)
                    tasks.append(db.put("binance", f"SYM{i}/USDT", "5m", candles))
                await asyncio.gather(*tasks)

            def run_concurrent():
                loop.run_until_complete(concurrent_writes())

            benchmark(run_concurrent)
        finally:
            loop.run_until_complete(db.close())
            loop.close()
            asyncio.set_event_loop(None)