exchange-data-manager/src/exchange_data_manager/cache/manager.py

444 lines
16 KiB
Python

"""
Cache Manager - orchestrates the memory → database → exchange data flow.
Implements three-tier caching with:
- Mode-aware completeness validation (Phase 6)
- Gap detection and filling (Phase 7)
"""
import logging
from typing import List, Optional, Tuple, Dict, Union, TYPE_CHECKING
from ..candles.models import Candle, CandleRequest
from ..config import CacheConfig, DatabaseConfig
from .memory import MemoryCache
from .database import DatabaseCache
from .async_database import AsyncDatabaseCache
from .completeness import check_completeness
from .gap_filler import fill_gaps
if TYPE_CHECKING:
from ..exchanges.base import BaseExchangeConnector
logger = logging.getLogger(__name__)
class CacheManager:
    """
    Central cache manager implementing the three-tier caching strategy.

    Data flow:
    1. Check memory cache
    2. For missing ranges, check database (async)
    3. For still-missing ranges, fetch from exchange
    4. Store fetched data in database
    5. Store all data in memory
    6. Return complete dataset
    """

    def __init__(
        self,
        cache_config: Optional[CacheConfig] = None,
        database_config: Optional[DatabaseConfig] = None,
        memory_cache: Optional[MemoryCache] = None,
        database_cache: Optional[Union[DatabaseCache, AsyncDatabaseCache]] = None,
        use_async_db: bool = True,
    ):
        """
        Initialize cache manager.

        Args:
            cache_config: Cache configuration (uses defaults if None)
            database_config: Database configuration (uses defaults if None)
            memory_cache: Memory cache instance (created from config if None)
            database_cache: Database cache instance (created from config if None)
            use_async_db: Use async database (default True). Set False for sync-only contexts.
        """
        self._cache_config = cache_config or CacheConfig()
        self._database_config = database_config or DatabaseConfig()
        # Create caches from config or use provided instances
        self.memory = memory_cache or MemoryCache(
            max_candles=self._cache_config.memory_max_candles,
            ttl_seconds=self._cache_config.memory_ttl_seconds,
        )
        # Use async database by default for better event loop compatibility.
        # An explicitly provided instance wins; its concrete type decides sync/async.
        if database_cache is not None:
            self.database = database_cache
            self._use_async_db = isinstance(database_cache, AsyncDatabaseCache)
        elif use_async_db:
            self.database = AsyncDatabaseCache(
                db_path=self._database_config.path,
                pool_size=self._database_config.pool_size,
                max_overflow=self._database_config.max_overflow,
            )
            self._use_async_db = True
        else:
            self.database = DatabaseCache(db_path=self._database_config.path)
            self._use_async_db = False
        # Registered connectors, keyed by lowercased exchange name.
        self._exchange_connectors: Dict[str, "BaseExchangeConnector"] = {}
        logger.info(
            f"CacheManager initialized: memory_max={self._cache_config.memory_max_candles}, "
            f"ttl={self._cache_config.memory_ttl_seconds}s, db={self._database_config.path}, "
            f"async_db={self._use_async_db}"
        )

    async def initialize(self):
        """Initialize async resources (call before using async methods)."""
        if self._use_async_db and hasattr(self.database, "initialize"):
            await self.database.initialize()

    def register_exchange(self, name: str, connector: "BaseExchangeConnector"):
        """Register an exchange connector for data fetching."""
        self._exchange_connectors[name.lower()] = connector
        logger.info(f"Registered exchange connector: {name}")

    def get_exchange(self, name: str) -> Optional["BaseExchangeConnector"]:
        """Get a registered exchange connector."""
        return self._exchange_connectors.get(name.lower())

    async def _db_get(
        self,
        exchange: str,
        symbol: str,
        timeframe: str,
        start: int,
        end: int,
    ) -> Tuple[List[Candle], List[Tuple[int, int]]]:
        """Read a range from the database cache, bridging sync and async backends."""
        if self._use_async_db:
            return await self.database.get(exchange, symbol, timeframe, start, end)
        return self.database.get(exchange, symbol, timeframe, start, end)

    async def _db_put(
        self,
        exchange: str,
        symbol: str,
        timeframe: str,
        candles: List[Candle],
    ) -> None:
        """Persist candles to the database cache, bridging sync and async backends."""
        if self._use_async_db:
            await self.database.put(exchange, symbol, timeframe, candles)
        else:
            self.database.put(exchange, symbol, timeframe, candles)

    def _normalize(self, candles: List[Candle], request: CandleRequest) -> List[Candle]:
        """
        Deduplicate candles by open time, sort ascending, and (if configured)
        fill intra-series gaps via forward-fill.
        """
        # Later entries win on duplicate timestamps (dict insertion order).
        by_time = {c.time: c for c in candles}
        result = sorted(by_time.values(), key=lambda c: c.time)
        if self._cache_config.fill_gaps and result:
            result = fill_gaps(
                result,
                request.timeframe,
                copy_volume=self._cache_config.forward_fill_volume,
            )
        return result

    async def get_candles(self, request: CandleRequest) -> List[Candle]:
        """
        Get candles using the three-tier cache strategy.

        Args:
            request: Candle request parameters

        Returns:
            List of candles (sorted by time ascending)
        """
        candles, _ = await self.get_candles_with_source(request)
        return candles

    async def get_candles_with_source(
        self,
        request: CandleRequest,
        connector_override: Optional["BaseExchangeConnector"] = None,
    ) -> Tuple[List[Candle], str]:
        """
        Get candles and identify which source satisfied the request.

        Args:
            request: Candle request parameters
            connector_override: Optional per-request connector (session-scoped)

        Returns:
            Tuple of (candles, source) where source is one of:
            "memory", "database", "exchange"
        """
        cache_key = request.cache_key
        all_candles: List[Candle] = []

        # Step 1: Check memory cache
        logger.debug(f"Checking memory cache for {cache_key}")
        memory_candles, memory_gaps = self.memory.get(
            cache_key, request.start, request.end, request.limit
        )
        if memory_candles and not memory_gaps:
            # Complete data in memory
            logger.debug(f"Memory cache hit: {len(memory_candles)} candles")
            return memory_candles, "memory"
        all_candles.extend(memory_candles)
        gaps_to_fill = memory_gaps if memory_gaps else [(request.start, request.end)]

        # Step 2: Handle cold-cache / limit-only requests
        # When start and end are both None, fetch directly from exchange
        if not gaps_to_fill or gaps_to_fill == [(None, None)]:
            exchange_candles = await self._fetch_limit_only(
                request,
                connector_override=connector_override,
            )
            all_candles.extend(exchange_candles)
            result = self._normalize(all_candles, request)
            if request.limit and len(result) > request.limit:
                result = result[-request.limit:]
            source = "exchange" if exchange_candles else "memory"
            return result, source

        db_hit = False
        exchange_hit = False
        # Step 3: Check database for missing ranges
        for gap_start, gap_end in gaps_to_fill:
            if gap_start is None or gap_end is None:
                # Open-ended gaps cannot be range-queried against the database.
                continue
            logger.debug(f"Checking database for gap: {gap_start} to {gap_end}")
            db_candles, db_gaps = await self._db_get(
                request.exchange,
                request.symbol,
                request.timeframe,
                gap_start,
                gap_end,
            )
            if db_candles:
                logger.debug(f"Database hit: {len(db_candles)} candles")
                all_candles.extend(db_candles)
                db_hit = True
                # Store in memory for future requests
                self.memory.put(cache_key, db_candles)
            # Step 4: Fetch remaining gaps from exchange
            if db_gaps:
                exchange_candles = await self._fetch_from_exchange(
                    request,
                    db_gaps,
                    connector_override=connector_override,
                )
                if exchange_candles:
                    logger.debug(f"Exchange fetch: {len(exchange_candles)} candles")
                    all_candles.extend(exchange_candles)
                    exchange_hit = True

        # Sort, deduplicate, and apply configured gap filling
        result = self._normalize(all_candles, request)

        # Validate completeness and log if incomplete
        completeness = check_completeness(
            result,
            request,
            time_tolerance_ms=self._cache_config.time_tolerance_seconds * 1000,
            count_tolerance=self._cache_config.count_tolerance,
        )
        if not completeness.is_complete:
            logger.warning(
                f"Incomplete data for {request.cache_key}: {completeness.reason} "
                f"(expected={completeness.expected_count}, actual={completeness.actual_count})"
            )

        # Apply limit if specified (keep the most recent candles)
        if request.limit and len(result) > request.limit:
            result = result[-request.limit:]

        # Source reflects the deepest tier that contributed data.
        source = "memory"
        if exchange_hit:
            source = "exchange"
        elif db_hit:
            source = "database"
        return result, source

    async def _fetch_limit_only(
        self,
        request: CandleRequest,
        connector_override: Optional["BaseExchangeConnector"] = None,
    ) -> List[Candle]:
        """
        Fetch candles when only limit is specified (no start/end).
        This handles cold-cache scenarios where we just want the most recent N candles.

        Args:
            request: Candle request with limit but no start/end
            connector_override: Optional per-request connector (session-scoped)

        Returns:
            List of fetched candles (empty list on error or no connector)
        """
        connector = connector_override or self._exchange_connectors.get(request.exchange)
        if not connector:
            logger.warning(f"No connector registered for exchange: {request.exchange}")
            return []
        try:
            limit = request.limit or 100
            logger.info(
                f"Fetching latest {limit} candles from {request.exchange}: "
                f"{request.symbol} {request.timeframe}"
            )
            # Fetch without start/end - exchange returns most recent candles
            candles = await connector.fetch_candles(
                symbol=request.symbol,
                timeframe=request.timeframe,
                limit=limit,
            )
            if candles:
                # Store in database for persistence
                await self._db_put(
                    request.exchange,
                    request.symbol,
                    request.timeframe,
                    candles,
                )
                # Store in memory
                self.memory.put(request.cache_key, candles)
            # Always honor the declared return type: never propagate None
            # or other falsy connector results.
            return candles or []
        except Exception as e:
            # Best-effort: a failed fetch degrades to "no data", not a crash.
            logger.error(f"Error fetching from exchange: {e}")
            return []

    async def _fetch_from_exchange(
        self,
        request: CandleRequest,
        gaps: List[Tuple[int, int]],
        connector_override: Optional["BaseExchangeConnector"] = None,
    ) -> List[Candle]:
        """
        Fetch missing candles from exchange.

        Args:
            request: Original candle request
            gaps: List of (start, end) time ranges to fetch
            connector_override: Optional per-request connector (session-scoped)

        Returns:
            List of fetched candles (may be partial if some gaps fail)
        """
        connector = connector_override or self._exchange_connectors.get(request.exchange)
        if not connector:
            logger.warning(f"No connector registered for exchange: {request.exchange}")
            return []
        all_fetched: List[Candle] = []
        for gap_start, gap_end in gaps:
            try:
                logger.info(
                    f"Fetching from {request.exchange}: {request.symbol} {request.timeframe} "
                    f"[{gap_start} to {gap_end}]"
                )
                candles = await connector.fetch_candles(
                    symbol=request.symbol,
                    timeframe=request.timeframe,
                    start=gap_start,
                    end=gap_end,
                )
                if candles:
                    all_fetched.extend(candles)
                    # Store in database for persistence
                    await self._db_put(
                        request.exchange,
                        request.symbol,
                        request.timeframe,
                        candles,
                    )
                    # Store in memory
                    self.memory.put(request.cache_key, candles)
            except Exception as e:
                # One failed gap should not abort the remaining gaps.
                logger.error(f"Error fetching from exchange: {e}")
        return all_fetched

    async def update_candle(self, exchange: str, symbol: str, timeframe: str, candle: Candle):
        """
        Update a single candle (for real-time updates).
        Used by WebSocket handlers to update in-progress candles.

        Args:
            exchange: Exchange name
            symbol: Trading pair symbol
            timeframe: Candle timeframe
            candle: Updated candle data
        """
        # NOTE(review): key format must stay in sync with CandleRequest.cache_key.
        cache_key = f"{exchange.lower()}:{symbol.upper()}:{timeframe.lower()}"
        # Update memory cache
        self.memory.update_candle(cache_key, candle)
        # Only persist closed candles to database
        if candle.closed:
            await self._db_put(exchange, symbol, timeframe, [candle])

    def clear_memory(self):
        """Clear memory cache."""
        self.memory.clear()
        logger.info("Memory cache cleared")

    async def clear_database(
        self,
        exchange: Optional[str] = None,
        symbol: Optional[str] = None,
        timeframe: Optional[str] = None,
    ):
        """Clear database cache (optionally filtered)."""
        if self._use_async_db:
            await self.database.delete(exchange, symbol, timeframe)
        else:
            self.database.delete(exchange, symbol, timeframe)
        logger.info("Database cache cleared")

    async def stats(self) -> dict:
        """Get combined cache statistics."""
        if self._use_async_db:
            db_stats = await self.database.stats()
        else:
            db_stats = self.database.stats()
        return {
            "memory": self.memory.stats(),
            "database": db_stats,
            "registered_exchanges": list(self._exchange_connectors.keys()),
        }

    async def close(self):
        """Close async resources."""
        if self._use_async_db and hasattr(self.database, "close"):
            await self.database.close()