Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Плюсы и минусы очередей (Message Queues)
Очереди используются для асинхронной обработки задач между компонентами системы. Основные реализации: RabbitMQ, Redis, Kafka, AWS SQS.
Плюсы
1. Развязка (Decoupling)
Производитель и потребитель работают независимо:
# Вариант БЕЗ очереди: синхронный вызов
@app.post("/send-email")
def send_email(email: str):
# Блокирует на отправку письма (долго)
smtp.send_message(email) # Может быть 2-5 секунд
return {"status": "sent"}
# Варианты БЕЗ очереди: Сервис может быть недоступен
# Email сервис медленный → пользователь ждёт
# Вариант С очередью: асинхронный
@app.post("/send-email")
async def send_email(email: str):
# Просто кладём в очередь (сразу)
await queue.publish("emails", {"email": email})
return {"status": "queued"} # Ответ мгновенный!
# Потребитель (отдельный сервис/воркер)
async def email_worker():
async for message in queue.subscribe("emails"):
send_email_smtp(message["email"]) # Обрабатывает когда может
2. Асинхронность и масштабируемость
Можно обрабатывать больше запросов одновременно:
БЕЗ очереди (3 воркера):
Производитель: [запрос 1] → обработка 5 сек
[запрос 2] → ждёт
[запрос 3] → ждёт
[запрос 4] → ошибка (очередь переполнена)
С очередью (3 потребителя):
Производитель: [1] → очередь → Потребитель 1 обрабатывает
[2] → очередь → Потребитель 2 обрабатывает
[3] → очередь → Потребитель 3 обрабатывает
[4] → очередь → ждёт в очереди (OK)
3. Надёжность (Persistence)
Если потребитель упадёт, сообщение не потеряется (в RabbitMQ, Kafka):
import pika
connection = pika.BlockingConnection()
channel = connection.channel()
# Настройка очереди как persistent
channel.queue_declare(queue="tasks", durable=True)
def callback(ch, method, properties, body):
try:
process_task(body.decode())
ch.basic_ack(delivery_tag=method.delivery_tag) # Подтверждение
except Exception as e:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) # Вернуть в очередь
channel.basic_consume(queue="tasks", on_message_callback=callback)
channel.start_consuming()
4. Приоритизация
Можно обрабатывать важные задачи раньше:
from celery import Celery
from kombu import Exchange, Queue
app = Celery("tasks")
app.conf.task_queues = (
Queue("high_priority", exchange=Exchange("high"), routing_key="high"),
Queue("normal", exchange=Exchange("normal"), routing_key="normal"),
Queue("low_priority", exchange=Exchange("low"), routing_key="low"),
)
@app.task
def urgent_task():
pass
# Отправить с высоким приоритетом
urgent_task.apply_async(priority=9)
5. Распределение нагрузки
Несколько потребителей обрабатывают один источник:
# RabbitMQ: Round-robin распределение
channel.basic_qos(prefetch_count=1) # Каждый worker берёт по 1 задаче
# Celery: автоматически распределяет между воркерами
celery -A tasks worker -l info
celery -A tasks worker -l info # Ещё один worker
6. Ретрай и Exponential Backoff
Автоматический повтор при ошибке:
from celery import Celery
app = Celery("tasks")
@app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={"max_retries": 5})
def flaky_task(self):
try:
result = api.call_external_service()
return result
except Exception as e:
# Автоматический повтор с экспоненциальной задержкой
# 1 сек, 2 сек, 4 сек, 8 сек, 16 сек
raise self.retry(exc=e, countdown=2 ** self.request.retries)
# Или явно
flaky_task.apply_async(countdown=5, retry_policy={"max_retries": 3})
Минусы
1. Добавленная сложность
Есть ещё один компонент для отладки и мониторинга:
БЕЗ очереди:
Клиент → FastAPI → БД
С очередью:
Клиент → FastAPI → RabbitMQ → Celery → БД
↑
Ещё один процесс для отладки
2. Задержка доставки
Сообщение обрабатывается не мгновенно:
Удаление письма:
БЕЗ очереди: 5 сек (синхронно, но надёжно)
С очередью: 0.01 сек (быстро) + 5 сек (обработка позже)
= 5.01 сек общее время, но пользователь ждёт только 0.01
3. Гарантии доставки сложные
Нужно выбирать уровень гарантий (At-Most-Once, At-Least-Once, Exactly-Once):
# At-Most-Once: может потеряться
channel.basic_consume(queue="tasks", on_message_callback=callback, auto_ack=True)
# Сообщение удалено ДО обработки → может потеряться при краше
# At-Least-Once: может обработаться дважды
channel.basic_consume(queue="tasks", on_message_callback=callback)
# Сообщение удалено ПОСЛЕ обработки → может обработаться дважды при краше
# (потребитель упал в середине обработки → сообщение вернулось в очередь)
# Exactly-Once: сложно реализовать, требует идемпотентности
def callback(ch, method, properties, body):
task_id = properties.headers["task_id"]
if already_processed(task_id): # Проверяем, не обрабатывали ли
ch.basic_ack(delivery_tag=method.delivery_tag)
return
process_task(body.decode())
mark_as_processed(task_id)
ch.basic_ack(delivery_tag=method.delivery_tag)
4. Отладка сложнее
Видеть полный путь запроса сложнее:
БЕЗ очереди:
Броадкаст в логе:
[12:00:00] POST /send-email
[12:00:01] SMTP send
[12:00:05] Response 200
С очередью:
Броадкаст 1 (API):
[12:00:00] POST /send-email → queued
[12:00:00] Response 200
Броадкаст 2 (Worker):
[12:00:03] Processing email task
[12:00:08] Email sent
Нужно коррелировать по task_id
5. Требование дополнительной инфраструктуры
Прибавляется сервис, который нужно запускать, мониторить, бэкапить:
# Нужно запускать RabbitMQ
docker run -d --name rabbitmq -p 5672:5672 rabbitmq:3.12
# Мониторить его health
curl http://localhost:15672/api/health
# Бэкапить данные
6. Переобработка при сбое
Если потребитель упадёт при обработке, может произойти переобработка:
# Сценарий проблемы:
[12:00:00] Worker получил сообщение из очереди
[12:00:01] Начал обработку (вычислили результат)
[12:00:02] Упал перед подтверждением ✗
[12:00:03] Worker перезагрузился
[12:00:04] Сообщение вернулось в очередь
[12:00:05] Новый worker обработал его снова
Результат: 2 письма отправлены одному пользователю! 😡
Решение: сделать операцию идемпотентной (можно выполнить дважды без вреда).
7. Стоимость
Dead Letter Queue, мониторинг, хранилище сообщений требуют ресурсов:
Redis очередь: малые нагрузки
RabbitMQ: средние нагрузки + надёжность
Kafka: большие нагрузки + история
Когда использовать очереди
✅ Используй когда:
- Тяжёлые операции (email, отчёты, обработка видео)
- Спайки трафика (пиковые нагрузки)
- Несвязанные сервисы (микросервисы)
- Нужны ретраи (ненадёжные внешние API)
- Приоритизация важна (срочные vs обычные задачи)
❌ Не используй когда:
- Быстрые операции (< 1 сек)
- Нужен синхронный ответ (пользователь ждёт результат)
- Простая архитектура (одна база, нет микросервисов)
- Малые нагрузки (< 100 рпс)
Примеры реальных сценариев
# Пример: Отправка писем
# API (FastAPI)
@app.post("/users")
async def create_user(user: UserCreate):
db_user = await db.create(user)
# Отправляем в очередь вместо синхронного отправления
await task_queue.publish("send_welcome_email", {
"user_id": db_user.id,
"email": db_user.email,
})
return db_user
# Worker (Celery)
@app.task(bind=True, max_retries=3)
def send_welcome_email(self, user_id: int, email: str):
try:
smtp.send(email, "Welcome!", "templates/welcome.html")
mark_as_sent(user_id) # Идемпотентно
except SMTPException as e:
# Повтор с экспоненциальной задержкой
raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries))
Заключение
Очереди — мощный инструмент для масштабируемых систем. Отлично подходят для асинхронных задач, но добавляют сложность. Используй только когда действительно нужны: обработка больших файлов, отправка писем, вызовы медленных API. Для простых приложений синхронный код проще и надёжнее.