"""Performance benchmark tests.

Run with: pytest tests/benchmarks/ --benchmark-only
"""
import asyncio
import os
import tempfile
from pathlib import Path

import pytest

from exchange_data_manager.cache.async_database import AsyncDatabaseCache
from exchange_data_manager.cache.gap_filler import fill_gaps
from exchange_data_manager.cache.manager import CacheManager
from exchange_data_manager.cache.memory import MemoryCache
from exchange_data_manager.candles.models import Candle, CandleRequest
from exchange_data_manager.config import CacheConfig, DatabaseConfig

def generate_candles(count: int, start_time: int = 1700000000, interval: int = 300) -> list[Candle]:
    """Build a deterministic series of closed test candles.

    Prices drift upward by 0.1 per step and volume by 1 per step, so the
    data is reproducible across benchmark runs.
    """
    candles: list[Candle] = []
    for idx in range(count):
        drift = idx * 0.1
        candles.append(
            Candle(
                time=start_time + idx * interval,
                open=100.0 + drift,
                high=101.0 + drift,
                low=99.0 + drift,
                close=100.5 + drift,
                volume=1000.0 + idx,
                closed=True,
            )
        )
    return candles

def generate_candles_with_gaps(count: int, gap_frequency: int = 10) -> list[Candle]:
    """Generate a candle series where every gap_frequency-th slot is missing.

    Slot i with i % gap_frequency == 0 is omitted while its timestamp is
    still consumed, producing periodic holes for gap-filling benchmarks.
    """
    start, step = 1700000000, 300
    return [
        Candle(
            time=start + i * step,
            open=100.0 + i * 0.1,
            high=101.0 + i * 0.1,
            low=99.0 + i * 0.1,
            close=100.5 + i * 0.1,
            volume=1000.0 + i,
            closed=True,
        )
        for i in range(count)
        if i % gap_frequency != 0
    ]

# =============================================================================
# Memory Cache Benchmarks
# =============================================================================

class TestMemoryCacheBenchmarks:
    """Benchmarks for memory cache operations."""

    # Single cache key used by every benchmark in this class.
    _KEY = "binance:BTC/USDT:5m"

    def _bench_put(self, benchmark, count: int) -> None:
        """Helper: benchmark storing *count* generated candles under one key."""
        cache = MemoryCache(max_candles=100000)
        batch = generate_candles(count)
        benchmark(lambda: cache.put(self._KEY, batch))

    def test_memory_cache_put_100_candles(self, benchmark):
        """Benchmark storing 100 candles in memory cache."""
        self._bench_put(benchmark, 100)

    def test_memory_cache_put_1000_candles(self, benchmark):
        """Benchmark storing 1000 candles in memory cache."""
        self._bench_put(benchmark, 1000)

    def test_memory_cache_put_10000_candles(self, benchmark):
        """Benchmark storing 10000 candles in memory cache."""
        self._bench_put(benchmark, 10000)

    def test_memory_cache_get_full_range(self, benchmark):
        """Benchmark retrieving full range from memory cache."""
        cache = MemoryCache(max_candles=100000)
        cache.put(self._KEY, generate_candles(10000))
        result = benchmark(lambda: cache.get(self._KEY))
        assert len(result[0]) == 10000

    def test_memory_cache_get_time_range(self, benchmark):
        """Benchmark retrieving time range from memory cache."""
        cache = MemoryCache(max_candles=100000)
        data = generate_candles(10000)
        cache.put(self._KEY, data)
        # Query the middle 50% of the stored range.
        lo, hi = data[2500].time, data[7500].time
        result = benchmark(lambda: cache.get(self._KEY, start=lo, end=hi))
        assert len(result[0]) > 0

    def test_memory_cache_get_with_limit(self, benchmark):
        """Benchmark retrieving with limit from memory cache."""
        cache = MemoryCache(max_candles=100000)
        cache.put(self._KEY, generate_candles(10000))
        result = benchmark(lambda: cache.get(self._KEY, limit=100))
        assert len(result[0]) == 100

    def test_memory_cache_update_single_candle(self, benchmark):
        """Benchmark updating a single candle (real-time update scenario)."""
        cache = MemoryCache(max_candles=100000)
        data = generate_candles(10000)
        cache.put(self._KEY, data)
        # An in-progress (not closed) tick replacing an existing timestamp.
        tick = Candle(
            time=data[5000].time,
            open=150.0,
            high=151.0,
            low=149.0,
            close=150.5,
            volume=5000.0,
            closed=False,
        )
        benchmark(lambda: cache.update_candle(self._KEY, tick))

# =============================================================================
# Database Cache Benchmarks
# =============================================================================

class TestDatabaseCacheBenchmarks:
    """Benchmarks for async database cache operations."""

    @pytest.fixture
    def temp_db_path(self):
        """Create a temporary database path."""
        with tempfile.TemporaryDirectory() as tmpdir:
            yield os.path.join(tmpdir, "test.db")

    @staticmethod
    def _open_loop():
        """Create a fresh event loop and install it as the current loop."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        return loop

    @staticmethod
    def _close_all(loop, db):
        """Release the database (if opened) and the loop, unconditionally.

        BUGFIX: previously db.close() ran inside the try block after the
        benchmark, so any benchmark or assertion failure skipped it and
        leaked the connection pool; the installed (then closed) loop was
        also never uninstalled, leaking it into later tests.
        """
        try:
            if db is not None:
                loop.run_until_complete(db.close())
        finally:
            loop.close()
            asyncio.set_event_loop(None)

    def test_database_put_100_candles(self, benchmark, temp_db_path):
        """Benchmark storing 100 candles in database."""
        loop = self._open_loop()
        db = None
        try:
            db = AsyncDatabaseCache(db_path=temp_db_path)
            loop.run_until_complete(db.initialize())
            candles = generate_candles(100)

            def put_candles():
                loop.run_until_complete(
                    db.put("binance", "BTC/USDT", "5m", candles)
                )

            benchmark(put_candles)
        finally:
            self._close_all(loop, db)

    def test_database_put_1000_candles(self, benchmark, temp_db_path):
        """Benchmark storing 1000 candles in database."""
        loop = self._open_loop()
        db = None
        try:
            db = AsyncDatabaseCache(db_path=temp_db_path)
            loop.run_until_complete(db.initialize())
            candles = generate_candles(1000)

            def put_candles():
                loop.run_until_complete(
                    db.put("binance", "BTC/USDT", "5m", candles)
                )

            benchmark(put_candles)
        finally:
            self._close_all(loop, db)

    def test_database_get_1000_candles(self, benchmark, temp_db_path):
        """Benchmark retrieving 1000 candles from database."""
        loop = self._open_loop()
        db = None
        try:
            db = AsyncDatabaseCache(db_path=temp_db_path)
            loop.run_until_complete(db.initialize())
            candles = generate_candles(1000)
            loop.run_until_complete(db.put("binance", "BTC/USDT", "5m", candles))

            def get_candles():
                return loop.run_until_complete(
                    db.get("binance", "BTC/USDT", "5m")
                )

            result = benchmark(get_candles)
            assert len(result[0]) == 1000
        finally:
            self._close_all(loop, db)

    def test_database_get_with_time_range(self, benchmark, temp_db_path):
        """Benchmark retrieving candles with time range filter."""
        loop = self._open_loop()
        db = None
        try:
            db = AsyncDatabaseCache(db_path=temp_db_path)
            loop.run_until_complete(db.initialize())
            candles = generate_candles(10000)
            loop.run_until_complete(db.put("binance", "BTC/USDT", "5m", candles))

            # Query the middle 50% of the stored range.
            start_time = candles[2500].time
            end_time = candles[7500].time

            def get_candles():
                return loop.run_until_complete(
                    db.get("binance", "BTC/USDT", "5m", start=start_time, end=end_time)
                )

            result = benchmark(get_candles)
            assert len(result[0]) > 0
        finally:
            self._close_all(loop, db)

# =============================================================================
# Gap Filling Benchmarks
# =============================================================================

class TestGapFillingBenchmarks:
    """Benchmarks for gap detection and filling."""

    def test_gap_fill_1000_candles_no_gaps(self, benchmark):
        """Benchmark gap filling on continuous data (no gaps)."""
        continuous = generate_candles(1000)
        filled = benchmark(lambda: fill_gaps(continuous, "5m"))
        # Continuous input should come back unchanged in length.
        assert len(filled) == 1000

    def test_gap_fill_1000_candles_with_gaps(self, benchmark):
        """Benchmark gap filling on data with 10% gaps."""
        sparse = generate_candles_with_gaps(1000, gap_frequency=10)
        filled = benchmark(lambda: fill_gaps(sparse, "5m"))
        # Filling can only add candles, never drop them.
        assert len(filled) >= len(sparse)

    def test_gap_fill_10000_candles_with_gaps(self, benchmark):
        """Benchmark gap filling on large dataset with gaps."""
        sparse = generate_candles_with_gaps(10000, gap_frequency=10)
        filled = benchmark(lambda: fill_gaps(sparse, "5m"))
        assert len(filled) >= len(sparse)

# =============================================================================
# Full Cache Flow Benchmarks
# =============================================================================

class TestCacheFlowBenchmarks:
    """Benchmarks for full cache manager flow."""

    @pytest.fixture
    def temp_db_path(self):
        """Create a temporary database path."""
        with tempfile.TemporaryDirectory() as tmpdir:
            yield os.path.join(tmpdir, "test.db")

    def test_cache_manager_memory_hit(self, benchmark, temp_db_path):
        """Benchmark cache manager with memory cache hit."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            manager = CacheManager(
                cache_config=CacheConfig(),
                database_config=DatabaseConfig(path=temp_db_path),
            )
            loop.run_until_complete(manager.initialize())

            # Pre-populate memory cache so every request is served from memory.
            candles = generate_candles(1000)
            manager.memory.put("binance:BTC/USDT:5m", candles)

            request = CandleRequest(
                exchange="binance",
                symbol="BTC/USDT",
                timeframe="5m",
                limit=100,
            )

            def get_candles():
                return loop.run_until_complete(
                    manager.get_candles(request)
                )

            result = benchmark(get_candles)
            assert len(result) == 100
        finally:
            # NOTE(review): the manager itself is never shut down here; if
            # CacheManager exposes a close()/shutdown() coroutine it should
            # be awaited before closing the loop — confirm its API.
            loop.close()
            # BUGFIX: uninstall the closed loop so later tests do not
            # inherit a closed event loop as the current one.
            asyncio.set_event_loop(None)

    def test_cache_manager_database_hit(self, benchmark, temp_db_path):
        """Benchmark cache manager with database hit (cold memory cache)."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            manager = CacheManager(
                cache_config=CacheConfig(),
                database_config=DatabaseConfig(path=temp_db_path),
            )
            loop.run_until_complete(manager.initialize())

            # Pre-populate database only; memory stays cold.
            candles = generate_candles(1000)
            loop.run_until_complete(manager.database.put("binance", "BTC/USDT", "5m", candles))

            request = CandleRequest(
                exchange="binance",
                symbol="BTC/USDT",
                timeframe="5m",
                start=candles[0].time,
                end=candles[-1].time,
            )

            def get_candles():
                # Clear memory cache every iteration to force a database read.
                manager.clear_memory()
                return loop.run_until_complete(
                    manager.get_candles(request)
                )

            result = benchmark(get_candles)
            assert len(result) > 0
        finally:
            # NOTE(review): manager shutdown missing here as well — confirm.
            loop.close()
            # BUGFIX: uninstall the closed loop (see memory-hit test).
            asyncio.set_event_loop(None)

# =============================================================================
# Connection Pool Benchmarks
# =============================================================================

class TestConnectionPoolBenchmarks:
    """Benchmarks for connection pool performance."""

    @pytest.fixture
    def temp_db_path(self):
        """Create a temporary database path."""
        with tempfile.TemporaryDirectory() as tmpdir:
            yield os.path.join(tmpdir, "test.db")

    @staticmethod
    def _cleanup(loop, db):
        """Close the database (if opened) and the loop, unconditionally.

        BUGFIX: previously db.close() ran inside the try block after the
        benchmark, so a benchmark or assertion failure skipped it and
        leaked the connection pool; the installed (then closed) loop was
        also never uninstalled.
        """
        try:
            if db is not None:
                loop.run_until_complete(db.close())
        finally:
            loop.close()
            asyncio.set_event_loop(None)

    def test_concurrent_reads(self, benchmark, temp_db_path):
        """Benchmark concurrent database reads with connection pool."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        db = None
        try:
            db = AsyncDatabaseCache(db_path=temp_db_path, pool_size=5)
            loop.run_until_complete(db.initialize())

            # Pre-populate
            candles = generate_candles(1000)
            loop.run_until_complete(db.put("binance", "BTC/USDT", "5m", candles))

            async def concurrent_reads():
                # Ten identical reads in flight at once to exercise the pool.
                tasks = [
                    db.get("binance", "BTC/USDT", "5m", limit=100)
                    for _ in range(10)
                ]
                return await asyncio.gather(*tasks)

            def run_concurrent():
                return loop.run_until_complete(concurrent_reads())

            results = benchmark(run_concurrent)
            assert len(results) == 10
        finally:
            self._cleanup(loop, db)

    def test_concurrent_writes(self, benchmark, temp_db_path):
        """Benchmark concurrent database writes with connection pool."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        db = None
        try:
            db = AsyncDatabaseCache(db_path=temp_db_path, pool_size=5)
            loop.run_until_complete(db.initialize())

            async def concurrent_writes():
                # Distinct symbols and disjoint time ranges so writes don't
                # collide on the same rows.
                tasks = []
                for i in range(10):
                    candles = generate_candles(100, start_time=1700000000 + i * 1000000)
                    tasks.append(db.put("binance", f"SYM{i}/USDT", "5m", candles))
                await asyncio.gather(*tasks)

            def run_concurrent():
                loop.run_until_complete(concurrent_writes())

            benchmark(run_concurrent)
        finally:
            self._cleanup(loop, db)