← Назад к вопросам

Как можно экстренно реанимировать запрос?

2.0 Middle🔥 121 комментариев
#Python Core

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Экстренная реанимация медленного запроса

Когда запрос начал тормозить в продакшене, нужна методичная диагностика и оптимизация. Это критически важный навык для Python-разработчика.

Шаг 1: Диагностика (EXPLAIN, EXPLAIN ANALYZE)

import sqlite3
from psycopg2 import connect

# PostgreSQL
conn = connect("dbname=mydb user=postgres")
cur = conn.cursor()

# EXPLAIN ANALYZE показывает план выполнения и реальное время
cur.execute("""
    EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON)
    SELECT u.id, u.name, COUNT(p.id) as post_count
    FROM users u
    LEFT JOIN posts p ON u.id = p.user_id
    WHERE u.created_at > '2024-01-01'
    GROUP BY u.id, u.name
    ORDER BY post_count DESC
    LIMIT 100
""")

plan = cur.fetchone()[0]
for row in plan:
    print(f"Seq Scan: {row.get('Node Type')}")
    print(f"Actual time: {row.get('Actual Total Time')} ms")

На что смотреть в EXPLAIN:

  • Seq Scan — сканирование всей таблицы (плохо)
  • Index Scan — использование индекса (хорошо)
  • Nested Loop — для каждой строки внешнего запроса ищет в B-дереве
  • Hash Join — хеширует меньшую таблицу, ищет в хеше

Шаг 2: Индексирование

# Миграция для добавления индекса
# UP
CREATE INDEX CONCURRENTLY idx_users_created_at 
    ON users(created_at DESC)
    WHERE created_at > '2024-01-01';

CREATE INDEX CONCURRENTLY idx_posts_user_id 
    ON posts(user_id);

# Составной индекс для сложных WHERE
CREATE INDEX CONCURRENTLY idx_users_status_created
    ON users(status, created_at DESC);

# Индекс на выражение
CREATE INDEX CONCURRENTLY idx_users_name_lower
    ON users(LOWER(name));
# SQLAlchemy модель с индексами
from sqlalchemy import Column, Integer, String, DateTime, Index, text
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String, unique=True)
    created_at = Column(DateTime)
    status = Column(String)
    
    __table_args__ = (
        Index('idx_users_created_at', 'created_at', postgresql_using='btree'),
        Index('idx_users_status_created', 'status', 'created_at'),
        Index('idx_users_email_status', 'email', 'status'),
    )

Шаг 3: Оптимизация Query

Problem: N+1 запросы

# ❌ Плохо — N+1 запросов!
users = session.query(User).limit(100).all()
for user in users:
    print(user.posts)  # Отдельный SQL для каждого!

# ✅ Хорошо — всего 2 запроса
from sqlalchemy.orm import joinedload, selectinload

users = session.query(User).options(
    selectinload(User.posts)
).limit(100).all()

Problem: SELECT *

# ❌ Плохо
user = session.query(User).filter(User.id == 1).first()

# ✅ Хорошо — только нужные колонки
user = session.query(
    User.id,
    User.name,
    User.email
).filter(User.id == 1).first()

Problem: DISTINCT медленный

# ❌ Плохо
users = session.query(User).distinct().all()

# ✅ Хорошо
from sqlalchemy import func
users = session.query(User.id).distinct().all()

Шаг 4: Connection Pooling

from sqlalchemy import create_engine

engine = create_engine(
    'postgresql://user:password@localhost/mydb',
    pool_size=20,
    max_overflow=40,  # дополнительные коннекции при пике
    pool_recycle=3600,  # переподключение каждый час
    echo=False  # отключить логирование SQL
)

# Или с psycopg3
from sqlalchemy.pool import QueuePool
engine = create_engine(
    'postgresql://...',
    poolclass=QueuePool,
    pool_size=10,
    max_overflow=20
)

Шаг 5: Кеширование

import redis
from functools import wraps
import json
import hashlib

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def cache_query(ttl=3600):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Генерируем ключ кеша
            cache_key = hashlib.md5(
                f"{func.__name__}:{args}:{kwargs}".encode()
            ).hexdigest()
            
            # Проверяем кеш
            cached = redis_client.get(cache_key)
            if cached:
                return json.loads(cached)
            
            # Выполняем запрос
            result = func(*args, **kwargs)
            
            # Кешируем
            redis_client.setex(
                cache_key,
                ttl,
                json.dumps(result)
            )
            return result
        return wrapper
    return decorator

@cache_query(ttl=1800)
def get_popular_posts(days=7):
    return session.query(Post).filter(
        Post.created_at > datetime.now() - timedelta(days=days)
    ).order_by(Post.likes.desc()).limit(10).all()

Шаг 6: Pagination вместо LIMIT/OFFSET

# ❌ Плохо при больших offset
page_100 = session.query(User).offset(1000000).limit(20).all()

# ✅ Хорошо — keyset pagination
def get_users_keyset(last_id=None, limit=20):
    query = session.query(User).order_by(User.id)
    
    if last_id:
        query = query.filter(User.id > last_id)
    
    return query.limit(limit + 1).all()

Шаг 7: Batch Operations

# ❌ Плохо — по одному
for user_id in user_ids:
    session.query(User).filter(User.id == user_id).update({User.active: True})

# ✅ Хорошо — батч
from sqlalchemy import update
session.execute(
    update(User).where(User.id.in_(user_ids)).values(active=True)
)
session.commit()

Шаг 8: Мониторинг

import logging
import time

logger = logging.getLogger(__name__)

class SlowQueryLogger:
    def __init__(self, threshold_ms=100):
        self.threshold = threshold_ms
    
    def __call__(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time.time()
            result = func(*args, **kwargs)
            elapsed = (time.time() - start) * 1000
            
            if elapsed > self.threshold:
                logger.warning(
                    f"Slow query: {func.__name__} took {elapsed:.2f}ms"
                )
            return result
        return wrapper

@SlowQueryLogger(threshold_ms=200)
def get_user_profile(user_id):
    return session.query(User).filter(User.id == user_id).first()

Чеклист реанимации

  1. EXPLAIN ANALYZE — найти узкие места
  2. Добавить индексы — на WHERE и JOIN колонки
  3. Оптимизировать SELECT — только нужные колонки
  4. Избежать N+1 — используй joinedload или selectinload
  5. Кеширование — Redis для часто используемых данных
  6. Connection pooling — не создавать новые коннекции
  7. Pagination — keyset вместо OFFSET
  8. Batch operations — групповые обновления
  9. Мониторинг — логирование медленных запросов

Обычно комбинация 2-3 техник решает 95% проблем с производительностью.