← Назад к вопросам
Какие знаешь варианты решения проблемы с медленным запросом к БД?
2.2 Middle🔥 251 комментариев
#Базы данных (SQL)
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Какие знаешь варианты решения проблемы с медленным запросом к БД?
Медленные запросы — одна из главных причин падения performance любого приложения. Вот полный набор техник, которые работают на практике.
Шаг 1: Анализ проблемы
Найди медленный запрос
-- PostgreSQL: включи логирование медленных запросов
SET log_min_duration_statement = 1000; -- логируй запросы > 1 сек
-- MySQL: slow query log
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL long_query_time = 1;
-- SQLite: профилирование
PRAGMA query_only = ON;
Анализируй план запроса
-- PostgreSQL/MySQL
EXPLAIN ANALYZE SELECT * FROM users WHERE name = 'John';
-- Результат покажет:
-- - Seq Scan (полный скан таблицы) — ❌ плохо
-- - Index Scan (использование индекса) — ✓ хорошо
-- - Actual rows vs Planned rows — если отличаются, статистика устарела
Вариант 1: Добавить индекс
Самый частый и быстрый способ!
-- Поиск по имени
CREATE INDEX idx_users_name ON users(name);
-- Составной индекс (несколько колонок)
CREATE INDEX idx_orders_user_date ON orders(user_id, created_at DESC);
-- Индекс с фильтром
CREATE INDEX idx_active_users ON users(name) WHERE is_active = true;
-- Частичный индекс (маленький, быстро)
CREATE INDEX idx_pending_orders ON orders(id) WHERE status = 'pending';
Когда помогает:
- WHERE условия
- JOIN условия (на foreign key)
- ORDER BY
- DISTINCT
Когда НЕ помогает:
- LIKE '%text' (нужен индекс типа FULLTEXT)
- Функции:
WHERE UPPER(name) = 'JOHN'(индекс не помогает)
# Python + SQLAlchemy
from sqlalchemy import Column, String, Index
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(100))
email = Column(String(100))
__table_args__ = (
Index('idx_name', 'name'), # Простой индекс
Index('idx_email_active', 'email', 'is_active'), # Составной индекс
)
Вариант 2: Оптимизировать SELECT
Проблема: N+1 запрос
# ❌ Плохо: N+1 запрос (1 + 1000 = 1001 запрос!)
users = User.query.all()
for user in users:
print(user.posts) # Каждый раз новый запрос!
# ✓ Хорошо: 1 запрос с JOIN
users = User.query.options(joinedload(User.posts)).all()
# Или эквивалентно в SQL
SELECT u.*, p.* FROM users u
LEFT JOIN posts p ON u.id = p.user_id;
Проблема: Выбираю ВСЕ колонки
# ❌ Плохо: выбираю лишние данные (500 МБ большой текст)
users = User.query.all() # SELECT * ← с huge_text колонкой
# ✓ Хорошо: выбираю только нужные
users = User.query.with_entities(User.id, User.name, User.email).all()
# SQL эквивалент
SELECT id, name, email FROM users; -- без huge_text
Проблема: DISTINCT на большой таблице
# ❌ Плохо: очень медленно на миллионах строк
users = User.query.distinct().all()
# ✓ Хорошо: фильтруй, потом DISTINCT
users = User.query.filter(User.is_active == True).distinct().all()
# Или избегай DISTINCT совсем
users = User.query.group_by(User.id).all()
Вариант 3: Переписать запрос
Проблема: Вложенные INNER JOIN
-- ❌ Медленно
SELECT u.* FROM users u
WHERE u.id IN (
SELECT user_id FROM orders
WHERE order_date > '2024-01-01'
)
AND u.is_active = true;
-- ✓ Быстро: одна таблица в WHERE
SELECT DISTINCT u.* FROM users u
JOIN orders o ON u.id = o.user_id
WHERE o.order_date > '2024-01-01'
AND u.is_active = true;
Проблема: GROUP BY после выборки большого объема
-- ❌ Медленно
SELECT user_id, COUNT(*) as count
FROM orders
GROUP BY user_id
HAVING COUNT(*) > 10;
-- ✓ Быстро: используй HAVING прямо
SELECT user_id, COUNT(*) as count
FROM orders
GROUP BY user_id
HAVING COUNT(*) > 10
LIMIT 1000; # Ограничь результаты
Вариант 4: Денормализация
Когда индексы не помогают, добавь лишнюю колонку
# Проблема: считаю posts на лету (медленно)
select count from (
select user_id, count(*) from posts group by user_id
) as pc where user_id = 123;
# Решение: денормализ, храни в таблице users
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
posts_count = Column(Integer, default=0) # ← денормализованное поле
# Обновляй при создании/удалении поста
post = Post(user_id=user.id)
db.session.add(post)
user.posts_count += 1 # Обновляем счётчик
db.session.commit()
Вариант 5: Кеширование
Redis кеш
from redis import Redis
import json
import hashlib
redis = Redis(host='localhost', port=6379)
def get_user_posts(user_id, cache_ttl=3600):
cache_key = f'user:{user_id}:posts'
# Проверь кеш
cached = redis.get(cache_key)
if cached:
return json.loads(cached)
# Если нет в кеше — запрос в БД
posts = User.query.get(user_id).posts
result = [{'id': p.id, 'title': p.title} for p in posts]
# Сохрани в кеш на 1 час
redis.setex(cache_key, cache_ttl, json.dumps(result))
return result
# Инвалидация кеша при изменении
def update_post(post_id, title):
post = Post.query.get(post_id)
post.title = title
db.session.commit()
# Инвалидируй кеш пользователя
redis.delete(f'user:{post.user_id}:posts')
Query caching (ORM level)
from functools import lru_cache
import hashlib
@lru_cache(maxsize=1000)
def get_user_by_id(user_id):
"""Кеш на уровне приложения (осторожнее с многопроцессностью!)"""
return User.query.get(user_id)
# Очистка кеша
get_user_by_id.cache_clear()
Вариант 6: Миграция на аналитическую БД
Если запрос считает агрегаты на миллиардах строк
# Используй ClickHouse / BigQuery вместо PostgreSQL
from clickhouse_driver import Client
client = Client('localhost')
result = client.execute(
'SELECT user_id, COUNT(*) as count FROM orders GROUP BY user_id ORDER BY count DESC LIMIT 10'
)
# Или Snowflake, Redshift для аналитики
Вариант 7: Партиционирование таблицы
Когда таблица > 10 ГБ, раздели на части
-- PostgreSQL: партиционирование по дате
CREATE TABLE orders (
id SERIAL,
user_id INT,
created_at TIMESTAMP,
amount DECIMAL
) PARTITION BY RANGE (YEAR(created_at));
-- 2024 в отдельной партиции
CREATE TABLE orders_2024 PARTITION OF orders
FOR VALUES FROM (2024) TO (2025);
CREATE TABLE orders_2025 PARTITION OF orders
FOR VALUES FROM (2025) TO (2026);
Вариант 8: Batch обработка вместо single queries
# ❌ Плохо: 1000 запросов
for user_id in user_ids:
user = User.query.get(user_id)
update_score(user)
# ✓ Хорошо: 1 запрос
users = User.query.filter(User.id.in_(user_ids)).all()
for user in users:
update_score(user)
# ✓ Ещё лучше: batch update
from sqlalchemy import update
db.session.execute(
update(User).where(
User.id.in_(user_ids)
).values(score=User.score + 10)
)
db.session.commit()
Вариант 9: Использовать LIMIT/OFFSET умнее
-- ❌ Медленно при большом OFFSET (проходит все строки)
SELECT * FROM orders OFFSET 1000000 LIMIT 10;
-- ✓ Быстро: ищи по ID
SELECT * FROM orders WHERE id > last_id LIMIT 10;
-- ✓ Ещё лучше: используй cursor
-- SELECT * FROM orders WHERE id > CURSOR_ID LIMIT 10
Чеклист оптимизации
# Когда запрос медленный:
1. EXPLAIN ANALYZE — посмотри план
2. Есть Seq Scan? → Добавь индекс
3. N+1 запросы? → Используй JOIN
4. Выбираю слишком много колонок? → Выбери только нужные
5. LIMIT большой? → Используй cursor или WHERE по ID
6. Частые одинаковые запросы? → Кеширование (Redis)
7. Таблица огромная? → Партиционирование
8. Аналитические запросы? → Отдельная аналитическая БД
9. Индексов слишком много? → Удали неиспользуемые
10. Статистика устарела? → ANALYZE table
Практический пример: оптимизация с 30 сек до 0.1 сек
# Было: 30 сек (медленно)
users = User.query.all() # SELECT * — все 50 колонок!
for user in users:
orders = Order.query.filter(Order.user_id == user.id).count() # N+1!
print(user.id, orders)
# Стало: 0.1 сек (быстро)
from sqlalchemy import func
result = db.session.query(
User.id,
func.count(Order.id).label('orders_count')
).join(
Order, User.id == Order.user_id
).group_by(
User.id
).all()
# Результат: 300x ускорение!