← Назад к вопросам

Какие действия предпримешь для поиска причины задержки работы эндпоинта до обращения к базе данных?

2.0 Middle🔥 191 комментариев
#DevOps и инфраструктура#Архитектура и паттерны

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Какие действия предпримешь для поиска причины задержки работы эндпоинта до обращения к базе данных?

Когда эндпоинт медленно отвечает, проблема может быть ещё до обращения к БД. Вот мой систематический подход к диагностике, который экономит часы.

1. Инструментирование кода (Profiling)

Первое — добавить временные метки (timing points) в критических местах:

import time
from fastapi import FastAPI

app = FastAPI()

@app.get("/api/v1/users/{user_id}")
async def get_user(user_id: str):
    start = time.perf_counter()
    
    # Точка 1: начало обработки
    t1 = time.perf_counter()
    print(f"Request started: +{(t1 - start)*1000:.2f}ms")
    
    # Точка 2: после парсинга параметров и валидации
    t2 = time.perf_counter()
    print(f"Validation done: +{(t2 - start)*1000:.2f}ms")
    
    # Точка 3: после получения данных из cache
    cached_user = check_cache(user_id)
    t3 = time.perf_counter()
    print(f"Cache check done: +{(t3 - start)*1000:.2f}ms")
    
    if cached_user:
        return cached_user
    
    # Точка 4: перед запросом к БД
    t4 = time.perf_counter()
    print(f"Before DB call: +{(t4 - start)*1000:.2f}ms")  # <- ЗДЕСЬ ОПРЕДЕЛЯЕМ ЗАДЕРЖКУ
    
    user = await db.get_user(user_id)
    
    t5 = time.perf_counter()
    print(f"Response ready: +{(t5 - start)*1000:.2f}ms")
    
    return user

Output:

Request started: +0.15ms
Validation done: +1.23ms
Cache check done: +450.45ms  <- БИНГО! задержка в cache check
Before DB call: +450.67ms
Response ready: +451.12ms

2. Использование декоратора для измерения

import time
from functools import wraps
import logging

logger = logging.getLogger(__name__)

def measure_time(func):
    @wraps(func)
    async def async_wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            result = await func(*args, **kwargs)
            return result
        finally:
            elapsed = (time.perf_counter() - start) * 1000
            logger.info(f"{func.__name__} took {elapsed:.2f}ms")
    
    @wraps(func)
    def sync_wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            result = func(*args, **kwargs)
            return result
        finally:
            elapsed = (time.perf_counter() - start) * 1000
            logger.info(f"{func.__name__} took {elapsed:.2f}ms")
    
    return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper

# Применяем к каждому шагу
@measure_time
async def check_cache(user_id: str):
    await asyncio.sleep(0.45)  # Имитация долгой операции
    return None

@measure_time
async def validate_input(user_id: str):
    if not user_id:
        raise ValueError("Invalid user_id")

@measure_time
async def get_user(user_id: str):
    check_cache(user_id)
    validate_input(user_id)
    # ...

3. Проверка конфигурации и сетевых задержек

import socket
import time
import aiohttp

# Проверка подключения к Redis/Cache
def check_redis_latency():
    import redis
    
    r = redis.Redis(host='localhost', port=6379)
    
    start = time.perf_counter()
    r.ping()
    latency = (time.perf_counter() - start) * 1000
    
    print(f"Redis ping latency: {latency:.2f}ms")
    
    # Если >100ms — проблема в сетевых настройках или Redis перегружен

# Проверка размера данных в ответе
async def check_response_size():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com/data") as resp:
            data = await resp.read()
            print(f"Response size: {len(data) / 1024:.2f}KB")  # Большой размер → долгая сериализация

4. Анализ middleware и зависимостей

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
import time

class TimingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Точка 1: начало обработки middleware
        t1 = time.perf_counter()
        
        # Точка 2: обработка других middleware
        response = await call_next(request)
        
        # Точка 3: конец middleware
        t2 = time.perf_counter()
        
        elapsed = (t2 - t1) * 1000
        response.headers["X-Process-Time"] = str(elapsed)
        
        return response

app.add_middleware(TimingMiddleware)

# Логирование для каждого middleware
@app.middleware("http")
async def auth_middleware(request: Request, call_next):
    t1 = time.perf_counter()
    
    # Проверка авторизации (может быть медленной!)
    token = request.headers.get("Authorization", "")
    if not verify_token(token):  # <- МОЖЕТ БЫТЬ ЗДЕСЬ!
        return JSONResponse({"error": "Unauthorized"}, status_code=401)
    
    response = await call_next(request)
    
    elapsed = (time.perf_counter() - t1) * 1000
    logger.debug(f"Auth middleware: {elapsed:.2f}ms")
    
    return response

Типичные причины задержки в middleware:

  • Проверка токена JWT (криптографические операции)
  • CORS headers обработка
  • Logging слишком verbose
  • Rate limiting проверки

5. Анализ зависимостей (FastAPI Depends)

from fastapi import Depends

async def get_current_user(token: str = Header(...)):
    """Зависимость — может быть медленной!"""
    user = await verify_token_and_get_user(token)
    return user

@app.get("/api/v1/protected")
async def protected_route(user = Depends(get_current_user)):
    """Зависимость выполняется ДО обработчика"""
    return {"message": f"Hello {user.name}"}

# Если зависимость вызывается для КАЖДОГО запроса (даже параллельно),
# это может сорвать всё!
# Решение: кэширование или оптимизация

6. Использование cProfile для глубокого анализа

import cProfile
import pstats
import io

def profile_endpoint():
    pr = cProfile.Profile()
    pr.enable()
    
    # Ваш код здесь
    result = some_expensive_operation()
    
    pr.disable()
    
    # Вывод результатов
    s = io.StringIO()
    ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
    ps.print_stats(20)  # Top 20 функций по времени
    print(s.getvalue())

# Output:
#    ncalls  tottime  cumtime   filename:lineno(function)
#       1    0.001    0.450    app.py:120(check_cache)  <- ЗДЕСЬ!
#      10    0.100    0.200    redis.py:45(ping)

7. Реальный пример: поиск узкого места

import time
import asyncio
from fastapi import FastAPI

app = FastAPI()

@app.get("/api/v1/users/{user_id}")
async def get_user(user_id: str):
    total_start = time.perf_counter()
    
    # Step 1: Валидация (должна быть быстрой)
    start = time.perf_counter()
    if not user_id.isdigit():
        raise ValueError("Invalid user ID")
    print(f"  Validation: {(time.perf_counter() - start)*1000:.2f}ms")
    
    # Step 2: Проверка cache Redis
    start = time.perf_counter()
    cached = await redis_client.get(f"user:{user_id}")
    print(f"  Cache check: {(time.perf_counter() - start)*1000:.2f}ms")
    if cached:
        return json.loads(cached)
    
    # Step 3: Проверка разрешений (может быть долгой!)
    start = time.perf_counter()
    await check_user_permissions(user_id)
    print(f"  Permissions check: {(time.perf_counter() - start)*1000:.2f}ms")
    
    # Step 4: Получение данных
    start = time.perf_counter()
    print(f"  About to call DB")
    
    total = (time.perf_counter() - total_start) * 1000
    print(f"Total before DB: {total:.2f}ms")
    
    # Возвращаем всё, что уже посчитали
    return {
        "user_id": user_id,
        "timing_before_db_ms": total
    }

# Output при задержке 500ms до БД:
#   Validation: 0.05ms
#   Cache check: 450.22ms  <- ПРОБЛЕМА!
#   Permissions check: 0.12ms
#   About to call DB
# Total before DB: 450.39ms

Чеклист диагностики

  1. Добавить timing points перед/после каждого шага
  2. Проверить middleware — они могут быть медленными
  3. Проверить зависимости (Depends в FastAPI)
  4. Измерить латентность сервисов (Redis, Cache, внешние API)
  5. Запустить cProfile для точной диагностики
  6. Проверить размер ответа — большие данные долго сериализуются
  7. Проверить конкурентность — может ли быть лимит подключений?