Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Экстренная реанимация медленного запроса
Когда запрос начал тормозить в продакшене, нужна методичная диагностика и оптимизация. Это критически важный навык для Python-разработчика.
Шаг 1: Диагностика (EXPLAIN, EXPLAIN ANALYZE)
import sqlite3
from psycopg2 import connect
# PostgreSQL
conn = connect("dbname=mydb user=postgres")
cur = conn.cursor()
# EXPLAIN ANALYZE показывает план выполнения и реальное время
cur.execute("""
EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON)
SELECT u.id, u.name, COUNT(p.id) as post_count
FROM users u
LEFT JOIN posts p ON u.id = p.user_id
WHERE u.created_at > '2024-01-01'
GROUP BY u.id, u.name
ORDER BY post_count DESC
LIMIT 100
""")
plan = cur.fetchone()[0]
for row in plan:
print(f"Seq Scan: {row.get('Node Type')}")
print(f"Actual time: {row.get('Actual Total Time')} ms")
На что смотреть в EXPLAIN:
- Seq Scan — сканирование всей таблицы (плохо)
- Index Scan — использование индекса (хорошо)
- Nested Loop — для каждой строки внешнего запроса ищет в B-дереве
- Hash Join — хеширует меньшую таблицу, ищет в хеше
Шаг 2: Индексирование
# Миграция для добавления индекса
# UP
CREATE INDEX CONCURRENTLY idx_users_created_at
ON users(created_at DESC)
WHERE created_at > '2024-01-01';
CREATE INDEX CONCURRENTLY idx_posts_user_id
ON posts(user_id);
# Составной индекс для сложных WHERE
CREATE INDEX CONCURRENTLY idx_users_status_created
ON users(status, created_at DESC);
# Индекс на выражение
CREATE INDEX CONCURRENTLY idx_users_name_lower
ON users(LOWER(name));
# SQLAlchemy модель с индексами
from sqlalchemy import Column, Integer, String, DateTime, Index, text
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
email = Column(String, unique=True)
created_at = Column(DateTime)
status = Column(String)
__table_args__ = (
Index('idx_users_created_at', 'created_at', postgresql_using='btree'),
Index('idx_users_status_created', 'status', 'created_at'),
Index('idx_users_email_status', 'email', 'status'),
)
Шаг 3: Оптимизация Query
Problem: N+1 запросы
# ❌ Плохо — N+1 запросов!
users = session.query(User).limit(100).all()
for user in users:
print(user.posts) # Отдельный SQL для каждого!
# ✅ Хорошо — всего 2 запроса
from sqlalchemy.orm import joinedload, selectinload
users = session.query(User).options(
selectinload(User.posts)
).limit(100).all()
Problem: SELECT *
# ❌ Плохо
user = session.query(User).filter(User.id == 1).first()
# ✅ Хорошо — только нужные колонки
user = session.query(
User.id,
User.name,
User.email
).filter(User.id == 1).first()
Problem: DISTINCT медленный
# ❌ Плохо
users = session.query(User).distinct().all()
# ✅ Хорошо
from sqlalchemy import func
users = session.query(User.id).distinct().all()
Шаг 4: Connection Pooling
from sqlalchemy import create_engine
engine = create_engine(
'postgresql://user:password@localhost/mydb',
pool_size=20,
max_overflow=40, # дополнительные коннекции при пике
pool_recycle=3600, # переподключение каждый час
echo=False # отключить логирование SQL
)
# Или с psycopg3
from sqlalchemy.pool import QueuePool
engine = create_engine(
'postgresql://...',
poolclass=QueuePool,
pool_size=10,
max_overflow=20
)
Шаг 5: Кеширование
import redis
from functools import wraps
import json
import hashlib
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def cache_query(ttl=3600):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Генерируем ключ кеша
cache_key = hashlib.md5(
f"{func.__name__}:{args}:{kwargs}".encode()
).hexdigest()
# Проверяем кеш
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
# Выполняем запрос
result = func(*args, **kwargs)
# Кешируем
redis_client.setex(
cache_key,
ttl,
json.dumps(result)
)
return result
return wrapper
return decorator
@cache_query(ttl=1800)
def get_popular_posts(days=7):
return session.query(Post).filter(
Post.created_at > datetime.now() - timedelta(days=days)
).order_by(Post.likes.desc()).limit(10).all()
Шаг 6: Pagination вместо LIMIT/OFFSET
# ❌ Плохо при больших offset
page_100 = session.query(User).offset(1000000).limit(20).all()
# ✅ Хорошо — keyset pagination
def get_users_keyset(last_id=None, limit=20):
query = session.query(User).order_by(User.id)
if last_id:
query = query.filter(User.id > last_id)
return query.limit(limit + 1).all()
Шаг 7: Batch Operations
# ❌ Плохо — по одному
for user_id in user_ids:
session.query(User).filter(User.id == user_id).update({User.active: True})
# ✅ Хорошо — батч
from sqlalchemy import update
session.execute(
update(User).where(User.id.in_(user_ids)).values(active=True)
)
session.commit()
Шаг 8: Мониторинг
import logging
import time
logger = logging.getLogger(__name__)
class SlowQueryLogger:
def __init__(self, threshold_ms=100):
self.threshold = threshold_ms
def __call__(self, func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
elapsed = (time.time() - start) * 1000
if elapsed > self.threshold:
logger.warning(
f"Slow query: {func.__name__} took {elapsed:.2f}ms"
)
return result
return wrapper
@SlowQueryLogger(threshold_ms=200)
def get_user_profile(user_id):
return session.query(User).filter(User.id == user_id).first()
Чеклист реанимации
- EXPLAIN ANALYZE — найти узкие места
- Добавить индексы — на WHERE и JOIN колонки
- Оптимизировать SELECT — только нужные колонки
- Избежать N+1 — используй joinedload или selectinload
- Кеширование — Redis для часто используемых данных
- Connection pooling — не создавать новые коннекции
- Pagination — keyset вместо OFFSET
- Batch operations — групповые обновления
- Мониторинг — логирование медленных запросов
Обычно комбинация 2-3 техник решает 95% проблем с производительностью.