← Назад к вопросам

Как ускорить зависающий SQL-запрос?

2.0 Middle🔥 241 комментариев
#Базы данных (SQL)

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как ускорить зависающий SQL-запрос

Зависающий SQL-запрос — это запрос, выполняющийся слишком долго. Существует систематический подход к диагностике и оптимизации.

Шаг 1: Диагностика с EXPLAIN

import time
from sqlalchemy import text
from sqlalchemy.orm import Session

def analyze_slow_query(session: Session, query: str):
    # PostgreSQL — EXPLAIN ANALYZE показывает план и фактическое время
    explain = f"EXPLAIN ANALYZE {query}"
    result = session.execute(text(explain))
    
    print("План выполнения:")
    for row in result:
        print(row[0])
    
    # Измеряем время
    start = time.time()
    session.execute(text(query))
    elapsed = time.time() - start
    print(f"Время: {elapsed:.3f}s")

Проблема 1: Отсутствие индекса

-- Медленно: Seq Scan on users (cost=0..100000)
SELECT * FROM users WHERE email = 'test@example.com';

-- Быстро: с индексом
CREATE INDEX idx_users_email ON users(email);
SELECT * FROM users WHERE email = 'test@example.com';
-- Index Scan using idx_users_email (cost=0..10)

Проблема 2: N+1 SELECT

# Медленно: 1 запрос для юзеров + N запросов для постов
users = session.query(User).all()
for user in users:
    print(user.posts)  # Новый SELECT для каждого!

# Быстро: Eager loading
from sqlalchemy.orm import joinedload
users = session.query(User).options(
    joinedload(User.posts)
).all()
# Один запрос с JOIN

Проблема 3: Неправильный LIKE

-- Медленно: Seq Scan (левый символ подстановки)
SELECT * FROM users WHERE name LIKE '%john%';

-- Быстро: Правый символ подстановки
SELECT * FROM users WHERE name LIKE 'john%';

-- Ещё быстрее: Полнотекстовый поиск
SELECT * FROM users WHERE to_tsvector(name) @@ to_tsquery('john');

Проблема 4: Фильтр ПОСЛЕ JOIN

-- Медленно: Объединяет всё, потом фильтрует
SELECT u.*, p.* FROM users u
JOIN posts p ON u.id = p.user_id
WHERE u.created_at > '2024-01-01';

-- Быстро: Фильтрует до объединения
SELECT u.*, p.* FROM users u
WHERE u.created_at > '2024-01-01'
JOIN posts p ON u.id = p.user_id;

Проблема 5: Неправильные типы данных

-- Медленно: Преобразование типов
SELECT * FROM posts WHERE user_id = '123';  -- string to integer

-- Быстро: Правильный тип
SELECT * FROM posts WHERE user_id = 123;  -- integer to integer

Оптимизация в коде

from sqlalchemy import Column, Integer, String, DateTime, Index
from sqlalchemy.orm import relationship

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    email = Column(String, index=True)  # Индекс
    created_at = Column(DateTime, index=True)  # Индекс
    posts = relationship('Post', lazy='selectin')  # Eager loading

class Post(Base):
    __tablename__ = 'posts'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'), index=True)
    title = Column(String)

# Оптимальный запрос
from sqlalchemy.orm import joinedload

def get_user_with_posts(session: Session, user_id: int):
    return session.query(User)\
        .options(joinedload(User.posts))\
        .filter(User.id == user_id)\
        .first()

# Только нужные колонки
def get_user_emails(session: Session):
    return session.query(User.email).all()

# Пакетная обработка
def update_users_batch(session: Session, user_ids: list[int]):
    session.query(User)\
        .filter(User.id.in_(user_ids))\
        .update({'is_active': True}, synchronize_session=False)
    session.commit()

Профилирование запросов

from sqlalchemy import event
from sqlalchemy.engine import Engine
import time

@event.listens_for(Engine, "before_cursor_execute")
def receive_before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    conn.info.setdefault('query_start_time', []).append(time.time())

@event.listens_for(Engine, "after_cursor_execute")
def receive_after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    total = time.time() - conn.info['query_start_time'].pop(-1)
    if total > 1.0:
        print(f"SLOW QUERY ({total:.3f}s): {statement}")

Кеширование результатов

import functools
from datetime import datetime, timedelta

query_cache = {}

def cached_query(expire_seconds: int = 300):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = (func.__name__, args, tuple(sorted(kwargs.items())))
            
            if key in query_cache:
                data, timestamp = query_cache[key]
                if datetime.now() - timestamp < timedelta(seconds=expire_seconds):
                    return data
            
            result = func(*args, **kwargs)
            query_cache[key] = (result, datetime.now())
            return result
        
        return wrapper
    return decorator

@cached_query(expire_seconds=600)
def get_popular_posts(session: Session):
    return session.query(Post).filter(Post.views > 1000).all()

Чеклист оптимизации

  1. EXPLAIN ANALYZE показывает узкое место?
  2. Индекс на колонке в WHERE?
  3. Индекс на FOREIGN KEY в JOIN?
  4. Eager loading для relationships?
  5. N+1 SELECT проблема?
  6. Полный скан (Seq Scan) можно избежать?
  7. Фильтры до JOIN, не после?
  8. Правильные типы в условиях?
  9. LIMIT/OFFSET правильно используются?
  10. Результаты кешируются?

Оптимизация SQL — это систематический процесс: профилирование → диагностика → применение стратегии. Начните с EXPLAIN ANALYZE!