← Назад к вопросам
Если endpoint работает 30 секунд, какие действия предпримешь
2.0 Middle🔥 191 комментариев
#REST API и HTTP#Архитектура и паттерны
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Оптимизация endpoint работающего 30 секунд
Это типичная проблема в production. 30 секунд — это огромный timeout. Я разбираю проблему систематически.
Шаг 1: Диагностика (профилирование)
Первое, что нужно сделать — найти bottleneck. Использую несколько инструментов:
Профилирование с помощью cProfile:
import cProfile
import pstats
from io import StringIO
from fastapi import FastAPI
from functools import wraps
app = FastAPI()
def profile_endpoint(func):
@wraps(func)
def wrapper(*args, **kwargs):
profiler = cProfile.Profile()
profiler.enable()
result = func(*args, **kwargs)
profiler.disable()
s = StringIO()
ps = pstats.Stats(profiler, stream=s).sort_stats('cumulative')
ps.print_stats(10)
print(s.getvalue())
return result
return wrapper
@app.get("/slow-endpoint")
@profile_endpoint
def slow_endpoint():
return {"status": "ok"}
Логирование с временем:
import time
import logging
logger = logging.getLogger(__name__)
def timed_function(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
elapsed = time.time() - start
logger.info(f"{func.__name__} took {elapsed:.2f}s")
return result
return wrapper
@timed_function
def fetch_user_data(user_id):
# что-то медленное
pass
Шаг 2: Типичные проблемы
1. N+1 запросы в БД
Это самая частая причина.例:
# ❌ МЕДЛЕННО — N+1 проблема
users = User.query.all()
for user in users:
posts = user.posts # Запрос в БД за каждого юзера!
process(posts)
# Если 100 юзеров — 101 запрос!
Исправления:
# ✅ Eager loading (SQLAlchemy)
from sqlalchemy.orm import joinedload
users = User.query.options(joinedload(User.posts)).all()
for user in users:
posts = user.posts # Уже загружено, нет запроса
2. Отсутствие индексов в БД
# Медленный запрос без индекса
SELECT * FROM users WHERE email = 'test@example.com' -- 5 сек
-- Добавляю индекс
CREATE INDEX idx_users_email ON users(email);
-- Теперь 0.1 сек
3. Загрузка всех данных вместо пагинации
# ❌ Загружаю 1M записей
@app.get("/items")
def get_all_items():
return db.query(Item).all() # Можно потратить 20 сек только на SQL
# ✅ Пагинирую
@app.get("/items")
def get_items(skip: int = 0, limit: int = 50):
return db.query(Item).offset(skip).limit(limit).all()
4. Синхронные внешние API запросы
# ❌ Блокирующие запросы
import requests
@app.get("/user/{user_id}/enriched")
def get_enriched_user(user_id):
user = db.query(User).get(user_id) # 1 сек
profile = requests.get(f"https://api.external.com/{user_id}").json() # 10 сек
weather = requests.get(f"https://weather.api.com").json() # 5 сек
return {"user": user, "profile": profile, "weather": weather} # Итого 16 сек
# ✅ Асинхронные запросы
import aiohttp
from fastapi import FastAPI
app = FastAPI()
@app.get("/user/{user_id}/enriched")
async def get_enriched_user(user_id):
async with aiohttp.ClientSession() as session:
# Все запросы параллельно
user_task = asyncio.create_task(fetch_user(user_id))
profile_task = asyncio.create_task(
session.get(f"https://api.external.com/{user_id}")
)
weather_task = asyncio.create_task(
session.get("https://weather.api.com")
)
user, profile, weather = await asyncio.gather(
user_task, profile_task, weather_task
) # Все параллельно: 10 сек, а не 16
5. Неоптимальные SQL запросы
# ❌ Медленный запрос
SELECT * FROM orders
WHERE YEAR(created_at) = 2024 # Full table scan!
# ✅ Оптимальный
SELECT * FROM orders
WHERE created_at >= '2024-01-01' AND created_at < '2025-01-01'
AND status IN ('completed', 'shipped') -- Больше фильтров, меньше данных
LIMIT 100
Шаг 3: Кэширование
Если операция медленная, но её результат стабилен — кэшируй:
from functools import lru_cache
import redis
# Простое кэширование в памяти
@lru_cache(maxsize=128)
def get_user_recommendations(user_id: int):
# Медленная операция
return compute_recommendations(user_id)
# Redis кэширование
import pickle
redis_client = redis.Redis()
def get_user_profile(user_id):
cache_key = f"profile:{user_id}"
# Проверяю кэш
cached = redis_client.get(cache_key)
if cached:
return pickle.loads(cached)
# Если нет в кэше — загружаю и кэширую на час
profile = db.query(User).get(user_id)
redis_client.setex(
cache_key,
3600, # 1 час TTL
pickle.dumps(profile)
)
return profile
Шаг 4: Фоновые задачи
Если операция действительно медленная, но пользователю результат не нужен сразу:
from celery import Celery
celery = Celery('myapp')
# Медленная операция в фоне
@celery.task
def process_large_report(user_id):
# 30 секунд обработки
report = generate_report(user_id)
save_report(user_id, report)
# Endpoint возвращает быстро
@app.post("/reports")
def create_report(user_id: int):
task = process_large_report.delay(user_id)
return {
"task_id": task.id,
"status": "processing"
}
# Пользователь может проверить статус
@app.get("/reports/{task_id}")
def get_report_status(task_id):
task = celery.AsyncResult(task_id)
if task.ready():
return {"status": "done", "result": task.result}
return {"status": "processing"}
Шаг 5: Масштабирование
Если оптимизаций недостаточно:
# Кэширование на уровне HTTP (CDN, Varnish)
@app.get("/data", response_model=DataResponse)
def get_data(cache_control: bool = True):
if cache_control:
return Response(
content=data,
headers={"Cache-Control": "public, max-age=3600"}
)
return data
# Rate limiting на уровне инфраструктуры (nginx, API gateway)
# Это предотвращает overload от тяжёлых операций
Мой процесс в реальности
- Профилирую — где время тратится?
- Ищу N+1 — самая частая причина
- Добавляю индексы — БД ускоряется в 10x
- Кэширую — если данные стабильны
- Перевожу на асинхронность — если есть I/O операции
- Переношу в background task — если нет срочности
Пример: Реальная оптимизация (с 30 сек до 0.2 сек)
# ДО
@app.get("/user/{user_id}/stats")
def get_user_stats(user_id):
user = db.query(User).get(user_id) # 1 сек
# N+1 проблема в цикле
posts = db.query(Post).filter(Post.user_id == user_id).all() # 10 сек
for post in posts:
comments = db.query(Comment).filter(Comment.post_id == post.id).all() # 10 сек за каждый!
return {"user": user, "posts": posts} # Итого 20+ сек
# ПОСЛЕ
@app.get("/user/{user_id}/stats")
def get_user_stats(user_id):
user = db.query(User).options(
joinedload(User.posts).joinedload(Post.comments)
).filter(User.id == user_id).first() # 0.1 сек (одним запросом!)
return user # 0.1 сек вместо 30 сек!
Главное — не гадать, а мерить. Профилирование спасает.