← Назад к вопросам

Для чего используется RabbitMQ?

1.3 Junior🔥 142 комментариев
#Брокеры сообщений и очереди

Комментарии (2)

🐱
claude-haiku-4.5PrepBro AI28 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Для чего используется RabbitMQ

Определение

RabbitMQ — это message broker (посредник обмена сообщениями), который позволяет приложениям обмениваться асинхронными сообщениями через очереди. Он гарантирует доставку сообщений даже если приложение временно недоступно.

Главная идея: отправитель и получатель сообщений не должны работать одновременно.

Основная проблема, которую решает RabbitMQ

Без RabbitMQ (синхронно):

// Клиент создаёт заказ
app.post('/orders', async (req, res) => {
  // 1. Сохранить заказ в БД (0.1 сек)
  await saveOrder(req.body);
  
  // 2. Отправить email подтверждения (2 сек) ❌ ЖДЁМ
  await sendConfirmationEmail(req.body.email);
  
  // 3. Записать в аналитику (1 сек) ❌ ЖДЁМ
  await logAnalytics(req.body);
  
  // 4. Отправить SMS (3 сек) ❌ ЖДЁМ
  await sendSms(req.body.phone);
  
  // Всего: 0.1 + 2 + 1 + 3 = 6 сек ❌ Клиент ждёт!
  res.json({ orderId: order.id });
});

Проблемы:

  • Клиент ждёт 6 секунд вместо 0.1 сек
  • Если email сервис упал, весь заказ падает
  • Если SMS сервис медленный, пользователь ждёт
  • Не масштабируется (каждый запрос блокирует процесс)

С RabbitMQ (асинхронно):

// Клиент создаёт заказ
app.post('/orders', async (req, res) => {
  // 1. Сохранить заказ (0.1 сек)
  const order = await saveOrder(req.body);
  
  // 2. Отправить сообщения в очередь (0.01 сек) ✅ БЫСТРО
  await rabbitMQ.publish('order.created', order);
  
  // Клиент получит ответ за 0.11 сек
  res.json({ orderId: order.id });
});

// Отдельные рабочие процессы обрабатывают сообщения
rabbitMQ.consume('order.created', async (order) => {
  await sendConfirmationEmail(order.email); // 2 сек
  await logAnalytics(order); // 1 сек
  await sendSms(order.phone); // 3 сек
});

Результат:

  • Клиент получит ответ за 0.1 сек (вместо 6 сек)
  • Если email упадёт, заказ не потеряется (сообщение останется в очереди)
  • Email, SMS, аналитика работают параллельно и независимо
  • Легко добавить новый worker для обработки очереди

Архитектура RabbitMQ

┌─────────────┐
│  Producer   │  отправитель сообщений
│ (App Server)│
└──────┬──────┘
       │
       │ публикует
       ↓
┌─────────────────────────────────────┐
│         RabbitMQ Broker             │
│                                     │
│  Exchange: order.events             │
│      ↓                              │
│  Queue: email_notifications         │
│  Queue: sms_notifications          │
│  Queue: analytics_logging          │
└──┬──────────────────────────────────┘
   │
   ├─ подписывается → Consumer 1 (Email Worker)
   ├─ подписывается → Consumer 2 (SMS Worker)
   └─ подписывается → Consumer 3 (Analytics Worker)

Основные компоненты

1. Producer (Производитель) — отправляет сообщения:

const amqp = require('amqplib');

async function sendOrder(order) {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
  
  // Объявить exchange
  await channel.assertExchange('order.events', 'topic', { durable: true });
  
  // Отправить сообщение
  channel.publish(
    'order.events',
    'order.created',
    Buffer.from(JSON.stringify(order))
  );
  
  console.log('Order sent to queue');
}

2. Exchange (Точка входа) — маршрутизирует сообщения в очереди:

// Типы exchange:
// direct — точное совпадение routing_key
// topic — маски (order.*)
// fanout — всем подписчикам
// headers — по заголовкам

await channel.assertExchange('order.events', 'topic', { durable: true });

3. Queue (Очередь) — хранит сообщения:

// Объявить очередь
await channel.assertQueue('email_notifications', { durable: true });

// Привязать очередь к exchange
await channel.bindQueue(
  'email_notifications',
  'order.events',
  'order.created' // routing key
);

4. Consumer (Потребитель) — получает и обрабатывает сообщения:

async function consumeOrders() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
  
  // Объявить очередь
  await channel.assertQueue('email_notifications', { durable: true });
  
  // Слушать сообщения
  channel.consume('email_notifications', async (message) => {
    if (message) {
      const order = JSON.parse(message.content.toString());
      
      try {
        await sendConfirmationEmail(order.email);
        channel.ack(message); // ✅ Подтверждение успеха
      } catch (error) {
        channel.nack(message, false, true); // ❌ Повтор
      }
    }
  });
}

Реальные примеры использования

Пример 1: Email отправка в фоне

// API сервер
app.post('/send-newsletter', async (req, res) => {
  const { users, subject, body } = req.body;
  
  // Быстро добавить в очередь
  for (const user of users) {
    await channel.publish(
      'email.exchange',
      'email.newsletter',
      Buffer.from(JSON.stringify({ user, subject, body }))
    );
  }
  
  res.json({ message: 'Newsletter queued' }); // Быстрый ответ
});

// Email Worker (отдельный процесс)
worker.consume('email.queue', async (message) => {
  const { user, subject, body } = JSON.parse(message.content);
  await sendEmail(user.email, subject, body);
  worker.ack(message);
});

Пример 2: Обработка платежей

// Payment API
app.post('/pay', async (req, res) => {
  const payment = await createPayment(req.body);
  
  // Опубликовать событие платежа
  await channel.publish(
    'payment.events',
    'payment.processing',
    Buffer.from(JSON.stringify(payment))
  );
  
  res.json({ paymentId: payment.id });
});

// Payment Processor Worker
worker.consume('payment.processing', async (message) => {
  const payment = JSON.parse(message.content);
  
  try {
    const result = await processPaymentWithGateway(payment);
    
    // Опубликовать результат
    await channel.publish(
      'payment.events',
      'payment.completed',
      Buffer.from(JSON.stringify(result))
    );
    
    worker.ack(message);
  } catch (error) {
    worker.nack(message, false, true); // Повтор при ошибке
  }
});

// Notification Worker (слушает payment.completed)
worker2.consume('payment.notifications', async (message) => {
  const result = JSON.parse(message.content);
  await sendPaymentConfirmation(result.user.email);
  worker2.ack(message);
});

Пример 3: Микросервисная архитектура

// User Service
app.post('/users', async (req, res) => {
  const user = await createUser(req.body);
  
  // Оповестить другие сервисы
  await channel.publish('events', 'user.created', Buffer.from(JSON.stringify(user)));
  res.json(user);
});

// Email Service (слушает user.created)
worker.consume('email_queue', async (message) => {
  const user = JSON.parse(message.content);
  await sendWelcomeEmail(user.email);
});

// Analytics Service (слушает user.created)
worker2.consume('analytics_queue', async (message) => {
  const user = JSON.parse(message.content);
  await logUserSignup(user);
});

// Notification Service (слушает user.created)
worker3.consume('notification_queue', async (message) => {
  const user = JSON.parse(message.content);
  await sendWelcomeNotification(user.id);
});

Преимущества RabbitMQ

Асинхронность — отправитель не ждёт получателя ✅ Надёжность — сообщения не потеряются (persistance) ✅ Масштабируемость — легко добавить новых consumers ✅ Развязка (Decoupling) — сервисы независимы друг от друга ✅ Обработка пиков — очередь скапливает сообщения при нагрузке ✅ Повторы — автоматический retry при ошибке ✅ Мониторинг — веб-интерфейс для просмотра очередей

Когда использовать RabbitMQ

Используй когда:

  • Отправка emails/SMS в фоне
  • Обработка платежей
  • Микросервисная архитектура
  • Асинхронная обработка больших данных
  • Event-driven архитектура
  • Нужна гарантия доставки сообщений
  • Нужно масштабировать обработчиков

Может быть избыточным:

  • Простое CRUD приложение
  • Прототип
  • Одна синхронная операция

Docker для разработки

# Запустить RabbitMQ локально
docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:3-management

# Web интерфейс: http://localhost:15672 (guest/guest)

Выводы

RabbitMQ — это критичный инструмент для:

  • Асинхронной обработки задач
  • Связи между микросервисами
  • Event-driven архитектур
  • Масштабируемых приложений

В индустрии используют либо RabbitMQ, либо альтернативы (Kafka, Redis Streams), но без какого-то message broker современные бэкенды не обойтись.