← Назад к вопросам
Какие знаешь варианты оптимизации API?
2.0 Middle🔥 221 комментариев
#REST API и HTTP#Soft Skills#Архитектура и паттерны
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Оптимизация API: полный обзор
Оптимизация API — это критически важный аспект построения масштабируемых систем. Существует множество проверенных подходов, которые я применяю в production-коде.
1. Оптимизация запросов к БД
N+1 Query Problem
Это один из самых распространённых потенциалов для оптимизации:
# ❌ Плохо — выполнит 1 + N запросов
from sqlalchemy import Session
users = session.query(User).all()
for user in users:
print(user.posts) # Каждый user.posts вызывает отдельный запрос
# ✅ Хорошо — одна оптимизированная стратегия загрузки
from sqlalchemy.orm import joinedload
users = session.query(User).options(
joinedload(User.posts)
).all()
Индексы в БД
# В миграциях
op.create_index('ix_users_email', 'users', ['email'], unique=True)
# Или в SQLAlchemy модели
from sqlalchemy import Index
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
email = Column(String, index=True)
created_at = Column(DateTime, index=True)
Select только нужные поля
# ❌ Плохо — выбирает все поля
users = session.query(User).all()
# ✅ Хорошо — выбирает только необходимое
from sqlalchemy import select
stmt = select(User.id, User.name)
users = session.execute(stmt).fetchall()
2. Кеширование
HTTP Caching Headers
from fastapi import FastAPI
from fastapi.responses import JSONResponse
app = FastAPI()
@app.get('/api/products/{product_id}')
async def get_product(product_id: int):
product = await fetch_product(product_id)
return JSONResponse(
content=product,
headers={'Cache-Control': 'public, max-age=3600'}
)
Redis для сессий и данных
import redis
import json
from functools import wraps
redis_client = redis.Redis(host='localhost', port=6379)
def cached(ttl_seconds=3600):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
cache_key = f'{func.__name__}:{args}:{kwargs}'
cached_value = redis_client.get(cache_key)
if cached_value:
return json.loads(cached_value)
result = await func(*args, **kwargs)
redis_client.setex(cache_key, ttl_seconds, json.dumps(result))
return result
return wrapper
return decorator
3. Pagination и Limiting
from fastapi import FastAPI, Query
from sqlalchemy import select, func
@app.get('/api/users')
async def list_users(
skip: int = Query(0, ge=0),
limit: int = Query(20, ge=1, le=100),
):
stmt = select(User).offset(skip).limit(limit)
users = session.execute(stmt).scalars().all()
total = session.query(func.count(User.id)).scalar()
return {'items': users, 'total': total}
4. Асинхронность
import asyncio
from httpx import AsyncClient
# ❌ Плохо — синхронный, блокирующий
def fetch_data():
response = requests.get('https://api.example.com/data')
return response.json()
# ✅ Хорошо — асинхронный, неблокирующий
async def fetch_data():
async with AsyncClient() as client:
response = await client.get('https://api.example.com/data')
return response.json()
async def fetch_multiple():
tasks = [fetch_user(1), fetch_user(2), fetch_user(3)]
return await asyncio.gather(*tasks)
5. Сжатие и форматы ответов
from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware
app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)
6. Connection Pooling
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
'postgresql://user:password@localhost/db',
poolclass=QueuePool,
pool_size=20,
max_overflow=40,
pool_pre_ping=True,
pool_recycle=3600,
)
7. Rate Limiting
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
@app.get('/api/search')
@limiter.limit('10/minute')
async def search(q: str):
return await perform_search(q)
8. Батчинг запросов
from dataclasses import dataclass
from typing import List
@dataclass
class BatchProcessor:
batch_size: int = 100
async def process_batch(self, items: List):
for i in range(0, len(items), self.batch_size):
batch = items[i:i+self.batch_size]
await self._process_chunk(batch)
9. Мониторинг и профилирование
import time
from contextlib import contextmanager
import logging
logger = logging.getLogger(__name__)
@contextmanager
def measure_time(operation_name: str):
start = time.perf_counter()
try:
yield
finally:
elapsed = time.perf_counter() - start
if elapsed > 1.0:
logger.warning(f'{operation_name} took {elapsed:.2f}s')
10. Lazy Loading и Streaming
from fastapi.responses import StreamingResponse
import csv
from io import StringIO
async def generate_csv():
output = StringIO()
writer = csv.DictWriter(output, fieldnames=['id', 'name'])
writer.writeheader()
async for row in get_large_dataset():
writer.writerow(row)
yield output.getvalue()
@app.get('/api/export/users')
async def export_users():
return StreamingResponse(
generate_csv(),
media_type='text/csv',
headers={'Content-Disposition': 'attachment; filename=users.csv'}
)
Итоговый чеклист
✅ Индексы на часто запрашиваемых полях ✅ Избегаю N+1 queries через joinedload ✅ Кеширование в Redis ✅ Pagination для больших наборов ✅ Асинхронный код для I/O операций ✅ Connection pooling ✅ HTTP кеширование ✅ Сжатие ответов (GZIP) ✅ Rate limiting ✅ Мониторинг узких мест