← Назад к вопросам

Какие знаешь ограничения API?

2.3 Middle🔥 131 комментариев
#REST API и HTTP

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Ограничения API (Rate Limiting, Throttling)

Ограничения API — это механизмы для контроля количества запросов и защиты от перегрузки. Это критичный компонент любого production-ready API:

1. Rate Limiting (ограничение частоты)

Ограничивает количество запросов за определённый период времени.

from fastapi import FastAPI, HTTPException, Request
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
import time

# Использование slowapi для FastAPI
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()

# Декоратор rate limiting
@app.get("/api/v1/users")
@limiter.limit("10/minute")  # 10 запросов в минуту
async def get_users(request: Request):
    return {"users": []}

# Разные лимиты для разных endpoints
@app.get("/api/v1/users/{user_id}")
@limiter.limit("100/minute")
async def get_user(user_id: int, request: Request):
    return {"id": user_id, "name": "John"}

@app.post("/api/v1/login")
@limiter.limit("5/minute")  # Строгий лимит для login
async def login(request: Request):
    return {"token": "abc123"}

# Custom error handler
@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
    return {
        "error": "Rate limit exceeded",
        "message": exc.detail,
        "retry_after": 60
    }

Алгоритмы rate limiting:

Token Bucket Algorithm

from collections import defaultdict
from datetime import datetime, timedelta

class TokenBucket:
    def __init__(self, capacity: int, refill_rate: int):
        """
        capacity: максимум токенов
        refill_rate: токенов в секунду
        """
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity
        self.last_refill = datetime.now()
    
    def allow_request(self) -> bool:
        now = datetime.now()
        elapsed = (now - self.last_refill).total_seconds()
        
        # Пополняем токены
        self.tokens = min(
            self.capacity,
            self.tokens + elapsed * self.refill_rate
        )
        self.last_refill = now
        
        # Проверяем наличие токена
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# Использование
buckets = defaultdict(lambda: TokenBucket(capacity=10, refill_rate=2))

def check_rate_limit(client_id: str) -> bool:
    return buckets[client_id].allow_request()

# Тестирование
for i in range(15):
    client_id = "user-123"
    if check_rate_limit(client_id):
        print(f"Request {i+1}: ALLOWED")
    else:
        print(f"Request {i+1}: DENIED")

Sliding Window Counter

from collections import defaultdict, deque
from time import time

class SlidingWindowCounter:
    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = defaultdict(deque)
    
    def allow_request(self, client_id: str) -> bool:
        now = time()
        window_start = now - self.window_seconds
        
        # Удаляем старые запросы
        while self.requests[client_id] and self.requests[client_id][0] < window_start:
            self.requests[client_id].popleft()
        
        # Проверяем лимит
        if len(self.requests[client_id]) < self.max_requests:
            self.requests[client_id].append(now)
            return True
        return False

# Использование
limiter = SlidingWindowCounter(max_requests=100, window_seconds=60)

for i in range(105):
    if limiter.allow_request("user-456"):
        print(f"Request {i+1}: ALLOWED")
    else:
        print(f"Request {i+1}: DENIED")

2. Throttling (дросселирование)

Более мягкий вариант, который замедляет обработку вместо отказа.

import asyncio
from datetime import datetime, timedelta

class AdaptiveThrottler:
    def __init__(self, max_requests_per_second: float = 10):
        self.max_requests = max_requests_per_second
        self.min_interval = 1 / max_requests_per_second
        self.last_request_time = 0
    
    async def throttle(self):
        """Ждёт, если нужно, чтобы соблюдать лимит"""
        now = datetime.now().timestamp()
        elapsed = now - self.last_request_time
        
        if elapsed < self.min_interval:
            wait_time = self.min_interval - elapsed
            await asyncio.sleep(wait_time)
        
        self.last_request_time = datetime.now().timestamp()

# Использование
throttler = AdaptiveThrottler(max_requests_per_second=5)

async def throttled_requests():
    for i in range(10):
        await throttler.throttle()
        print(f"Request {i+1}")

3. Response Headers

API должен возвращать информацию о лимитах в headers.

from fastapi import FastAPI, Response

app = FastAPI()

@app.get("/api/v1/data")
async def get_data(response: Response):
    # Информация о лимитах в response headers
    response.headers["X-RateLimit-Limit"] = "100"  # Лимит
    response.headers["X-RateLimit-Remaining"] = "47"  # Осталось
    response.headers["X-RateLimit-Reset"] = "1640000000"  # Когда перезагрузится
    response.headers["Retry-After"] = "60"  # Сколько ждать при превышении
    
    return {
        "data": "some data",
        "rate_limit": {
            "limit": 100,
            "remaining": 47,
            "reset": 1640000000
        }
    }

4. API Keys & Authentication

from fastapi import FastAPI, Depends, HTTPException, Header
from typing import Optional

app = FastAPI()

# Простая authentication через API key
async def verify_api_key(x_api_key: str = Header()):
    valid_keys = ["key-123", "key-456"]
    if x_api_key not in valid_keys:
        raise HTTPException(
            status_code=403,
            detail="Invalid API key"
        )
    return x_api_key

# Использование
@app.get("/api/v1/protected")
async def protected_endpoint(api_key: str = Depends(verify_api_key)):
    return {"message": "You have access"}

# OAuth 2.0 (более сложный, но более безопасный)
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

async def get_current_user(token: str = Depends(oauth2_scheme)):
    # Проверяем токен
    if not verify_token(token):
        raise HTTPException(
            status_code=401,
            detail="Invalid token"
        )
    return {"username": "user"}

def verify_token(token: str) -> bool:
    # Реальная проверка токена
    return True

5. Quotas (квоты)

Ограничение по объёму данных или операциям.

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum

class PlanType(Enum):
    FREE = "free"
    PRO = "pro"
    ENTERPRISE = "enterprise"

@dataclass
class UserQuota:
    plan: PlanType
    requests_per_day: int
    api_calls_used: int = 0
    last_reset: datetime = field(default_factory=datetime.now)
    
    def reset_if_needed(self):
        if datetime.now() - self.last_reset > timedelta(days=1):
            self.api_calls_used = 0
            self.last_reset = datetime.now()
    
    def can_make_request(self) -> bool:
        self.reset_if_needed()
        return self.api_calls_used < self.requests_per_day
    
    def record_request(self):
        self.api_calls_used += 1

# Quotas для разных планов
QUOTAS = {
    PlanType.FREE: 100,          # 100 запросов в день
    PlanType.PRO: 10000,         # 10,000 запросов в день
    PlanType.ENTERPRISE: float('inf')  # Неограниченно
}

# Использование
def check_user_quota(user_id: str, plan: PlanType) -> bool:
    quota = UserQuota(
        plan=plan,
        requests_per_day=QUOTAS[plan]
    )
    
    if quota.can_make_request():
        quota.record_request()
        return True
    return False

6. Backoff & Retry Strategies

Как клиент должен повторять запросы при ошибке.

import requests
from typing import Optional
import random
import time

class ExponentialBackoff:
    def __init__(self, base_delay: float = 1, max_retries: int = 5):
        self.base_delay = base_delay
        self.max_retries = max_retries
    
    def retry(self, func, *args, **kwargs):
        """Повторяет функцию с экспоненциальной задержкой"""
        last_exception = None
        
        for attempt in range(self.max_retries):
            try:
                return func(*args, **kwargs)
            except requests.RequestException as e:
                last_exception = e
                
                # Экспоненциальная задержка + jitter
                delay = self.base_delay * (2 ** attempt)
                jitter = random.uniform(0, 0.1 * delay)
                wait_time = delay + jitter
                
                print(f"Attempt {attempt + 1} failed. Retrying in {wait_time:.2f}s")
                time.sleep(wait_time)
        
        raise last_exception

# Использование
backoff = ExponentialBackoff(base_delay=1, max_retries=3)

def make_api_call():
    response = requests.get("https://api.example.com/data")
    response.raise_for_status()
    return response.json()

data = backoff.retry(make_api_call)

7. Практический пример: Complete API client

import httpx
from datetime import datetime, timedelta
from dataclasses import dataclass

@dataclass
class RateLimitInfo:
    limit: int
    remaining: int
    reset_at: datetime

class APIClient:
    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url
        self.api_key = api_key
        self.rate_limit: Optional[RateLimitInfo] = None
    
    async def request(self, method: str, path: str, **kwargs) -> dict:
        """Make API request with rate limit awareness"""
        
        # Проверяем rate limit
        if self.rate_limit and self.rate_limit.remaining == 0:
            wait_time = (self.rate_limit.reset_at - datetime.now()).total_seconds()
            if wait_time > 0:
                print(f"Rate limit exceeded. Waiting {wait_time:.1f}s")
                await asyncio.sleep(wait_time)
        
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            **kwargs.get("headers", {})
        }
        
        async with httpx.AsyncClient() as client:
            response = await client.request(
                method,
                f"{self.base_url}{path}",
                headers=headers,
                **kwargs
            )
            
            # Обновляем информацию о rate limit
            self.rate_limit = RateLimitInfo(
                limit=int(response.headers.get("X-RateLimit-Limit", 0)),
                remaining=int(response.headers.get("X-RateLimit-Remaining", 0)),
                reset_at=datetime.fromtimestamp(
                    int(response.headers.get("X-RateLimit-Reset", 0))
                )
            )
            
            response.raise_for_status()
            return response.json()
    
    async def get(self, path: str, **kwargs):
        return await self.request("GET", path, **kwargs)
    
    async def post(self, path: str, **kwargs):
        return await self.request("POST", path, **kwargs)

Ключевые концепции

  1. Rate Limiting: ограничение количества запросов
  2. Throttling: замедление обработки
  3. Quotas: лимиты на объём данных/операции
  4. Backoff: экспоненциальная задержка при ошибке
  5. API Keys: идентификация клиентов
  6. Response Headers: информирование клиента о лимитах