← Назад к вопросам

Как диагностировать задержку получения ответа от БД?

1.8 Middle🔥 211 комментариев
#DevOps и инфраструктура#Базы данных (SQL)

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как диагностировать задержку получения ответа от БД?

Задержки при получении ответов от БД — это одна из самых частых проблем в production. Рассмотрим систематический подход к диагностике.

1. Логирование времени выполнения запросов

Основной инструмент — измерение времени запроса:

import time
import logging
from contextlib import contextmanager

logger = logging.getLogger(__name__)

@contextmanager
def measure_query_time(query_name: str):
    """Контекстный менеджер для измерения времени БД запроса"""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        logger.info(f"Query {query_name} took {elapsed:.3f}s")
        if elapsed > 1.0:  # Порог в 1 сек
            logger.warning(f"SLOW QUERY: {query_name} took {elapsed:.3f}s")

# Использование
with measure_query_time("get_user_profile"):
    user = db.query(User).filter_by(id=user_id).first()

2. Профилирование SQL запросов (SQLAlchemy)

from sqlalchemy import event, create_engine
import time
import logging

logger = logging.getLogger("sqlalchemy.engine")
logger.setLevel(logging.INFO)

engine = create_engine("postgresql://user:pass@localhost/db", echo=False)

# Перехватываем все SQL запросы
@event.listens_for(engine, "before_cursor_execute")
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    conn.info.setdefault("query_start_time", []).append(time.time())
    logger.info(f"START: {statement}")

@event.listens_for(engine, "after_cursor_execute")
def receive_after_cursor_execute(conn, cursor, statement, params, context, executemany):
    total = time.time() - conn.info["query_start_time"].pop(-1)
    logger.info(f"FINISH in {total:.3f}s: {statement}")
    if total > 1.0:
        logger.warning(f"SLOW QUERY ({total:.3f}s): {statement}")

3. Анализ плана выполнения запроса (EXPLAIN ANALYZE)

from sqlalchemy import text

# Получи детальный план выполнения
query = text("""
EXPLAIN ANALYZE
SELECT u.id, u.name, COUNT(p.id) as posts_count
FROM users u
LEFT JOIN posts p ON u.id = p.user_id
GROUP BY u.id, u.name
WHERE u.created_at > NOW() - INTERVAL 30 DAY
""")

with engine.connect() as conn:
    result = conn.execute(query)
    for row in result:
        print(row[0])

4. Проверка индексов

# Найди запросы без индексов (Seq Scan)
query = text("""
SELECT schemaname, tablename, indexname
FROM pg_indexes
WHERE schemaname NOT IN (pg_catalog, information_schema)
ORDER BY tablename;
""")

# Найди медленные операции с полным сканированием
query = text("""
EXPLAIN ANALYZE
SELECT * FROM orders WHERE status = completed;
""")

# Если видишь "Seq Scan", нужен индекс:
# CREATE INDEX idx_orders_status ON orders(status);

5. Инструмент pgAdmin/DBeaver

Используй графические инструменты для анализа:

# В DBeaver:
# 1. Выбери query
# 2. Ctrl+Shift+E (Execute Explain Plan)
# 3. Смотри на время execution и Seq Scan

# Или SQL:
EXPLAIN (ANALYZE, BUFFERS)
SELECT * FROM large_table WHERE id = 12345;

6. Мониторинг в production (pg_stat_statements)

# Включи расширение в PostgreSQL
# CREATE EXTENSION pg_stat_statements;

query = text("""
SELECT query, calls, mean_exec_time, max_exec_time
FROM pg_stat_statements
ORDER BY mean_exec_time DESC
LIMIT 10;
""")

# Получи 10 самых медленных запросов
with engine.connect() as conn:
    result = conn.execute(query)
    for row in result:
        print(f"Query: {row.query[:50]}...")
        print(f"  Calls: {row.calls}, Mean: {row.mean_exec_time:.2f}ms")

7. Проверка N+1 проблемы

# ❌ ПЛОХО: N+1 запросов
users = db.query(User).all()
for user in users:
    posts = db.query(Post).filter_by(user_id=user.id).all()  # Отдельный запрос!

# ✅ ХОРОШО: eager loading
users = db.query(User).options(
    joinedload(User.posts)
).all()

# ✅ ИЛИ явный JOIN
from sqlalchemy.orm import joinedload
users = db.query(User).options(joinedload(User.posts)).all()

8. Проверка размера данных

# Размер таблицы
query = text("""
SELECT 
    schemaname,
    tablename,
    pg_size_pretty(pg_total_relation_size(schemaname||.||tablename)) AS size
FROM pg_tables
WHERE schemaname NOT IN (pg_catalog, information_schema)
ORDER BY pg_total_relation_size(schemaname||.||tablename) DESC;
""")

# Найди неиспользуемые индексы
query = text("""
SELECT 
    schemaname, tablename, indexname,
    idx_scan, idx_tup_read, idx_tup_fetch
FROM pg_stat_user_indexes
WHERE idx_scan = 0
ORDER BY pg_relation_size(indexrelid) DESC;
""")

9. Стресс-тестирование (Apache JMeter, wrk)

# Используй wrk для нагрузочного тестирования
wrk -t12 -c400 -d30s http://localhost:8000/api/users

# Или locust на Python
from locust import HttpUser, task

class UserBehavior(HttpUser):
    @task
    def get_users(self):
        self.client.get("/api/users")

10. Чек-лист диагностики

  • Логируй время всех запросов — установи порог (например, 1 сек)
  • Используй EXPLAIN ANALYZE — найди узкие места
  • Проверь индексы — Seq Scan обычно проблема
  • Ищи N+1 проблемы — используй eager loading
  • Профилируй в production — pg_stat_statements
  • Мониторь размер данных — растёт ли таблица неконтролируемо
  • Проверь подключение — сетевая задержка
  • Проверь настройки БД — work_mem, shared_buffers
  • Запусти нагрузочный тест — найди точку отказа

Большинство задержек — это либо отсутствие индексов, либо N+1 запросы, либо полное сканирование больших таблиц.