exchange-data-manager/src/exchange_data_manager/api/rest.py

585 lines
18 KiB
Python

"""
REST API using FastAPI with WebSocket support for streaming.
"""
import logging
import time
from typing import Optional, List
from contextlib import asynccontextmanager
from fastapi import FastAPI, Query, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from ..config import Config
from ..cache.manager import CacheManager
from ..candles.models import CandleRequest, Candle
from ..exchanges import CONNECTOR_REGISTRY, CCXTConnector
from ..sessions import SessionManager
from ..monitoring import HealthChecker, MetricsCollector, CacheSource, RequestMetrics
from .websocket import ws_manager
logger = logging.getLogger(__name__)
# Global instances
config: Optional[Config] = None
cache_manager: Optional[CacheManager] = None
session_manager: Optional[SessionManager] = None
health_checker: Optional[HealthChecker] = None
metrics_collector: Optional[MetricsCollector] = None
class CandleResponse(BaseModel):
"""Response model for candle data."""
exchange: str
symbol: str
timeframe: str
candles: List[dict]
count: int
class HealthResponse(BaseModel):
"""Health check response."""
status: str
version: str
class StatsResponse(BaseModel):
"""Cache statistics response."""
memory: dict
database: dict
registered_exchanges: List[str]
class CreateSessionResponse(BaseModel):
"""Response for session creation."""
session_id: str
expires_at: str
class AddCredentialsRequest(BaseModel):
"""Request to add exchange credentials to a session."""
exchange: str
api_key: str
api_secret: str
passphrase: Optional[str] = None
testnet: bool = False
class SessionStatsResponse(BaseModel):
"""Session manager statistics."""
total_sessions: int
active_sessions: int
expired_pending_cleanup: int
timeout_minutes: float
cleanup_interval_seconds: int
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan handler."""
global config, cache_manager, session_manager, health_checker, metrics_collector
# Startup
logger.info("Starting Exchange Data Manager...")
config = Config.load()
# Initialize monitoring
health_checker = HealthChecker(version="0.1.0", db_path=config.database.path)
metrics_collector = MetricsCollector(window_size=1000)
# Create cache manager with config (uses async database by default)
cache_manager = CacheManager(
cache_config=config.cache,
database_config=config.database,
)
# Initialize async database
await cache_manager.initialize()
# Create and start session manager
session_manager = SessionManager(
session_timeout_minutes=60,
cleanup_interval_seconds=300,
)
await session_manager.start()
# Register all enabled exchanges using the generic CCXTConnector
for name, ex_config in config.exchanges.items():
if ex_config.enabled:
# Use generic CCXTConnector - works with ANY ccxt exchange
if CCXTConnector.is_exchange_supported(name):
connector = CCXTConnector(exchange_id=name, config=ex_config)
cache_manager.register_exchange(name, connector)
logger.info(f"Registered exchange: {name} (rate_limit={ex_config.rate_limit_ms}ms)")
else:
logger.warning(f"Exchange not supported by ccxt: {name}")
logger.info(
f"Exchange Data Manager started with {len(cache_manager._exchange_connectors)} exchanges"
)
yield
# Shutdown
logger.info("Shutting down Exchange Data Manager...")
# Shutdown WebSocket manager
await ws_manager.shutdown()
# Stop session manager (cleans up all sessions)
if session_manager:
await session_manager.stop()
# Close exchange connections
for name in cache_manager._exchange_connectors:
connector = cache_manager.get_exchange(name)
if connector:
await connector.close()
# Close cache resources (e.g., async DB connection pool)
if cache_manager:
await cache_manager.close()
logger.info("Exchange Data Manager stopped")
def create_app() -> FastAPI:
"""Create and configure the FastAPI application."""
application = FastAPI(
title="Exchange Data Manager",
description="Cryptocurrency candle data service with intelligent caching",
version="0.1.0",
lifespan=lifespan,
)
# Add CORS middleware
# Note: allow_credentials=True is invalid with allow_origins=["*"]
# Using False for public API; configure specific origins if credentials needed
application.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=False,
allow_methods=["*"],
allow_headers=["*"],
)
return application
app = create_app()
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""
Quick health check endpoint.
Returns minimal health info for load balancer checks.
For detailed component health, use /health/detailed.
"""
if health_checker is None:
return HealthResponse(status="starting", version="0.1.0")
return HealthResponse(**health_checker.quick_check())
@app.get("/health/detailed")
async def health_check_detailed():
"""
Detailed health check endpoint.
Checks all components: database, memory cache, exchanges, sessions.
Returns component-level health status with latency measurements.
"""
if health_checker is None:
raise HTTPException(status_code=503, detail="Service not initialized")
health = await health_checker.check_all(
cache_manager=cache_manager,
session_manager=session_manager,
db_path=config.database.path if config else None,
)
return health.to_dict()
@app.get("/metrics")
async def get_metrics():
"""
Get request metrics.
Returns cache hit rates, latency statistics, and request rates.
"""
if metrics_collector is None:
raise HTTPException(status_code=503, detail="Service not initialized")
return metrics_collector.get_summary()
@app.get("/stats", response_model=StatsResponse)
async def get_stats():
"""Get cache statistics."""
if cache_manager is None:
raise HTTPException(status_code=503, detail="Service not initialized")
stats = await cache_manager.stats()
return StatsResponse(**stats)
@app.get("/exchanges")
async def list_exchanges():
"""List available exchanges."""
if cache_manager is None:
raise HTTPException(status_code=503, detail="Service not initialized")
exchanges = []
for name in cache_manager._exchange_connectors:
connector = cache_manager.get_exchange(name)
if connector:
exchanges.append({
"name": name,
"timeframes": connector.get_timeframes(),
})
return {"exchanges": exchanges}
@app.get("/exchanges/{exchange}/symbols")
async def get_symbols(exchange: str):
"""Get available symbols for an exchange."""
if cache_manager is None:
raise HTTPException(status_code=503, detail="Service not initialized")
connector = cache_manager.get_exchange(exchange)
if not connector:
raise HTTPException(status_code=404, detail=f"Exchange not found: {exchange}")
symbols = await connector.get_symbols()
return {"exchange": exchange, "symbols": symbols, "count": len(symbols)}
@app.get("/exchanges/{exchange}/timeframes")
async def get_timeframes(exchange: str):
"""Get supported timeframes for an exchange."""
if cache_manager is None:
raise HTTPException(status_code=503, detail="Service not initialized")
connector = cache_manager.get_exchange(exchange)
if not connector:
raise HTTPException(status_code=404, detail=f"Exchange not found: {exchange}")
return {"exchange": exchange, "timeframes": connector.get_timeframes()}
@app.get("/candles", response_model=CandleResponse)
async def get_candles(
exchange: str = Query(..., description="Exchange name (e.g., binance)"),
symbol: str = Query(..., description="Trading pair (e.g., BTC/USDT)"),
timeframe: str = Query(..., description="Candle timeframe (e.g., 5m, 1h)"),
start: Optional[int] = Query(None, description="Start timestamp (Unix seconds)"),
end: Optional[int] = Query(None, description="End timestamp (Unix seconds)"),
limit: Optional[int] = Query(None, description="Maximum candles to return (omit for all)", le=10000),
session_id: Optional[str] = Query(
None,
description="Optional session ID for per-session exchange credentials",
),
):
"""
Get historical candle data.
Uses three-tier caching: memory → database → exchange.
"""
if cache_manager is None:
raise HTTPException(status_code=503, detail="Service not initialized")
request = CandleRequest(
exchange=exchange,
symbol=symbol,
timeframe=timeframe,
start=start,
end=end,
limit=limit,
)
request_start = time.perf_counter()
connector_override = None
try:
if session_id is not None:
if session_manager is None:
raise HTTPException(status_code=503, detail="Session manager not initialized")
connector_override = session_manager.get_session_connector(
session_id=session_id,
exchange=request.exchange,
)
if connector_override is None:
raise HTTPException(
status_code=404,
detail=f"No active session connector for exchange '{request.exchange}'",
)
candles, source_name = await cache_manager.get_candles_with_source(
request,
connector_override=connector_override,
)
if metrics_collector is not None:
try:
source = CacheSource(source_name)
except ValueError:
source = CacheSource.EXCHANGE
metrics_collector.record_candle_request(
latency_ms=(time.perf_counter() - request_start) * 1000,
status_code=200,
cache_source=source,
exchange=request.exchange,
symbol=request.symbol,
timeframe=request.timeframe,
candle_count=len(candles),
)
return CandleResponse(
exchange=request.exchange,
symbol=request.symbol,
timeframe=request.timeframe,
candles=[c.to_dict() for c in candles],
count=len(candles),
)
except HTTPException as e:
if metrics_collector is not None:
metrics_collector.record(
RequestMetrics(
endpoint="/candles",
method="GET",
status_code=e.status_code,
latency_ms=(time.perf_counter() - request_start) * 1000,
exchange=request.exchange,
symbol=request.symbol,
timeframe=request.timeframe,
)
)
raise
except Exception as e:
if metrics_collector is not None:
metrics_collector.record(
RequestMetrics(
endpoint="/candles",
method="GET",
status_code=500,
latency_ms=(time.perf_counter() - request_start) * 1000,
exchange=request.exchange,
symbol=request.symbol,
timeframe=request.timeframe,
)
)
logger.error(f"Error fetching candles: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.delete("/cache")
async def clear_cache(
exchange: Optional[str] = Query(None),
symbol: Optional[str] = Query(None),
timeframe: Optional[str] = Query(None),
memory_only: bool = Query(False),
):
"""Clear cache data."""
if cache_manager is None:
raise HTTPException(status_code=503, detail="Service not initialized")
cache_manager.clear_memory()
if not memory_only:
await cache_manager.clear_database(exchange, symbol, timeframe)
return {"status": "cleared"}
# =============================================================================
# Session Endpoints
# =============================================================================
@app.post("/sessions", response_model=CreateSessionResponse)
async def create_session():
"""
Create a new session for authenticated exchange access.
Sessions are ephemeral - credentials exist only while the session is active.
Sessions expire after 60 minutes of inactivity.
"""
if session_manager is None:
raise HTTPException(status_code=503, detail="Service not initialized")
session = session_manager.create_session()
return CreateSessionResponse(
session_id=session.id,
expires_at=session.expires_at.isoformat() if session.expires_at else "",
)
@app.post("/sessions/{session_id}/credentials")
async def add_credentials(session_id: str, request: AddCredentialsRequest):
"""
Add exchange credentials to a session.
Credentials are stored in memory only and cleaned up when:
- The session is explicitly destroyed
- The session expires
- The server shuts down
"""
if session_manager is None:
raise HTTPException(status_code=503, detail="Service not initialized")
success = await session_manager.add_exchange_credentials(
session_id=session_id,
exchange=request.exchange,
api_key=request.api_key,
api_secret=request.api_secret,
passphrase=request.passphrase,
testnet=request.testnet,
)
if not success:
raise HTTPException(status_code=404, detail="Session not found or expired")
return {"status": "credentials_added", "exchange": request.exchange}
@app.delete("/sessions/{session_id}/credentials/{exchange}")
async def remove_credentials(session_id: str, exchange: str):
"""Remove exchange credentials from a session."""
if session_manager is None:
raise HTTPException(status_code=503, detail="Service not initialized")
success = await session_manager.remove_exchange_credentials(session_id, exchange)
if not success:
raise HTTPException(status_code=404, detail="Session or exchange not found")
return {"status": "credentials_removed", "exchange": exchange}
@app.delete("/sessions/{session_id}")
async def destroy_session(session_id: str):
"""
Destroy a session and clean up all credentials.
This immediately removes all credentials and closes any open connections.
"""
if session_manager is None:
raise HTTPException(status_code=503, detail="Service not initialized")
destroyed = await session_manager.destroy_session(session_id)
if not destroyed:
raise HTTPException(status_code=404, detail="Session not found")
return {"status": "session_destroyed"}
@app.post("/sessions/{session_id}/refresh")
async def refresh_session(session_id: str):
"""
Extend session expiration.
Call this periodically to keep the session alive.
"""
if session_manager is None:
raise HTTPException(status_code=503, detail="Service not initialized")
if not session_manager.refresh_session(session_id):
raise HTTPException(status_code=404, detail="Session not found or expired")
session = session_manager.get_session(session_id)
expires_at = session.expires_at.isoformat() if session and session.expires_at else ""
return {"status": "session_refreshed", "expires_at": expires_at}
@app.get("/sessions/{session_id}")
async def get_session_info(session_id: str):
"""Get information about a session."""
if session_manager is None:
raise HTTPException(status_code=503, detail="Service not initialized")
session = session_manager.get_session(session_id)
if not session:
raise HTTPException(status_code=404, detail="Session not found or expired")
return {
"session_id": session.id,
"created_at": session.created_at.isoformat(),
"expires_at": session.expires_at.isoformat() if session.expires_at else None,
"exchanges": session.exchanges,
}
@app.get("/sessions", response_model=SessionStatsResponse)
async def get_session_stats():
"""Get session manager statistics."""
if session_manager is None:
raise HTTPException(status_code=503, detail="Service not initialized")
return SessionStatsResponse(**session_manager.stats())
# =============================================================================
# WebSocket Endpoint for Streaming
# =============================================================================
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""
WebSocket endpoint for streaming candle data.
Connect and send JSON messages to subscribe/unsubscribe:
Subscribe:
{"action": "subscribe", "exchange": "binance", "symbol": "BTC/USDT", "timeframe": "1m"}
Unsubscribe:
{"action": "unsubscribe", "exchange": "binance", "symbol": "BTC/USDT", "timeframe": "1m"}
Ping (keep-alive):
{"action": "ping"}
Responses:
{"action": "subscribed", "stream": "binance:BTC/USDT:1m", ...}
{"action": "candle", "stream": "binance:BTC/USDT:1m", "data": {...}}
{"action": "pong"}
{"error": "..."}
"""
await ws_manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await ws_manager.handle_message(websocket, data)
except WebSocketDisconnect:
await ws_manager.disconnect(websocket)
except Exception as e:
logger.error(f"WebSocket error: {e}")
await ws_manager.disconnect(websocket)
@app.get("/ws/stats")
async def websocket_stats():
"""Get WebSocket connection statistics."""
return {
"connected_clients": len(ws_manager._clients),
"active_streams": list(ws_manager._active_streams.keys()),
"total_subscriptions": sum(
len(c.subscriptions) for c in ws_manager._clients.values()
),
}