← Назад к вопросам
Какие действия предпримешь для поиска причины задержки работы эндпоинта до обращения к базе данных?
2.0 Middle🔥 191 комментариев
#DevOps и инфраструктура#Архитектура и паттерны
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Какие действия предпримешь для поиска причины задержки работы эндпоинта до обращения к базе данных?
Когда эндпоинт медленно отвечает, проблема может быть ещё до обращения к БД. Вот мой систематический подход к диагностике, который экономит часы.
1. Инструментирование кода (Profiling)
Первое — добавить временные метки (timing points) в критических местах:
import time
from fastapi import FastAPI
app = FastAPI()
@app.get("/api/v1/users/{user_id}")
async def get_user(user_id: str):
start = time.perf_counter()
# Точка 1: начало обработки
t1 = time.perf_counter()
print(f"Request started: +{(t1 - start)*1000:.2f}ms")
# Точка 2: после парсинга параметров и валидации
t2 = time.perf_counter()
print(f"Validation done: +{(t2 - start)*1000:.2f}ms")
# Точка 3: после получения данных из cache
cached_user = check_cache(user_id)
t3 = time.perf_counter()
print(f"Cache check done: +{(t3 - start)*1000:.2f}ms")
if cached_user:
return cached_user
# Точка 4: перед запросом к БД
t4 = time.perf_counter()
print(f"Before DB call: +{(t4 - start)*1000:.2f}ms") # <- ЗДЕСЬ ОПРЕДЕЛЯЕМ ЗАДЕРЖКУ
user = await db.get_user(user_id)
t5 = time.perf_counter()
print(f"Response ready: +{(t5 - start)*1000:.2f}ms")
return user
Output:
Request started: +0.15ms
Validation done: +1.23ms
Cache check done: +450.45ms <- БИНГО! задержка в cache check
Before DB call: +450.67ms
Response ready: +451.12ms
2. Использование декоратора для измерения
import time
from functools import wraps
import logging
logger = logging.getLogger(__name__)
def measure_time(func):
@wraps(func)
async def async_wrapper(*args, **kwargs):
start = time.perf_counter()
try:
result = await func(*args, **kwargs)
return result
finally:
elapsed = (time.perf_counter() - start) * 1000
logger.info(f"{func.__name__} took {elapsed:.2f}ms")
@wraps(func)
def sync_wrapper(*args, **kwargs):
start = time.perf_counter()
try:
result = func(*args, **kwargs)
return result
finally:
elapsed = (time.perf_counter() - start) * 1000
logger.info(f"{func.__name__} took {elapsed:.2f}ms")
return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
# Применяем к каждому шагу
@measure_time
async def check_cache(user_id: str):
await asyncio.sleep(0.45) # Имитация долгой операции
return None
@measure_time
async def validate_input(user_id: str):
if not user_id:
raise ValueError("Invalid user_id")
@measure_time
async def get_user(user_id: str):
check_cache(user_id)
validate_input(user_id)
# ...
3. Проверка конфигурации и сетевых задержек
import socket
import time
import aiohttp
# Проверка подключения к Redis/Cache
def check_redis_latency():
import redis
r = redis.Redis(host='localhost', port=6379)
start = time.perf_counter()
r.ping()
latency = (time.perf_counter() - start) * 1000
print(f"Redis ping latency: {latency:.2f}ms")
# Если >100ms — проблема в сетевых настройках или Redis перегружен
# Проверка размера данных в ответе
async def check_response_size():
async with aiohttp.ClientSession() as session:
async with session.get("https://api.example.com/data") as resp:
data = await resp.read()
print(f"Response size: {len(data) / 1024:.2f}KB") # Большой размер → долгая сериализация
4. Анализ middleware и зависимостей
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
import time
class TimingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
# Точка 1: начало обработки middleware
t1 = time.perf_counter()
# Точка 2: обработка других middleware
response = await call_next(request)
# Точка 3: конец middleware
t2 = time.perf_counter()
elapsed = (t2 - t1) * 1000
response.headers["X-Process-Time"] = str(elapsed)
return response
app.add_middleware(TimingMiddleware)
# Логирование для каждого middleware
@app.middleware("http")
async def auth_middleware(request: Request, call_next):
t1 = time.perf_counter()
# Проверка авторизации (может быть медленной!)
token = request.headers.get("Authorization", "")
if not verify_token(token): # <- МОЖЕТ БЫТЬ ЗДЕСЬ!
return JSONResponse({"error": "Unauthorized"}, status_code=401)
response = await call_next(request)
elapsed = (time.perf_counter() - t1) * 1000
logger.debug(f"Auth middleware: {elapsed:.2f}ms")
return response
Типичные причины задержки в middleware:
- Проверка токена JWT (криптографические операции)
- CORS headers обработка
- Logging слишком verbose
- Rate limiting проверки
5. Анализ зависимостей (FastAPI Depends)
from fastapi import Depends
async def get_current_user(token: str = Header(...)):
"""Зависимость — может быть медленной!"""
user = await verify_token_and_get_user(token)
return user
@app.get("/api/v1/protected")
async def protected_route(user = Depends(get_current_user)):
"""Зависимость выполняется ДО обработчика"""
return {"message": f"Hello {user.name}"}
# Если зависимость вызывается для КАЖДОГО запроса (даже параллельно),
# это может сорвать всё!
# Решение: кэширование или оптимизация
6. Использование cProfile для глубокого анализа
import cProfile
import pstats
import io
def profile_endpoint():
pr = cProfile.Profile()
pr.enable()
# Ваш код здесь
result = some_expensive_operation()
pr.disable()
# Вывод результатов
s = io.StringIO()
ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
ps.print_stats(20) # Top 20 функций по времени
print(s.getvalue())
# Output:
# ncalls tottime cumtime filename:lineno(function)
# 1 0.001 0.450 app.py:120(check_cache) <- ЗДЕСЬ!
# 10 0.100 0.200 redis.py:45(ping)
7. Реальный пример: поиск узкого места
import time
import asyncio
from fastapi import FastAPI
app = FastAPI()
@app.get("/api/v1/users/{user_id}")
async def get_user(user_id: str):
total_start = time.perf_counter()
# Step 1: Валидация (должна быть быстрой)
start = time.perf_counter()
if not user_id.isdigit():
raise ValueError("Invalid user ID")
print(f" Validation: {(time.perf_counter() - start)*1000:.2f}ms")
# Step 2: Проверка cache Redis
start = time.perf_counter()
cached = await redis_client.get(f"user:{user_id}")
print(f" Cache check: {(time.perf_counter() - start)*1000:.2f}ms")
if cached:
return json.loads(cached)
# Step 3: Проверка разрешений (может быть долгой!)
start = time.perf_counter()
await check_user_permissions(user_id)
print(f" Permissions check: {(time.perf_counter() - start)*1000:.2f}ms")
# Step 4: Получение данных
start = time.perf_counter()
print(f" About to call DB")
total = (time.perf_counter() - total_start) * 1000
print(f"Total before DB: {total:.2f}ms")
# Возвращаем всё, что уже посчитали
return {
"user_id": user_id,
"timing_before_db_ms": total
}
# Output при задержке 500ms до БД:
# Validation: 0.05ms
# Cache check: 450.22ms <- ПРОБЛЕМА!
# Permissions check: 0.12ms
# About to call DB
# Total before DB: 450.39ms
Чеклист диагностики
- ✅ Добавить timing points перед/после каждого шага
- ✅ Проверить middleware — они могут быть медленными
- ✅ Проверить зависимости (Depends в FastAPI)
- ✅ Измерить латентность сервисов (Redis, Cache, внешние API)
- ✅ Запустить cProfile для точной диагностики
- ✅ Проверить размер ответа — большие данные долго сериализуются
- ✅ Проверить конкурентность — может ли быть лимит подключений?