← Назад к вопросам

Улучшал ли запросы в MongoDB

2.7 Senior🔥 151 комментариев
#DevOps и инфраструктура

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Оптимизация запросов в MongoDB

Да, я много работал с оптимизацией MongoDB запросов в production среде, где это было критично для производительности. Хочу поделиться конкретными подходами и техниками, которые дали ощутимые результаты.

Анализ выполнения запросов (explain)

Первое, что я делаю — изучаю план выполнения запроса через explain(). Это показывает, используются ли индексы и сколько документов сканируется:

import pymongo
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
db = client["mydb"]
collection = db["users"]

# Анализируем план выполнения
explain_result = collection.find({"email": "user@example.com"}).explain()
print(f"Количество просмотренных документов: {explain_result[executionStats][totalDocsExamined]}")
print(f"Количество возвращённых документов: {explain_result[executionStats][nReturned]}")
print(f"Используется индекс: {explain_result[executionStats][executionStages][stage]}")

Создание индексов

Большинство проблем производительности решаются правильными индексами. Я создаю индексы на поля, используемые в фильтрах и сортировке:

# Простой индекс на одно поле
collection.create_index("email")

# Составной индекс для сложных запросов
collection.create_index([("user_id", 1), ("created_at", -1)])

# Уникальный индекс
collection.create_index("email", unique=True)

# Индекс с опциями (например, sparse для опциональных полей)
collection.create_index("phone", sparse=True)

# Проверяем существующие индексы
for index in collection.list_indexes():
    print(index)

Оптимизация через projection (выбор полей)

Вместо загрузки всех полей документа, я выбираю только необходимые. Это снижает объём передачи данных:

# ❌ Плохо: загружаем все поля
users = list(collection.find({"status": "active"}))

# ✅ Хорошо: выбираем только нужные поля
users = list(collection.find(
    {"status": "active"},
    {"_id": 1, "name": 1, "email": 1}  # projection
))

Батчинг и пагинация

Для больших наборов данных используем пагинацию с skip() и limit():

# Пагинация с cursor-based подходом (более эффективно)
def get_users_paginated(collection, batch_size=100, last_id=None):
    query = {}
    if last_id:
        from bson import ObjectId
        query = {"_id": {"$gt": ObjectId(last_id)}}
    
    return list(collection.find(query).limit(batch_size))

# Для очень больших объёмов используем aggregation pipeline
pipeline = [
    {"$match": {"status": "active"}},
    {"$skip": 1000},
    {"$limit": 100},
    {"$project": {"_id": 1, "name": 1, "email": 1}}
]
results = list(collection.aggregate(pipeline))

Aggregation pipeline для сложных операций

MongoDB aggregation намного быстрее, чем загрузить данные в Python и обработать:

# ❌ Медленно: загружаем и обрабатываем в Python
all_orders = list(collection.find({"status": "completed"}))
total_revenue = sum(order["amount"] for order in all_orders)

# ✅ Быстро: обработка на уровне БД
pipeline = [
    {"$match": {"status": "completed"}},
    {"$group": {
        "_id": None,
        "total_revenue": {"$sum": "$amount"},
        "count": {"$sum": 1}
    }}
]
result = collection.aggregate(pipeline)
for doc in result:
    print(f"Общий доход: {doc[total_revenue]}, заказов: {doc[count]}")

Bulk операции

Для массовых обновлений используем bulk операции вместо циклов:

from pymongo import UpdateOne

# ❌ Медленно: обновления в цикле
for user_id in user_ids:
    collection.update_one({"_id": user_id}, {"$set": {"processed": True}})

# ✅ Быстро: bulk операции
operations = [
    UpdateOne({"_id": user_id}, {"$set": {"processed": True}})
    for user_id in user_ids
]
result = collection.bulk_write(operations)
print(f"Обновлено: {result.modified_count} документов")

Connection pooling и конфигурация

Правильная конфигурация клиента тоже влияет на производительность:

# Настройка пула соединений
client = MongoClient(
    "mongodb://localhost:27017",
    maxPoolSize=50,        # Максимум соединений в пуле
    minPoolSize=10,        # Минимум соединений
    maxIdleTimeMS=45000,   # Время жизни неиспользуемого соединения
    retryWrites=True       # Автоматический повтор при сбое
)

Мониторинг и логирование

Я настраиваю логирование медленных запросов для выявления проблем:

import logging
from pymongo import monitoring

# Логирование команд для отладки
def on_command(event):
    if event.duration_millis > 100:  # Логируем запросы > 100ms
        logging.warning(f"Медленный запрос: {event.command_name} - {event.duration_millis}ms")

monitoring.register(monitoring.CommandListener(on_command, on_command, on_command))

Практические результаты

На одном проекте эти оптимизации дали:

  • Сокращение времени запросов с 5 сек на 500ms через правильные индексы
  • Снижение нагрузки на БД в 10 раз через aggregation pipeline вместо Python обработки
  • Улучшение throughput в 3 раза через bulk операции

Ключевой момент: всегда начинайте с анализа через explain(), потом добавляйте индексы, и только потом переписывайте логику запросов. Это экономит время и даёт максимальный эффект.

Улучшал ли запросы в MongoDB | PrepBro