# exchange-data-manager/tests/benchmarks/test_performance.py
"""
Performance benchmark tests.
Run with: pytest tests/benchmarks/ --benchmark-only
"""
import pytest
import asyncio
import tempfile
import os
from pathlib import Path
from exchange_data_manager.cache.memory import MemoryCache
from exchange_data_manager.cache.async_database import AsyncDatabaseCache
from exchange_data_manager.cache.manager import CacheManager
from exchange_data_manager.cache.gap_filler import fill_gaps
from exchange_data_manager.candles.models import Candle, CandleRequest
from exchange_data_manager.config import CacheConfig, DatabaseConfig
def generate_candles(count: int, start_time: int = 1700000000, interval: int = 300) -> list[Candle]:
    """Build *count* sequential closed candles spaced *interval* seconds apart.

    Prices drift upward by 0.1 per candle and volume by 1, so every candle
    is distinct while remaining cheap to generate.
    """
    candles = []
    for idx in range(count):
        drift = idx * 0.1
        candles.append(
            Candle(
                time=start_time + idx * interval,
                open=100.0 + drift,
                high=101.0 + drift,
                low=99.0 + drift,
                close=100.5 + drift,
                volume=1000.0 + idx,
                closed=True,
            )
        )
    return candles
def generate_candles_with_gaps(count: int, gap_frequency: int = 10) -> list[Candle]:
    """Build candles on a fixed 300-second grid, leaving periodic holes.

    Every slot where ``idx % gap_frequency == 0`` is omitted, so roughly
    1/gap_frequency of the series is missing; the remaining candles keep
    their original grid timestamps, producing real gaps.
    """
    base_time = 1700000000
    step = 300
    result = []
    for idx in range(count):
        if idx % gap_frequency == 0:
            continue  # this grid slot becomes a gap
        drift = idx * 0.1
        result.append(Candle(
            time=base_time + idx * step,
            open=100.0 + drift,
            high=101.0 + drift,
            low=99.0 + drift,
            close=100.5 + drift,
            volume=1000.0 + idx,
            closed=True,
        ))
    return result
# =============================================================================
# Memory Cache Benchmarks
# =============================================================================
class TestMemoryCacheBenchmarks:
    """Benchmarks for memory cache operations."""

    # Single well-known cache key shared by every memory-cache benchmark.
    KEY = "binance:BTC/USDT:5m"

    def _seeded_cache(self, count: int) -> tuple[MemoryCache, list[Candle]]:
        """Return a fresh cache pre-loaded with *count* generated candles."""
        cache = MemoryCache(max_candles=100000)
        candles = generate_candles(count)
        cache.put(self.KEY, candles)
        return cache, candles

    def test_memory_cache_put_100_candles(self, benchmark):
        """Benchmark storing 100 candles in memory cache."""
        cache = MemoryCache(max_candles=100000)
        batch = generate_candles(100)
        benchmark(lambda: cache.put(self.KEY, batch))

    def test_memory_cache_put_1000_candles(self, benchmark):
        """Benchmark storing 1000 candles in memory cache."""
        cache = MemoryCache(max_candles=100000)
        batch = generate_candles(1000)
        benchmark(lambda: cache.put(self.KEY, batch))

    def test_memory_cache_put_10000_candles(self, benchmark):
        """Benchmark storing 10000 candles in memory cache."""
        cache = MemoryCache(max_candles=100000)
        batch = generate_candles(10000)
        benchmark(lambda: cache.put(self.KEY, batch))

    def test_memory_cache_get_full_range(self, benchmark):
        """Benchmark retrieving the full range from memory cache."""
        cache, _ = self._seeded_cache(10000)
        result = benchmark(lambda: cache.get(self.KEY))
        assert len(result[0]) == 10000

    def test_memory_cache_get_time_range(self, benchmark):
        """Benchmark retrieving a bounded time range from memory cache."""
        cache, candles = self._seeded_cache(10000)
        lo = candles[2500].time
        hi = candles[7500].time
        result = benchmark(lambda: cache.get(self.KEY, start=lo, end=hi))
        assert len(result[0]) > 0

    def test_memory_cache_get_with_limit(self, benchmark):
        """Benchmark retrieving with a row limit from memory cache."""
        cache, _ = self._seeded_cache(10000)
        result = benchmark(lambda: cache.get(self.KEY, limit=100))
        assert len(result[0]) == 100

    def test_memory_cache_update_single_candle(self, benchmark):
        """Benchmark updating a single candle (real-time update scenario)."""
        cache, candles = self._seeded_cache(10000)
        # An open (closed=False) candle overwriting one in the middle of the
        # series, mimicking a live tick update.
        fresh = Candle(
            time=candles[5000].time,
            open=150.0,
            high=151.0,
            low=149.0,
            close=150.5,
            volume=5000.0,
            closed=False,
        )
        benchmark(lambda: cache.update_candle(self.KEY, fresh))
# =============================================================================
# Database Cache Benchmarks
# =============================================================================
class TestDatabaseCacheBenchmarks:
    """Benchmarks for async database cache operations."""

    @pytest.fixture
    def temp_db_path(self):
        """Yield a database path inside a throwaway temporary directory."""
        with tempfile.TemporaryDirectory() as tmpdir:
            yield os.path.join(tmpdir, "test.db")

    def _with_fresh_db(self, db_path, scenario):
        """Run ``scenario(loop, db)`` against a freshly initialized database.

        Fixes two cleanup bugs in the original tests:
        - ``db.close()`` sat after the benchmark call inside ``try``, so a
          failing benchmark or assertion skipped it and leaked connections;
          it now runs in ``finally``.
        - the closed event loop was left installed via
          ``asyncio.set_event_loop(loop)``; it is now detached before close
          so later tests never see a closed loop as "current".
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            db = AsyncDatabaseCache(db_path=db_path)
            try:
                loop.run_until_complete(db.initialize())
                scenario(loop, db)
            finally:
                # Release database resources even on benchmark failure.
                loop.run_until_complete(db.close())
        finally:
            asyncio.set_event_loop(None)
            loop.close()

    def test_database_put_100_candles(self, benchmark, temp_db_path):
        """Benchmark storing 100 candles in database."""
        def scenario(loop, db):
            candles = generate_candles(100)
            benchmark(lambda: loop.run_until_complete(
                db.put("binance", "BTC/USDT", "5m", candles)
            ))
        self._with_fresh_db(temp_db_path, scenario)

    def test_database_put_1000_candles(self, benchmark, temp_db_path):
        """Benchmark storing 1000 candles in database."""
        def scenario(loop, db):
            candles = generate_candles(1000)
            benchmark(lambda: loop.run_until_complete(
                db.put("binance", "BTC/USDT", "5m", candles)
            ))
        self._with_fresh_db(temp_db_path, scenario)

    def test_database_get_1000_candles(self, benchmark, temp_db_path):
        """Benchmark retrieving 1000 candles from database."""
        def scenario(loop, db):
            candles = generate_candles(1000)
            loop.run_until_complete(db.put("binance", "BTC/USDT", "5m", candles))
            result = benchmark(lambda: loop.run_until_complete(
                db.get("binance", "BTC/USDT", "5m")
            ))
            assert len(result[0]) == 1000
        self._with_fresh_db(temp_db_path, scenario)

    def test_database_get_with_time_range(self, benchmark, temp_db_path):
        """Benchmark retrieving candles with a start/end time filter."""
        def scenario(loop, db):
            candles = generate_candles(10000)
            loop.run_until_complete(db.put("binance", "BTC/USDT", "5m", candles))
            start_time = candles[2500].time
            end_time = candles[7500].time
            result = benchmark(lambda: loop.run_until_complete(
                db.get("binance", "BTC/USDT", "5m", start=start_time, end=end_time)
            ))
            assert len(result[0]) > 0
        self._with_fresh_db(temp_db_path, scenario)
# =============================================================================
# Gap Filling Benchmarks
# =============================================================================
class TestGapFillingBenchmarks:
    """Benchmarks for gap detection and filling."""

    def test_gap_fill_1000_candles_no_gaps(self, benchmark):
        """Benchmark gap filling on continuous data (no gaps)."""
        continuous = generate_candles(1000)
        filled = benchmark(lambda: fill_gaps(continuous, "5m"))
        # Nothing to fill: the count must be unchanged.
        assert len(filled) == 1000

    def test_gap_fill_1000_candles_with_gaps(self, benchmark):
        """Benchmark gap filling on data with 10% gaps."""
        gappy = generate_candles_with_gaps(1000, gap_frequency=10)
        filled = benchmark(lambda: fill_gaps(gappy, "5m"))
        # Filling can only add candles, never drop them.
        assert len(filled) >= len(gappy)

    def test_gap_fill_10000_candles_with_gaps(self, benchmark):
        """Benchmark gap filling on a large dataset with gaps."""
        gappy = generate_candles_with_gaps(10000, gap_frequency=10)
        filled = benchmark(lambda: fill_gaps(gappy, "5m"))
        assert len(filled) >= len(gappy)
# =============================================================================
# Full Cache Flow Benchmarks
# =============================================================================
class TestCacheFlowBenchmarks:
    """Benchmarks for full cache manager flow."""

    @pytest.fixture
    def temp_db_path(self):
        """Yield a database path inside a throwaway temporary directory."""
        with tempfile.TemporaryDirectory() as tmpdir:
            yield os.path.join(tmpdir, "test.db")

    def _make_manager(self, loop, db_path):
        """Create and initialize a CacheManager backed by *db_path*."""
        manager = CacheManager(
            cache_config=CacheConfig(),
            database_config=DatabaseConfig(path=db_path),
        )
        loop.run_until_complete(manager.initialize())
        return manager

    def _teardown_manager(self, loop, manager):
        """Best-effort shutdown of the manager's resources.

        NOTE(review): the original tests never closed the manager at all
        (unlike the database benchmarks, which call ``db.close()``), so DB
        connections leaked between benchmarks. CacheManager's shutdown API
        is not visible from this file, so close defensively here — confirm
        the real method name and call it directly.
        """
        closer = getattr(manager, "close", None)
        if callable(closer):
            outcome = closer()
            if asyncio.iscoroutine(outcome):
                loop.run_until_complete(outcome)

    def test_cache_manager_memory_hit(self, benchmark, temp_db_path):
        """Benchmark cache manager with memory cache hit."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            manager = self._make_manager(loop, temp_db_path)
            try:
                # Pre-populate only the memory tier so every request is a
                # pure in-memory hit.
                candles = generate_candles(1000)
                manager.memory.put("binance:BTC/USDT:5m", candles)
                request = CandleRequest(
                    exchange="binance",
                    symbol="BTC/USDT",
                    timeframe="5m",
                    limit=100,
                )
                result = benchmark(lambda: loop.run_until_complete(
                    manager.get_candles(request)
                ))
                assert len(result) == 100
            finally:
                self._teardown_manager(loop, manager)
        finally:
            # Detach before closing so later tests don't inherit a closed
            # loop as the "current" event loop.
            asyncio.set_event_loop(None)
            loop.close()

    def test_cache_manager_database_hit(self, benchmark, temp_db_path):
        """Benchmark cache manager with database hit (cold memory cache)."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            manager = self._make_manager(loop, temp_db_path)
            try:
                # Pre-populate the database tier only.
                candles = generate_candles(1000)
                loop.run_until_complete(
                    manager.database.put("binance", "BTC/USDT", "5m", candles)
                )
                request = CandleRequest(
                    exchange="binance",
                    symbol="BTC/USDT",
                    timeframe="5m",
                    start=candles[0].time,
                    end=candles[-1].time,
                )

                def get_candles():
                    # Clear the memory tier each round to force a DB read.
                    manager.clear_memory()
                    return loop.run_until_complete(manager.get_candles(request))

                result = benchmark(get_candles)
                assert len(result) > 0
            finally:
                self._teardown_manager(loop, manager)
        finally:
            asyncio.set_event_loop(None)
            loop.close()
# =============================================================================
# Connection Pool Benchmarks
# =============================================================================
class TestConnectionPoolBenchmarks:
    """Benchmarks for connection pool performance."""

    @pytest.fixture
    def temp_db_path(self):
        """Yield a database path inside a throwaway temporary directory."""
        with tempfile.TemporaryDirectory() as tmpdir:
            yield os.path.join(tmpdir, "test.db")

    def _with_pooled_db(self, db_path, scenario):
        """Run ``scenario(loop, db)`` against a pool_size=5 database.

        Fixes two cleanup bugs in the original tests: ``db.close()`` was
        skipped when the benchmark raised (it was not in ``finally``), and
        the closed event loop remained installed as the current loop.
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            db = AsyncDatabaseCache(db_path=db_path, pool_size=5)
            try:
                loop.run_until_complete(db.initialize())
                scenario(loop, db)
            finally:
                # Release pooled connections even on benchmark failure.
                loop.run_until_complete(db.close())
        finally:
            asyncio.set_event_loop(None)
            loop.close()

    def test_concurrent_reads(self, benchmark, temp_db_path):
        """Benchmark 10 concurrent database reads through the pool."""
        def scenario(loop, db):
            candles = generate_candles(1000)
            loop.run_until_complete(db.put("binance", "BTC/USDT", "5m", candles))

            async def concurrent_reads():
                tasks = [
                    db.get("binance", "BTC/USDT", "5m", limit=100)
                    for _ in range(10)
                ]
                return await asyncio.gather(*tasks)

            results = benchmark(lambda: loop.run_until_complete(concurrent_reads()))
            assert len(results) == 10

        self._with_pooled_db(temp_db_path, scenario)

    def test_concurrent_writes(self, benchmark, temp_db_path):
        """Benchmark 10 concurrent writes to distinct symbols through the pool."""
        def scenario(loop, db):
            async def concurrent_writes():
                tasks = []
                for i in range(10):
                    # Distinct symbols and disjoint time ranges so the
                    # writes don't contend on the same rows.
                    candles = generate_candles(100, start_time=1700000000 + i * 1000000)
                    tasks.append(db.put("binance", f"SYM{i}/USDT", "5m", candles))
                await asyncio.gather(*tasks)

            benchmark(lambda: loop.run_until_complete(concurrent_writes()))

        self._with_pooled_db(temp_db_path, scenario)