← Назад к вопросам

Если endpoint работает 30 секунд, какие действия предпримешь

2.0 Middle🔥 191 комментариев
#REST API и HTTP#Архитектура и паттерны

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Оптимизация endpoint работающего 30 секунд

Это типичная проблема в production. 30 секунд — это огромный timeout. Я разбираю проблему систематически.

Шаг 1: Диагностика (профилирование)

Первое, что нужно сделать — найти bottleneck. Использую несколько инструментов:

Профилирование с помощью cProfile:

import cProfile
import pstats
from io import StringIO
from fastapi import FastAPI
from functools import wraps

app = FastAPI()

def profile_endpoint(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        profiler = cProfile.Profile()
        profiler.enable()
        result = func(*args, **kwargs)
        profiler.disable()
        
        s = StringIO()
        ps = pstats.Stats(profiler, stream=s).sort_stats('cumulative')
        ps.print_stats(10)
        print(s.getvalue())
        return result
    return wrapper

@app.get("/slow-endpoint")
@profile_endpoint
def slow_endpoint():
    return {"status": "ok"}

Логирование с временем:

import time
import logging

logger = logging.getLogger(__name__)

def timed_function(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        elapsed = time.time() - start
        logger.info(f"{func.__name__} took {elapsed:.2f}s")
        return result
    return wrapper

@timed_function
def fetch_user_data(user_id):
    # что-то медленное
    pass

Шаг 2: Типичные проблемы

1. N+1 запросы в БД

Это самая частая причина.例:

# ❌ МЕДЛЕННО — N+1 проблема
users = User.query.all()
for user in users:
    posts = user.posts  # Запрос в БД за каждого юзера!
    process(posts)

# Если 100 юзеров — 101 запрос!

Исправления:

# ✅ Eager loading (SQLAlchemy)
from sqlalchemy.orm import joinedload

users = User.query.options(joinedload(User.posts)).all()
for user in users:
    posts = user.posts  # Уже загружено, нет запроса

2. Отсутствие индексов в БД

# Медленный запрос без индекса
SELECT * FROM users WHERE email = 'test@example.com'  -- 5 сек

-- Добавляю индекс
CREATE INDEX idx_users_email ON users(email);
-- Теперь 0.1 сек

3. Загрузка всех данных вместо пагинации

# ❌ Загружаю 1M записей
@app.get("/items")
def get_all_items():
    return db.query(Item).all()  # Можно потратить 20 сек только на SQL

# ✅ Пагинирую
@app.get("/items")
def get_items(skip: int = 0, limit: int = 50):
    return db.query(Item).offset(skip).limit(limit).all()

4. Синхронные внешние API запросы

# ❌ Блокирующие запросы
import requests

@app.get("/user/{user_id}/enriched")
def get_enriched_user(user_id):
    user = db.query(User).get(user_id)  # 1 сек
    profile = requests.get(f"https://api.external.com/{user_id}").json()  # 10 сек
    weather = requests.get(f"https://weather.api.com").json()  # 5 сек
    return {"user": user, "profile": profile, "weather": weather}  # Итого 16 сек

# ✅ Асинхронные запросы
import aiohttp
from fastapi import FastAPI

app = FastAPI()

@app.get("/user/{user_id}/enriched")
async def get_enriched_user(user_id):
    async with aiohttp.ClientSession() as session:
        # Все запросы параллельно
        user_task = asyncio.create_task(fetch_user(user_id))
        profile_task = asyncio.create_task(
            session.get(f"https://api.external.com/{user_id}")
        )
        weather_task = asyncio.create_task(
            session.get("https://weather.api.com")
        )
        
        user, profile, weather = await asyncio.gather(
            user_task, profile_task, weather_task
        )  # Все параллельно: 10 сек, а не 16

5. Неоптимальные SQL запросы

# ❌ Медленный запрос
SELECT * FROM orders 
WHERE YEAR(created_at) = 2024  # Full table scan!

# ✅ Оптимальный
SELECT * FROM orders
WHERE created_at >= '2024-01-01' AND created_at < '2025-01-01'
AND status IN ('completed', 'shipped')  -- Больше фильтров, меньше данных
LIMIT 100

Шаг 3: Кэширование

Если операция медленная, но её результат стабилен — кэшируй:

from functools import lru_cache
import redis

# Простое кэширование в памяти
@lru_cache(maxsize=128)
def get_user_recommendations(user_id: int):
    # Медленная операция
    return compute_recommendations(user_id)

# Redis кэширование
import pickle

redis_client = redis.Redis()

def get_user_profile(user_id):
    cache_key = f"profile:{user_id}"
    
    # Проверяю кэш
    cached = redis_client.get(cache_key)
    if cached:
        return pickle.loads(cached)
    
    # Если нет в кэше — загружаю и кэширую на час
    profile = db.query(User).get(user_id)
    redis_client.setex(
        cache_key,
        3600,  # 1 час TTL
        pickle.dumps(profile)
    )
    return profile

Шаг 4: Фоновые задачи

Если операция действительно медленная, но пользователю результат не нужен сразу:

from celery import Celery

celery = Celery('myapp')

# Медленная операция в фоне
@celery.task
def process_large_report(user_id):
    # 30 секунд обработки
    report = generate_report(user_id)
    save_report(user_id, report)

# Endpoint возвращает быстро
@app.post("/reports")
def create_report(user_id: int):
    task = process_large_report.delay(user_id)
    return {
        "task_id": task.id,
        "status": "processing"
    }

# Пользователь может проверить статус
@app.get("/reports/{task_id}")
def get_report_status(task_id):
    task = celery.AsyncResult(task_id)
    if task.ready():
        return {"status": "done", "result": task.result}
    return {"status": "processing"}

Шаг 5: Масштабирование

Если оптимизаций недостаточно:

# Кэширование на уровне HTTP (CDN, Varnish)
@app.get("/data", response_model=DataResponse)
def get_data(cache_control: bool = True):
    if cache_control:
        return Response(
            content=data,
            headers={"Cache-Control": "public, max-age=3600"}
        )
    return data

# Rate limiting на уровне инфраструктуры (nginx, API gateway)
# Это предотвращает overload от тяжёлых операций

Мой процесс в реальности

  1. Профилирую — где время тратится?
  2. Ищу N+1 — самая частая причина
  3. Добавляю индексы — БД ускоряется в 10x
  4. Кэширую — если данные стабильны
  5. Перевожу на асинхронность — если есть I/O операции
  6. Переношу в background task — если нет срочности

Пример: Реальная оптимизация (с 30 сек до 0.2 сек)

# ДО
@app.get("/user/{user_id}/stats")
def get_user_stats(user_id):
    user = db.query(User).get(user_id)  # 1 сек
    # N+1 проблема в цикле
    posts = db.query(Post).filter(Post.user_id == user_id).all()  # 10 сек
    for post in posts:
        comments = db.query(Comment).filter(Comment.post_id == post.id).all()  # 10 сек за каждый!
    return {"user": user, "posts": posts}  # Итого 20+ сек

# ПОСЛЕ
@app.get("/user/{user_id}/stats")
def get_user_stats(user_id):
    user = db.query(User).options(
        joinedload(User.posts).joinedload(Post.comments)
    ).filter(User.id == user_id).first()  # 0.1 сек (одним запросом!)
    return user  # 0.1 сек вместо 30 сек!

Главное — не гадать, а мерить. Профилирование спасает.

Если endpoint работает 30 секунд, какие действия предпримешь | PrepBro