← Назад к вопросам

Какие знаешь варианты оптимизации API?

2.0 Middle🔥 221 комментариев
#REST API и HTTP#Soft Skills#Архитектура и паттерны

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Оптимизация API: полный обзор

Оптимизация API — это критически важный аспект построения масштабируемых систем. Существует множество проверенных подходов, которые я применяю в production-коде.

1. Оптимизация запросов к БД

N+1 Query Problem

Это один из самых распространённых потенциалов для оптимизации:

# ❌ Плохо — выполнит 1 + N запросов
from sqlalchemy import Session

users = session.query(User).all()
for user in users:
    print(user.posts)  # Каждый user.posts вызывает отдельный запрос

# ✅ Хорошо — одна оптимизированная стратегия загрузки
from sqlalchemy.orm import joinedload

users = session.query(User).options(
    joinedload(User.posts)
).all()

Индексы в БД

# В миграциях
op.create_index('ix_users_email', 'users', ['email'], unique=True)

# Или в SQLAlchemy модели
from sqlalchemy import Index

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    email = Column(String, index=True)
    created_at = Column(DateTime, index=True)

Select только нужные поля

# ❌ Плохо — выбирает все поля
users = session.query(User).all()

# ✅ Хорошо — выбирает только необходимое
from sqlalchemy import select

stmt = select(User.id, User.name)
users = session.execute(stmt).fetchall()

2. Кеширование

HTTP Caching Headers

from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()

@app.get('/api/products/{product_id}')
async def get_product(product_id: int):
    product = await fetch_product(product_id)
    return JSONResponse(
        content=product,
        headers={'Cache-Control': 'public, max-age=3600'}
    )

Redis для сессий и данных

import redis
import json
from functools import wraps

redis_client = redis.Redis(host='localhost', port=6379)

def cached(ttl_seconds=3600):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            cache_key = f'{func.__name__}:{args}:{kwargs}'
            cached_value = redis_client.get(cache_key)
            if cached_value:
                return json.loads(cached_value)
            
            result = await func(*args, **kwargs)
            redis_client.setex(cache_key, ttl_seconds, json.dumps(result))
            return result
        return wrapper
    return decorator

3. Pagination и Limiting

from fastapi import FastAPI, Query
from sqlalchemy import select, func

@app.get('/api/users')
async def list_users(
    skip: int = Query(0, ge=0),
    limit: int = Query(20, ge=1, le=100),
):
    stmt = select(User).offset(skip).limit(limit)
    users = session.execute(stmt).scalars().all()
    total = session.query(func.count(User.id)).scalar()
    
    return {'items': users, 'total': total}

4. Асинхронность

import asyncio
from httpx import AsyncClient

# ❌ Плохо — синхронный, блокирующий
def fetch_data():
    response = requests.get('https://api.example.com/data')
    return response.json()

# ✅ Хорошо — асинхронный, неблокирующий
async def fetch_data():
    async with AsyncClient() as client:
        response = await client.get('https://api.example.com/data')
        return response.json()

async def fetch_multiple():
    tasks = [fetch_user(1), fetch_user(2), fetch_user(3)]
    return await asyncio.gather(*tasks)

5. Сжатие и форматы ответов

from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware

app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)

6. Connection Pooling

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    'postgresql://user:password@localhost/db',
    poolclass=QueuePool,
    pool_size=20,
    max_overflow=40,
    pool_pre_ping=True,
    pool_recycle=3600,
)

7. Rate Limiting

from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

@app.get('/api/search')
@limiter.limit('10/minute')
async def search(q: str):
    return await perform_search(q)

8. Батчинг запросов

from dataclasses import dataclass
from typing import List

@dataclass
class BatchProcessor:
    batch_size: int = 100
    
    async def process_batch(self, items: List):
        for i in range(0, len(items), self.batch_size):
            batch = items[i:i+self.batch_size]
            await self._process_chunk(batch)

9. Мониторинг и профилирование

import time
from contextlib import contextmanager
import logging

logger = logging.getLogger(__name__)

@contextmanager
def measure_time(operation_name: str):
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        if elapsed > 1.0:
            logger.warning(f'{operation_name} took {elapsed:.2f}s')

10. Lazy Loading и Streaming

from fastapi.responses import StreamingResponse
import csv
from io import StringIO

async def generate_csv():
    output = StringIO()
    writer = csv.DictWriter(output, fieldnames=['id', 'name'])
    writer.writeheader()
    
    async for row in get_large_dataset():
        writer.writerow(row)
        yield output.getvalue()

@app.get('/api/export/users')
async def export_users():
    return StreamingResponse(
        generate_csv(),
        media_type='text/csv',
        headers={'Content-Disposition': 'attachment; filename=users.csv'}
    )

Итоговый чеклист

✅ Индексы на часто запрашиваемых полях ✅ Избегаю N+1 queries через joinedload ✅ Кеширование в Redis ✅ Pagination для больших наборов ✅ Асинхронный код для I/O операций ✅ Connection pooling ✅ HTTP кеширование ✅ Сжатие ответов (GZIP) ✅ Rate limiting ✅ Мониторинг узких мест