Compare commits

..

2 Commits

Author SHA1 Message Date
rob c4bcc19241 Fix SQLite database locking under concurrent access
Add busy_timeout (5 seconds) to both async and sync database connections.
This prevents "database is locked" errors when multiple requests try to
access the database simultaneously.

Changes:
- AsyncSQLitePool: Add busy_timeout=5000 and synchronous=NORMAL pragmas
- DatabaseCache: Add WAL mode, busy_timeout, synchronous=NORMAL, and
  connection timeout for the sync database module

The busy_timeout tells SQLite to wait instead of immediately failing
when the database is locked by another connection.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-06 13:40:58 -04:00
rob bf95490673 Update docs, enable exchanges, and fix tests
- Update CLAUDE.md to reflect actual module structure (add websocket,
  connection_pool, monitoring modules; remove outdated "Planned" note)
- Enable additional exchanges in config.yaml (kucoin, kraken, okx, etc.)
- Fix cache manager to refetch when memory has fewer candles than limit
- Fix benchmark tests asyncio event loop issues (use new_event_loop)
- Update unit tests to match new cache behavior and config

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-06 13:36:38 -04:00
9 changed files with 255 additions and 149 deletions

View File

@@ -84,24 +84,28 @@ Three-tier caching system with REST API and WebSocket interfaces:
| `main.py` | Entry point, starts REST server | | `main.py` | Entry point, starts REST server |
| `config.py` | Configuration loading from YAML | | `config.py` | Configuration loading from YAML |
| `api/rest.py` | FastAPI REST endpoints | | `api/rest.py` | FastAPI REST endpoints |
| `api/websocket.py` | WebSocket server for real-time updates |
| `cache/manager.py` | Core cache logic (memory → db → exchange) | | `cache/manager.py` | Core cache logic (memory → db → exchange) |
| `cache/memory.py` | In-memory LRU cache with TTL | | `cache/memory.py` | In-memory LRU cache with TTL |
| `cache/database.py` | Synchronous SQLite operations | | `cache/database.py` | Synchronous SQLite operations |
| `cache/async_database.py` | Async SQLite operations for candle persistence | | `cache/async_database.py` | Async SQLite operations for candle persistence |
| `cache/connection_pool.py` | Database connection pooling |
| `cache/gaps.py` | Gap detection in candle data | | `cache/gaps.py` | Gap detection in candle data |
| `cache/gap_filler.py` | Gap filling logic | | `cache/gap_filler.py` | Gap filling logic |
| `cache/completeness.py` | Completeness tracking for data ranges | | `cache/completeness.py` | Completeness tracking for data ranges |
| `exchanges/base.py` | Abstract exchange connector interface | | `exchanges/base.py` | Abstract exchange connector interface |
| `exchanges/ccxt_connector.py` | CCXT-based connector implementation | | `exchanges/ccxt_connector.py` | CCXT-based connector (supports 100+ exchanges) |
| `exchanges/binance.py` | Binance connector | | `exchanges/binance.py` | Binance-specific connector (legacy, WebSocket support) |
| `candles/assembler.py` | Builds candles from trade ticks | | `candles/assembler.py` | Builds candles from trade ticks |
| `candles/models.py` | Candle data models and validation | | `candles/models.py` | Candle data models and validation |
| `sessions/manager.py` | Session management for connections | | `sessions/manager.py` | Session management for connections |
| `sessions/models.py` | Session data models | | `sessions/models.py` | Session data models |
| `monitoring/health.py` | Health check endpoints and system status |
| `monitoring/metrics.py` | Performance metrics tracking |
| `utils/timeframes.py` | Timeframe utilities and conversions | | `utils/timeframes.py` | Timeframe utilities and conversions |
| `utils/timestamps.py` | Timestamp utilities | | `utils/timestamps.py` | Timestamp utilities |
**Planned (M2):** `api/websocket.py`, `exchanges/kucoin.py`, `exchanges/kraken.py` **Note:** KuCoin, Kraken, and 100+ other exchanges are supported via `CCXTConnector` - use `create_connector("exchange_name")` from `exchanges/__init__.py`.
### Key Paths ### Key Paths

View File

@@ -12,15 +12,54 @@ cache:
memory_ttl_seconds: 432000 # 5 days (matches BrighterTrading) memory_ttl_seconds: 432000 # 5 days (matches BrighterTrading)
exchanges: exchanges:
# Major exchanges with good public API support
binance: binance:
enabled: true enabled: true
rate_limit_ms: 100 rate_limit_ms: 100
# Disabled until connectors are implemented: binanceus:
enabled: true
rate_limit_ms: 100
bybit:
enabled: true
rate_limit_ms: 100
kucoin: kucoin:
enabled: false enabled: true
rate_limit_ms: 100 rate_limit_ms: 100
kraken: kraken:
enabled: false enabled: true
rate_limit_ms: 100
okx:
enabled: true
rate_limit_ms: 100
coinbase:
enabled: true
rate_limit_ms: 100
bitget:
enabled: true
rate_limit_ms: 100
mexc:
enabled: true
rate_limit_ms: 100
gate:
enabled: true
rate_limit_ms: 100
htx:
enabled: true
rate_limit_ms: 100
bitfinex:
enabled: true
rate_limit_ms: 100
bitstamp:
enabled: true
rate_limit_ms: 100
gemini:
enabled: true
rate_limit_ms: 100
poloniex:
enabled: true
rate_limit_ms: 100
cryptocom:
enabled: true
rate_limit_ms: 100 rate_limit_ms: 100
logging: logging:

View File

@@ -76,6 +76,11 @@ class AsyncSQLitePool:
conn = await aiosqlite.connect(self.db_path) conn = await aiosqlite.connect(self.db_path)
# Enable WAL mode for better concurrent read/write performance # Enable WAL mode for better concurrent read/write performance
await conn.execute("PRAGMA journal_mode=WAL") await conn.execute("PRAGMA journal_mode=WAL")
# Set busy timeout to wait up to 5 seconds when database is locked
# This prevents "database is locked" errors during concurrent access
await conn.execute("PRAGMA busy_timeout=5000")
# Use NORMAL synchronous mode with WAL for good performance + durability
await conn.execute("PRAGMA synchronous=NORMAL")
# Enable foreign keys # Enable foreign keys
await conn.execute("PRAGMA foreign_keys=ON") await conn.execute("PRAGMA foreign_keys=ON")
return conn return conn

View File

@@ -73,8 +73,14 @@ class DatabaseCache:
@contextmanager @contextmanager
def _get_connection(self): def _get_connection(self):
"""Get a database connection with proper cleanup.""" """Get a database connection with proper cleanup."""
conn = sqlite3.connect(self.db_path) conn = sqlite3.connect(self.db_path, timeout=5.0)
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
# Enable WAL mode for better concurrent read/write performance
conn.execute("PRAGMA journal_mode=WAL")
# Set busy timeout to wait when database is locked (in addition to connect timeout)
conn.execute("PRAGMA busy_timeout=5000")
# Use NORMAL synchronous mode with WAL for good performance + durability
conn.execute("PRAGMA synchronous=NORMAL")
try: try:
yield conn yield conn
finally: finally:

View File

@@ -139,9 +139,26 @@ class CacheManager:
) )
if memory_candles and not memory_gaps: if memory_candles and not memory_gaps:
# Complete data in memory # For limit-only requests, ensure we have enough candles
logger.debug(f"Memory cache hit: {len(memory_candles)} candles") # (e.g., WebSocket may have accumulated fewer candles than requested)
return memory_candles, "memory" if (
request.limit is not None
and len(memory_candles) < request.limit
and request.start is None
and request.end is None
):
# Insufficient data for limit-only request - fall through to fetch more
logger.debug(
f"Memory has {len(memory_candles)} candles but {request.limit} requested, "
"fetching more from exchange"
)
# Clear memory_candles so we don't return partial data
# We'll fetch fresh from exchange
memory_candles = []
else:
# Complete data in memory
logger.debug(f"Memory cache hit: {len(memory_candles)} candles")
return memory_candles, "memory"
all_candles.extend(memory_candles) all_candles.extend(memory_candles)
gaps_to_fill = memory_gaps if memory_gaps else [(request.start, request.end)] gaps_to_fill = memory_gaps if memory_gaps else [(request.start, request.end)]

View File

@@ -170,72 +170,88 @@ class TestDatabaseCacheBenchmarks:
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
yield os.path.join(tmpdir, "test.db") yield os.path.join(tmpdir, "test.db")
@pytest.mark.asyncio def test_database_put_100_candles(self, benchmark, temp_db_path):
async def test_database_put_100_candles(self, benchmark, temp_db_path):
"""Benchmark storing 100 candles in database.""" """Benchmark storing 100 candles in database."""
db = AsyncDatabaseCache(db_path=temp_db_path) loop = asyncio.new_event_loop()
await db.initialize() asyncio.set_event_loop(loop)
candles = generate_candles(100) try:
db = AsyncDatabaseCache(db_path=temp_db_path)
loop.run_until_complete(db.initialize())
candles = generate_candles(100)
def put_candles(): def put_candles():
asyncio.get_event_loop().run_until_complete( loop.run_until_complete(
db.put("binance", "BTC/USDT", "5m", candles) db.put("binance", "BTC/USDT", "5m", candles)
) )
benchmark(put_candles) benchmark(put_candles)
await db.close() loop.run_until_complete(db.close())
finally:
loop.close()
@pytest.mark.asyncio def test_database_put_1000_candles(self, benchmark, temp_db_path):
async def test_database_put_1000_candles(self, benchmark, temp_db_path):
"""Benchmark storing 1000 candles in database.""" """Benchmark storing 1000 candles in database."""
db = AsyncDatabaseCache(db_path=temp_db_path) loop = asyncio.new_event_loop()
await db.initialize() asyncio.set_event_loop(loop)
candles = generate_candles(1000) try:
db = AsyncDatabaseCache(db_path=temp_db_path)
loop.run_until_complete(db.initialize())
candles = generate_candles(1000)
def put_candles(): def put_candles():
asyncio.get_event_loop().run_until_complete( loop.run_until_complete(
db.put("binance", "BTC/USDT", "5m", candles) db.put("binance", "BTC/USDT", "5m", candles)
) )
benchmark(put_candles) benchmark(put_candles)
await db.close() loop.run_until_complete(db.close())
finally:
loop.close()
@pytest.mark.asyncio def test_database_get_1000_candles(self, benchmark, temp_db_path):
async def test_database_get_1000_candles(self, benchmark, temp_db_path):
"""Benchmark retrieving 1000 candles from database.""" """Benchmark retrieving 1000 candles from database."""
db = AsyncDatabaseCache(db_path=temp_db_path) loop = asyncio.new_event_loop()
await db.initialize() asyncio.set_event_loop(loop)
candles = generate_candles(1000) try:
await db.put("binance", "BTC/USDT", "5m", candles) db = AsyncDatabaseCache(db_path=temp_db_path)
loop.run_until_complete(db.initialize())
candles = generate_candles(1000)
loop.run_until_complete(db.put("binance", "BTC/USDT", "5m", candles))
def get_candles(): def get_candles():
return asyncio.get_event_loop().run_until_complete( return loop.run_until_complete(
db.get("binance", "BTC/USDT", "5m") db.get("binance", "BTC/USDT", "5m")
) )
result = benchmark(get_candles) result = benchmark(get_candles)
assert len(result[0]) == 1000 assert len(result[0]) == 1000
await db.close() loop.run_until_complete(db.close())
finally:
loop.close()
@pytest.mark.asyncio def test_database_get_with_time_range(self, benchmark, temp_db_path):
async def test_database_get_with_time_range(self, benchmark, temp_db_path):
"""Benchmark retrieving candles with time range filter.""" """Benchmark retrieving candles with time range filter."""
db = AsyncDatabaseCache(db_path=temp_db_path) loop = asyncio.new_event_loop()
await db.initialize() asyncio.set_event_loop(loop)
candles = generate_candles(10000) try:
await db.put("binance", "BTC/USDT", "5m", candles) db = AsyncDatabaseCache(db_path=temp_db_path)
loop.run_until_complete(db.initialize())
candles = generate_candles(10000)
loop.run_until_complete(db.put("binance", "BTC/USDT", "5m", candles))
start_time = candles[2500].time start_time = candles[2500].time
end_time = candles[7500].time end_time = candles[7500].time
def get_candles(): def get_candles():
return asyncio.get_event_loop().run_until_complete( return loop.run_until_complete(
db.get("binance", "BTC/USDT", "5m", start=start_time, end=end_time) db.get("binance", "BTC/USDT", "5m", start=start_time, end=end_time)
) )
result = benchmark(get_candles) result = benchmark(get_candles)
assert len(result[0]) > 0 assert len(result[0]) > 0
await db.close() loop.run_until_complete(db.close())
finally:
loop.close()
# ============================================================================= # =============================================================================
@@ -292,64 +308,72 @@ class TestCacheFlowBenchmarks:
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
yield os.path.join(tmpdir, "test.db") yield os.path.join(tmpdir, "test.db")
@pytest.mark.asyncio def test_cache_manager_memory_hit(self, benchmark, temp_db_path):
async def test_cache_manager_memory_hit(self, benchmark, temp_db_path):
"""Benchmark cache manager with memory cache hit.""" """Benchmark cache manager with memory cache hit."""
manager = CacheManager( loop = asyncio.new_event_loop()
cache_config=CacheConfig(), asyncio.set_event_loop(loop)
database_config=DatabaseConfig(path=temp_db_path), try:
) manager = CacheManager(
await manager.initialize() cache_config=CacheConfig(),
database_config=DatabaseConfig(path=temp_db_path),
)
loop.run_until_complete(manager.initialize())
# Pre-populate memory cache # Pre-populate memory cache
candles = generate_candles(1000) candles = generate_candles(1000)
manager.memory.put("binance:BTC/USDT:5m", candles) manager.memory.put("binance:BTC/USDT:5m", candles)
request = CandleRequest( request = CandleRequest(
exchange="binance", exchange="binance",
symbol="BTC/USDT", symbol="BTC/USDT",
timeframe="5m", timeframe="5m",
limit=100, limit=100,
)
def get_candles():
return asyncio.get_event_loop().run_until_complete(
manager.get_candles(request)
) )
result = benchmark(get_candles) def get_candles():
assert len(result) == 100 return loop.run_until_complete(
manager.get_candles(request)
)
@pytest.mark.asyncio result = benchmark(get_candles)
async def test_cache_manager_database_hit(self, benchmark, temp_db_path): assert len(result) == 100
finally:
loop.close()
def test_cache_manager_database_hit(self, benchmark, temp_db_path):
"""Benchmark cache manager with database hit (cold memory cache).""" """Benchmark cache manager with database hit (cold memory cache)."""
manager = CacheManager( loop = asyncio.new_event_loop()
cache_config=CacheConfig(), asyncio.set_event_loop(loop)
database_config=DatabaseConfig(path=temp_db_path), try:
) manager = CacheManager(
await manager.initialize() cache_config=CacheConfig(),
database_config=DatabaseConfig(path=temp_db_path),
)
loop.run_until_complete(manager.initialize())
# Pre-populate database only # Pre-populate database only
candles = generate_candles(1000) candles = generate_candles(1000)
await manager.database.put("binance", "BTC/USDT", "5m", candles) loop.run_until_complete(manager.database.put("binance", "BTC/USDT", "5m", candles))
request = CandleRequest( request = CandleRequest(
exchange="binance", exchange="binance",
symbol="BTC/USDT", symbol="BTC/USDT",
timeframe="5m", timeframe="5m",
start=candles[0].time, start=candles[0].time,
end=candles[-1].time, end=candles[-1].time,
)
def get_candles():
# Clear memory cache to force database read
manager.clear_memory()
return asyncio.get_event_loop().run_until_complete(
manager.get_candles(request)
) )
result = benchmark(get_candles) def get_candles():
assert len(result) > 0 # Clear memory cache to force database read
manager.clear_memory()
return loop.run_until_complete(
manager.get_candles(request)
)
result = benchmark(get_candles)
assert len(result) > 0
finally:
loop.close()
# ============================================================================= # =============================================================================
@@ -366,45 +390,53 @@ class TestConnectionPoolBenchmarks:
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
yield os.path.join(tmpdir, "test.db") yield os.path.join(tmpdir, "test.db")
@pytest.mark.asyncio def test_concurrent_reads(self, benchmark, temp_db_path):
async def test_concurrent_reads(self, benchmark, temp_db_path):
"""Benchmark concurrent database reads with connection pool.""" """Benchmark concurrent database reads with connection pool."""
db = AsyncDatabaseCache(db_path=temp_db_path, pool_size=5) loop = asyncio.new_event_loop()
await db.initialize() asyncio.set_event_loop(loop)
try:
db = AsyncDatabaseCache(db_path=temp_db_path, pool_size=5)
loop.run_until_complete(db.initialize())
# Pre-populate # Pre-populate
candles = generate_candles(1000) candles = generate_candles(1000)
await db.put("binance", "BTC/USDT", "5m", candles) loop.run_until_complete(db.put("binance", "BTC/USDT", "5m", candles))
async def concurrent_reads(): async def concurrent_reads():
tasks = [ tasks = [
db.get("binance", "BTC/USDT", "5m", limit=100) db.get("binance", "BTC/USDT", "5m", limit=100)
for _ in range(10) for _ in range(10)
] ]
return await asyncio.gather(*tasks) return await asyncio.gather(*tasks)
def run_concurrent(): def run_concurrent():
return asyncio.get_event_loop().run_until_complete(concurrent_reads()) return loop.run_until_complete(concurrent_reads())
results = benchmark(run_concurrent) results = benchmark(run_concurrent)
assert len(results) == 10 assert len(results) == 10
await db.close() loop.run_until_complete(db.close())
finally:
loop.close()
@pytest.mark.asyncio def test_concurrent_writes(self, benchmark, temp_db_path):
async def test_concurrent_writes(self, benchmark, temp_db_path):
"""Benchmark concurrent database writes with connection pool.""" """Benchmark concurrent database writes with connection pool."""
db = AsyncDatabaseCache(db_path=temp_db_path, pool_size=5) loop = asyncio.new_event_loop()
await db.initialize() asyncio.set_event_loop(loop)
try:
db = AsyncDatabaseCache(db_path=temp_db_path, pool_size=5)
loop.run_until_complete(db.initialize())
async def concurrent_writes(): async def concurrent_writes():
tasks = [] tasks = []
for i in range(10): for i in range(10):
candles = generate_candles(100, start_time=1700000000 + i * 1000000) candles = generate_candles(100, start_time=1700000000 + i * 1000000)
tasks.append(db.put("binance", f"SYM{i}/USDT", "5m", candles)) tasks.append(db.put("binance", f"SYM{i}/USDT", "5m", candles))
await asyncio.gather(*tasks) await asyncio.gather(*tasks)
def run_concurrent(): def run_concurrent():
asyncio.get_event_loop().run_until_complete(concurrent_writes()) loop.run_until_complete(concurrent_writes())
benchmark(run_concurrent) benchmark(run_concurrent)
await db.close() loop.run_until_complete(db.close())
finally:
loop.close()

View File

@@ -76,7 +76,7 @@ class TestCacheManagerColdCache:
@pytest.mark.asyncio @pytest.mark.asyncio
@patch("exchange_data_manager.cache.memory.time.time") @patch("exchange_data_manager.cache.memory.time.time")
async def test_limit_only_caches_result(self, mock_time, cache_manager): async def test_limit_only_caches_result(self, mock_time, cache_manager):
"""Test that limit-only results are cached in memory.""" """Test that limit-only results are cached in memory when request is satisfied."""
# Mock time to be close to test candle timestamps (prevents freshness check) # Mock time to be close to test candle timestamps (prevents freshness check)
mock_time.return_value = 1709337660 # Just after the candle mock_time.return_value = 1709337660 # Just after the candle
@@ -87,13 +87,15 @@ class TestCacheManagerColdCache:
mock_connector.fetch_candles.return_value = expected_candles mock_connector.fetch_candles.return_value = expected_candles
cache_manager.register_exchange("binance", mock_connector) cache_manager.register_exchange("binance", mock_connector)
# Request exactly 1 candle - matches what memory will have after first call
# (If we requested more, the cache manager would refetch since memory has fewer)
request = CandleRequest( request = CandleRequest(
exchange="binance", exchange="binance",
symbol="BTC/USDT", symbol="BTC/USDT",
timeframe="1m", timeframe="1m",
start=None, start=None,
end=None, end=None,
limit=100, limit=1,
) )
# First call - fetches from exchange # First call - fetches from exchange
@@ -101,7 +103,7 @@ class TestCacheManagerColdCache:
assert mock_connector.fetch_candles.call_count == 1 assert mock_connector.fetch_candles.call_count == 1
assert len(result1) == 1 assert len(result1) == 1
# Second call - memory cache has data, so doesn't need to fetch again # Second call - memory cache has enough data (1 >= 1), uses cache
result2 = await cache_manager.get_candles(request) result2 = await cache_manager.get_candles(request)
assert mock_connector.fetch_candles.call_count == 1 # Still 1 - used cache assert mock_connector.fetch_candles.call_count == 1 # Still 1 - used cache
assert len(result2) == 1 assert len(result2) == 1

View File

@@ -118,11 +118,10 @@ class TestConfigLoad:
enabled = config.get_enabled_exchanges() enabled = config.get_enabled_exchanges()
assert isinstance(enabled, list) assert isinstance(enabled, list)
# Binance should be enabled per config.yaml # Major exchanges should be enabled per config.yaml
assert "binance" in enabled assert "binance" in enabled
# kucoin/kraken disabled in config.yaml assert "kucoin" in enabled
assert "kucoin" not in enabled assert "kraken" in enabled
assert "kraken" not in enabled
class TestConnectorRegistry: class TestConnectorRegistry:

View File

@@ -86,7 +86,7 @@ class TestCacheManagerIntegration:
@pytest.mark.asyncio @pytest.mark.asyncio
@patch("exchange_data_manager.cache.memory.time.time") @patch("exchange_data_manager.cache.memory.time.time")
async def test_warm_cache_uses_memory(self, mock_time, cache_manager): async def test_warm_cache_uses_memory(self, mock_time, cache_manager):
"""Test that warm cache uses memory instead of exchange.""" """Test that warm cache uses memory instead of exchange when limit is satisfied."""
# Mock time to be close to test candle timestamps (prevents freshness check) # Mock time to be close to test candle timestamps (prevents freshness check)
mock_time.return_value = 1709337720 # Just after the second candle mock_time.return_value = 1709337720 # Just after the second candle
@@ -97,18 +97,20 @@ class TestCacheManagerIntegration:
] ]
cache_manager.register_exchange("binance", mock_connector) cache_manager.register_exchange("binance", mock_connector)
# Request exactly 2 candles - matches what memory will have after first call
# (If we requested more, the cache manager would refetch since memory has fewer)
request = CandleRequest( request = CandleRequest(
exchange="binance", exchange="binance",
symbol="BTC/USDT", symbol="BTC/USDT",
timeframe="1m", timeframe="1m",
limit=100, limit=2,
) )
# First call - cold cache # First call - cold cache
await cache_manager.get_candles(request) await cache_manager.get_candles(request)
assert mock_connector.fetch_candles.call_count == 1 assert mock_connector.fetch_candles.call_count == 1
# Second call - should use memory # Second call - should use memory (has 2 candles, requested 2)
result = await cache_manager.get_candles(request) result = await cache_manager.get_candles(request)
assert len(result) == 2 assert len(result) == 2
assert mock_connector.fetch_candles.call_count == 1 # No additional call assert mock_connector.fetch_candles.call_count == 1 # No additional call