← Назад к вопросам
Приведи пример задачи где бы использовал RabbitMQ
2.0 Middle🔥 121 комментариев
#Архитектура и паттерны#Брокеры сообщений
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
RabbitMQ на практике: реальный пример архитектуры
RabbitMQ — это message broker, который нужен для асинхронной обработки задач и развязывания компонентов в распределённых системах. Расскажу реальный пример, который я использовал в production.
Сценарий: E-commerce платформа с обработкой заказов
Представь, что у тебя есть интернет-магазин с высокой нагрузкой. Пользователь делает заказ, и нужно:
- Сохранить заказ в БД (быстро, <100ms)
- Отправить email (медленно, 2-5 сек)
- Начислить бонусы (может быть очереди, зависит от сервиса)
- Обновить инвентарь (критично, зависит от другого сервиса)
- Отправить SMS уведомление (может быть замедлено)
Если делать всё синхронно, пользователь ждёт 10+ сек. Плохо для UX.
Решение без RabbitMQ (плохо)
from fastapi import FastAPI
from sqlalchemy.orm import Session
import smtplib
import requests
import time
app = FastAPI()
@app.post("/orders")
def create_order(order_data, db: Session):
"""
Синхронная обработка — все шаги блокирующие
"""
start = time.time()
# 1. Сохранить в БД (100ms)
order = Order(**order_data)
db.add(order)
db.commit()
print(f"Заказ сохранён: {time.time() - start:.2f}s")
# 2. Отправить email (3s) — БЛОКИРУЕТ ответ!
send_email(
to=order.customer_email,
subject="Ваш заказ принят",
body=f"Заказ #{order.id} подтверждён"
)
print(f"Email отправлен: {time.time() - start:.2f}s")
# 3. Начислить бонусы (2s) — ЕЩЁ МЕДЛЕННЕЕ!
requests.post(
"https://bonus-service/api/add",
json={"user_id": order.user_id, "points": 100}
)
print(f"Бонусы добавлены: {time.time() - start:.2f}s")
# 4. Обновить инвентарь (1.5s)
requests.post(
"https://inventory-service/api/reserve",
json={"items": order.items}
)
print(f"Инвентарь обновлен: {time.time() - start:.2f}s")
# 5. SMS (2.5s)
send_sms(
to=order.customer_phone,
text="Спасибо за покупку!"
)
print(f"SMS отправлена: {time.time() - start:.2f}s")
# Ответ только после ВСЕГО этого!
return {"status": "order_created", "order_id": order.id}
Проблемы:
- Пользователь ждёт 8.1 сек вместо 100 мс
- Если email сервис упал, заказ не создаётся
- Если SMS сервис медленный, все новые заказы зависают
- Нет retry логики при ошибках
- Нельзя масштабировать — каждый запрос занимает ресурсы
Решение с RabbitMQ (хорошо)
Архитектура:
┌─────────────┐
│ FastAPI │ 1. Пользователь создаёт заказ
└──────┬──────┘
│
├─→ Сохраняет в БД (100ms)
│
└─→ Отправляет события в RabbitMQ
├─ order.created event
├─ inventory.reserve event
└─ bonus.add event
│
└─→ Возвращает ответ (110ms) ✓
┌──────────────────────────────────────────────────────────────┐
│ RabbitMQ Exchanges/Queues │
├──────────────────────────────────────────────────────────────┤
│ │
│ exchange: orders (topic) │
│ ├─ queue: email_notifications │
│ │ └─ Consumer 1: EmailService │
│ │
│ ├─ queue: inventory_reservations │
│ │ └─ Consumer 2: InventoryService │
│ │
│ └─ queue: bonus_accrual │
│ └─ Consumer 3: BonusService │
│ │
└──────────────────────────────────────────────────────────────┘
Producer (FastAPI):
from fastapi import FastAPI
from sqlalchemy.orm import Session
import pika
import json
app = FastAPI()
class OrderProducer:
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host="localhost")
)
self.channel = self.connection.channel()
# Объявляем exchange типа topic
self.channel.exchange_declare(
exchange="orders",
exchange_type="topic",
durable=True
)
def publish_event(self, event_type: str, data: dict):
"""Публикует событие в RabbitMQ"""
message = json.dumps({
"type": event_type,
"data": data
})
self.channel.basic_publish(
exchange="orders",
routing_key=f"order.{event_type}",
body=message,
properties=pika.BasicProperties(delivery_mode=2) # persistent
)
print(f"Опубликовано: {event_type}")
producer = OrderProducer()
@app.post("/orders")
def create_order(order_data, db: Session):
"""
Асинхронная обработка с RabbitMQ
"""
start = time.time()
# 1. Сохранить в БД
order = Order(**order_data)
db.add(order)
db.commit()
# 2. Публикуем события в RabbitMQ (очень быстро!)
producer.publish_event(
"created",
{
"order_id": order.id,
"customer_email": order.customer_email,
"customer_phone": order.customer_phone,
"items": order.items,
"user_id": order.user_id
}
)
print(f"Время ответа: {time.time() - start:.3f}s") # ~110ms вместо 8s!
# 3. Сразу же возвращаем ответ пользователю
return {
"status": "order_accepted",
"order_id": order.id,
"message": "Ваш заказ принят. Скоро получите подтверждение по email."
}
Consumer 1: Email Service
import pika
import json
from email.mime.text import MIMEText
import smtplib
class EmailConsumer:
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host="localhost")
)
self.channel = self.connection.channel()
# Объявляем очередь
self.channel.queue_declare(
queue="email_notifications",
durable=True
)
# Привязываем очередь к exchange
self.channel.queue_bind(
exchange="orders",
queue="email_notifications",
routing_key="order.created"
)
# Callback на каждое сообщение
self.channel.basic_consume(
queue="email_notifications",
on_message_callback=self.on_message,
auto_ack=False
)
def on_message(self, ch, method, properties, body):
"""Вызывается при получении сообщения из очереди"""
try:
event = json.loads(body)
data = event["data"]
print(f"Отправляю email на {data["customer_email"]}...")
# Отправляем email (асинхронно, не блокируя другие потребители)
send_email(
to=data["customer_email"],
subject="Заказ принят!",
body=f"Спасибо! Ваш заказ #{data["order_id"]} подтверждён."
)
# Подтверждаем обработку
ch.basic_ack(delivery_tag=method.delivery_tag)
print(f"Email отправлен успешно")
except Exception as e:
print(f"Ошибка при отправке email: {e}")
# NACK — сообщение вернётся в очередь для retry
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
def start(self):
print("Email Consumer запущен...")
self.channel.start_consuming()
if __name__ == "__main__":
consumer = EmailConsumer()
consumer.start()
Consumer 2: Inventory Service
class InventoryConsumer:
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host="localhost")
)
self.channel = self.connection.channel()
self.channel.queue_declare(
queue="inventory_reservations",
durable=True
)
self.channel.queue_bind(
exchange="orders",
queue="inventory_reservations",
routing_key="order.created"
)
self.channel.basic_consume(
queue="inventory_reservations",
on_message_callback=self.on_message,
auto_ack=False
)
def on_message(self, ch, method, properties, body):
try:
event = json.loads(body)
data = event["data"]
print(f"Зарезервировать товары для заказа {data["order_id"]}")
# Резервируем товары
for item in data["items"]:
reserve_item(
product_id=item["product_id"],
quantity=item["quantity"]
)
ch.basic_ack(delivery_tag=method.delivery_tag)
print(f"Товары зарезервированы")
except Exception as e:
print(f"Ошибка при резервировании: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
def start(self):
print("Inventory Consumer запущен...")
self.channel.start_consuming()
Преимущества этого подхода
1. Быстрый ответ (110ms вместо 8s)
Пользователь доволен, не ждёт
2. Развязка компонентов
Если Email Service упал — заказ всё равно создан
Если Inventory Service медленный — не влияет на создание заказа
3. Масштабируемость
Можна запустить 10 копий EmailConsumer
Можно добавить новый Consumer для SMS
Можно добавить аналитику без изменения основного сервиса
4. Retry логика
Если Consumer упал, сообщение вернётся в очередь
Автоматический retry когда Consumer поднялся
5. Мониторинг
Раббит показывает размер очереди
Можна видеть, какой сервис отстаёт
Легко добавить алерты
Другие примеры использования RabbitMQ
Обработка больших файлов:
- Пользователь загружает видео → сохраняется в БД → отправляется в очередь
- Worker конвертирует видео (может быть медленно)
- Результат уведомляет пользователя через WebSocket
Микросервисная архитектура:
- UserService создаёт пользователя
- Публикует user.created событие
- EmailService, BonusService, AnalyticsService получают событие
Задачи по расписанию:
- Отправка репортов
- Очистка истории
- Переиндексация поиска
Выводы
RabbitMQ использовал бы для:
- I/O операций, которые не нужны мгновенно (email, SMS, push)
- Развязки микросервисов (event-driven архитектура)
- Обработки больших объёмов данных (очереди впитают всплески)
- Гарантированной доставки сообщений (persistent queues)
Не использовал бы RabbitMQ для:
- Простых синхронных операций
- Данных, которые нужны сейчас (например, авторизация)
- Если вся система — один сервис (оверинжиниринг)