30 lines
818 B
Python
30 lines
818 B
Python
"""In-memory rate limiting for the registry API."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import Dict, Tuple
|
|
|
|
|
|
@dataclass
|
|
class RateLimitState:
|
|
reset_at: float
|
|
count: int
|
|
|
|
|
|
class RateLimiter:
|
|
def __init__(self) -> None:
|
|
self._state: Dict[Tuple[str, str], RateLimitState] = {}
|
|
|
|
def check(self, scope_key: str, limit: int, window_seconds: int) -> Tuple[bool, RateLimitState]:
|
|
now = time.time()
|
|
key = (scope_key, str(window_seconds))
|
|
state = self._state.get(key)
|
|
if not state or now >= state.reset_at:
|
|
state = RateLimitState(reset_at=now + window_seconds, count=0)
|
|
self._state[key] = state
|
|
state.count += 1
|
|
allowed = state.count <= limit
|
|
return allowed, state
|