Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Ограничения API (Rate Limiting, Throttling)
Ограничения API — это механизмы для контроля количества запросов и защиты от перегрузки. Это критичный компонент любого production-ready API:
1. Rate Limiting (ограничение частоты)
Ограничивает количество запросов за определённый период времени.
from fastapi import FastAPI, HTTPException, Request
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
import time
# Использование slowapi для FastAPI
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
# Декоратор rate limiting
@app.get("/api/v1/users")
@limiter.limit("10/minute") # 10 запросов в минуту
async def get_users(request: Request):
return {"users": []}
# Разные лимиты для разных endpoints
@app.get("/api/v1/users/{user_id}")
@limiter.limit("100/minute")
async def get_user(user_id: int, request: Request):
return {"id": user_id, "name": "John"}
@app.post("/api/v1/login")
@limiter.limit("5/minute") # Строгий лимит для login
async def login(request: Request):
return {"token": "abc123"}
# Custom error handler
@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
return {
"error": "Rate limit exceeded",
"message": exc.detail,
"retry_after": 60
}
Алгоритмы rate limiting:
Token Bucket Algorithm
from collections import defaultdict
from datetime import datetime, timedelta
class TokenBucket:
def __init__(self, capacity: int, refill_rate: int):
"""
capacity: максимум токенов
refill_rate: токенов в секунду
"""
self.capacity = capacity
self.refill_rate = refill_rate
self.tokens = capacity
self.last_refill = datetime.now()
def allow_request(self) -> bool:
now = datetime.now()
elapsed = (now - self.last_refill).total_seconds()
# Пополняем токены
self.tokens = min(
self.capacity,
self.tokens + elapsed * self.refill_rate
)
self.last_refill = now
# Проверяем наличие токена
if self.tokens >= 1:
self.tokens -= 1
return True
return False
# Использование
buckets = defaultdict(lambda: TokenBucket(capacity=10, refill_rate=2))
def check_rate_limit(client_id: str) -> bool:
return buckets[client_id].allow_request()
# Тестирование
for i in range(15):
client_id = "user-123"
if check_rate_limit(client_id):
print(f"Request {i+1}: ALLOWED")
else:
print(f"Request {i+1}: DENIED")
Sliding Window Counter
from collections import defaultdict, deque
from time import time
class SlidingWindowCounter:
def __init__(self, max_requests: int, window_seconds: int):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests = defaultdict(deque)
def allow_request(self, client_id: str) -> bool:
now = time()
window_start = now - self.window_seconds
# Удаляем старые запросы
while self.requests[client_id] and self.requests[client_id][0] < window_start:
self.requests[client_id].popleft()
# Проверяем лимит
if len(self.requests[client_id]) < self.max_requests:
self.requests[client_id].append(now)
return True
return False
# Использование
limiter = SlidingWindowCounter(max_requests=100, window_seconds=60)
for i in range(105):
if limiter.allow_request("user-456"):
print(f"Request {i+1}: ALLOWED")
else:
print(f"Request {i+1}: DENIED")
2. Throttling (дросселирование)
Более мягкий вариант, который замедляет обработку вместо отказа.
import asyncio
from datetime import datetime, timedelta
class AdaptiveThrottler:
def __init__(self, max_requests_per_second: float = 10):
self.max_requests = max_requests_per_second
self.min_interval = 1 / max_requests_per_second
self.last_request_time = 0
async def throttle(self):
"""Ждёт, если нужно, чтобы соблюдать лимит"""
now = datetime.now().timestamp()
elapsed = now - self.last_request_time
if elapsed < self.min_interval:
wait_time = self.min_interval - elapsed
await asyncio.sleep(wait_time)
self.last_request_time = datetime.now().timestamp()
# Использование
throttler = AdaptiveThrottler(max_requests_per_second=5)
async def throttled_requests():
for i in range(10):
await throttler.throttle()
print(f"Request {i+1}")
3. Response Headers
API должен возвращать информацию о лимитах в headers.
from fastapi import FastAPI, Response
app = FastAPI()
@app.get("/api/v1/data")
async def get_data(response: Response):
# Информация о лимитах в response headers
response.headers["X-RateLimit-Limit"] = "100" # Лимит
response.headers["X-RateLimit-Remaining"] = "47" # Осталось
response.headers["X-RateLimit-Reset"] = "1640000000" # Когда перезагрузится
response.headers["Retry-After"] = "60" # Сколько ждать при превышении
return {
"data": "some data",
"rate_limit": {
"limit": 100,
"remaining": 47,
"reset": 1640000000
}
}
4. API Keys & Authentication
from fastapi import FastAPI, Depends, HTTPException, Header
from typing import Optional
app = FastAPI()
# Простая authentication через API key
async def verify_api_key(x_api_key: str = Header()):
valid_keys = ["key-123", "key-456"]
if x_api_key not in valid_keys:
raise HTTPException(
status_code=403,
detail="Invalid API key"
)
return x_api_key
# Использование
@app.get("/api/v1/protected")
async def protected_endpoint(api_key: str = Depends(verify_api_key)):
return {"message": "You have access"}
# OAuth 2.0 (более сложный, но более безопасный)
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user(token: str = Depends(oauth2_scheme)):
# Проверяем токен
if not verify_token(token):
raise HTTPException(
status_code=401,
detail="Invalid token"
)
return {"username": "user"}
def verify_token(token: str) -> bool:
# Реальная проверка токена
return True
5. Quotas (квоты)
Ограничение по объёму данных или операциям.
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
class PlanType(Enum):
FREE = "free"
PRO = "pro"
ENTERPRISE = "enterprise"
@dataclass
class UserQuota:
plan: PlanType
requests_per_day: int
api_calls_used: int = 0
last_reset: datetime = field(default_factory=datetime.now)
def reset_if_needed(self):
if datetime.now() - self.last_reset > timedelta(days=1):
self.api_calls_used = 0
self.last_reset = datetime.now()
def can_make_request(self) -> bool:
self.reset_if_needed()
return self.api_calls_used < self.requests_per_day
def record_request(self):
self.api_calls_used += 1
# Quotas для разных планов
QUOTAS = {
PlanType.FREE: 100, # 100 запросов в день
PlanType.PRO: 10000, # 10,000 запросов в день
PlanType.ENTERPRISE: float('inf') # Неограниченно
}
# Использование
def check_user_quota(user_id: str, plan: PlanType) -> bool:
quota = UserQuota(
plan=plan,
requests_per_day=QUOTAS[plan]
)
if quota.can_make_request():
quota.record_request()
return True
return False
6. Backoff & Retry Strategies
Как клиент должен повторять запросы при ошибке.
import requests
from typing import Optional
import random
import time
class ExponentialBackoff:
def __init__(self, base_delay: float = 1, max_retries: int = 5):
self.base_delay = base_delay
self.max_retries = max_retries
def retry(self, func, *args, **kwargs):
"""Повторяет функцию с экспоненциальной задержкой"""
last_exception = None
for attempt in range(self.max_retries):
try:
return func(*args, **kwargs)
except requests.RequestException as e:
last_exception = e
# Экспоненциальная задержка + jitter
delay = self.base_delay * (2 ** attempt)
jitter = random.uniform(0, 0.1 * delay)
wait_time = delay + jitter
print(f"Attempt {attempt + 1} failed. Retrying in {wait_time:.2f}s")
time.sleep(wait_time)
raise last_exception
# Использование
backoff = ExponentialBackoff(base_delay=1, max_retries=3)
def make_api_call():
response = requests.get("https://api.example.com/data")
response.raise_for_status()
return response.json()
data = backoff.retry(make_api_call)
7. Практический пример: Complete API client
import httpx
from datetime import datetime, timedelta
from dataclasses import dataclass
@dataclass
class RateLimitInfo:
limit: int
remaining: int
reset_at: datetime
class APIClient:
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url
self.api_key = api_key
self.rate_limit: Optional[RateLimitInfo] = None
async def request(self, method: str, path: str, **kwargs) -> dict:
"""Make API request with rate limit awareness"""
# Проверяем rate limit
if self.rate_limit and self.rate_limit.remaining == 0:
wait_time = (self.rate_limit.reset_at - datetime.now()).total_seconds()
if wait_time > 0:
print(f"Rate limit exceeded. Waiting {wait_time:.1f}s")
await asyncio.sleep(wait_time)
headers = {
"Authorization": f"Bearer {self.api_key}",
**kwargs.get("headers", {})
}
async with httpx.AsyncClient() as client:
response = await client.request(
method,
f"{self.base_url}{path}",
headers=headers,
**kwargs
)
# Обновляем информацию о rate limit
self.rate_limit = RateLimitInfo(
limit=int(response.headers.get("X-RateLimit-Limit", 0)),
remaining=int(response.headers.get("X-RateLimit-Remaining", 0)),
reset_at=datetime.fromtimestamp(
int(response.headers.get("X-RateLimit-Reset", 0))
)
)
response.raise_for_status()
return response.json()
async def get(self, path: str, **kwargs):
return await self.request("GET", path, **kwargs)
async def post(self, path: str, **kwargs):
return await self.request("POST", path, **kwargs)
Ключевые концепции
- Rate Limiting: ограничение количества запросов
- Throttling: замедление обработки
- Quotas: лимиты на объём данных/операции
- Backoff: экспоненциальная задержка при ошибке
- API Keys: идентификация клиентов
- Response Headers: информирование клиента о лимитах