← Назад к вопросам

Как хорошо знаешь RabbitMQ?

1.0 Junior🔥 111 комментариев
#Soft skills и опыт работы#Брокеры сообщений и очереди

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI29 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

RabbitMQ: Полное руководство для Node.js разработчика

RabbitMQ — это мощный message broker, который я часто использую в production системах для асинхронной обработки, распределения нагрузки и гарантированной доставки сообщений.

Архитектура RabbitMQ

Основные компоненты:

  • Exchange — точка входа для сообщений (маршрутизатор)
  • Queue — буфер хранения сообщений
  • Binding — связь между Exchange и Queue
  • Consumer — приложение, которое обрабатывает сообщения
const amqp = require('amqplib');

// Подключение
const connection = await amqp.connect('amqp://guest:guest@localhost');
const channel = await connection.createChannel();

// Объявление exchange
await channel.assertExchange('my-exchange', 'direct', { durable: true });

// Объявление queue
await channel.assertQueue('my-queue', { durable: true });

// Привязка queue к exchange
await channel.bindQueue('my-queue', 'my-exchange', 'my-key');

Типы Exchange

1. Direct Exchange — маршрутизирует по точному совпадению routing key:

await channel.publish(
  'my-exchange',
  'order.created',
  Buffer.from(JSON.stringify({ orderId: 123 }))
);

2. Topic Exchange — маршрутизирует по паттернам (.created, order.):

await channel.assertExchange('events', 'topic', { durable: true });
await channel.publish(
  'events',
  'order.payment.completed',
  Buffer.from(JSON.stringify({ paymentId: 456 }))
);

3. Fanout Exchange — отправляет всем подписчикам (broadcast):

await channel.assertExchange('notifications', 'fanout', { durable: true });
await channel.publish(
  'notifications',
  '',
  Buffer.from(JSON.stringify({ message: 'System update' }))
);

Consumer (обработчик сообщений)

await channel.assertQueue('tasks', { durable: true });

// Ограничение: обрабатывай только 1 сообщение одновременно
await channel.prefetch(1);

await channel.consume('tasks', async (msg) => {
  try {
    const task = JSON.parse(msg.content.toString());
    console.log('Processing:', task);
    
    // Обработка
    await processTask(task);
    
    // Подтверждение (ack)
    channel.ack(msg);
  } catch (error) {
    console.error('Task failed:', error);
    // Отправка обратно в очередь
    channel.nack(msg, false, true);
  }
});

Dead Letter Queue (DLQ) для обработки ошибок

// Основная очередь с DLQ
await channel.assertQueue('main-queue', {
  durable: true,
  arguments: {
    'x-dead-letter-exchange': 'dlx-exchange',
    'x-message-ttl': 60000, // 60 сек
    'x-max-retries': 3
  }
});

// Dead Letter Queue
await channel.assertExchange('dlx-exchange', 'direct', { durable: true });
await channel.assertQueue('dlq', { durable: true });
await channel.bindQueue('dlq', 'dlx-exchange', '');

RPC (Request-Reply паттерн)

// Server
await channel.assertQueue('rpc.queue', { durable: true });

await channel.consume('rpc.queue', async (msg) => {
  const request = JSON.parse(msg.content);
  const result = calculate(request.value);
  
  channel.sendToQueue(
    msg.properties.replyTo,
    Buffer.from(JSON.stringify(result)),
    { correlationId: msg.properties.correlationId }
  );
  
  channel.ack(msg);
});

// Client
const correlationId = Math.random().toString();
const replyQueue = await channel.assertQueue('', { exclusive: true });

await channel.sendToQueue(
  'rpc.queue',
  Buffer.from(JSON.stringify({ value: 42 })),
  {
    replyTo: replyQueue.queue,
    correlationId
  }
);

await channel.consume(replyQueue.queue, (msg) => {
  if (msg.properties.correlationId === correlationId) {
    console.log('Response:', JSON.parse(msg.content));
  }
});

Best Practices

  1. Idempotency — сообщение может обработаться несколько раз

    • Используй unique ID (idempotency key)
    • Сохраняй в БД перед обработкой
  2. Circuit Breaker — защита от перегрузки

    const CircuitBreaker = require('opossum');
    const breaker = new CircuitBreaker(async (msg) => {
      return await processMessage(msg);
    });
    
  3. Graceful Shutdown — завершение подписок перед выходом

    process.on('SIGTERM', async () => {
      await channel.close();
      await connection.close();
      process.exit(0);
    });
    
  4. Monitoring — слежение за очередями

    • Используй RabbitMQ Management UI (http://localhost:15672)
    • Вели метрики: длина очереди, время обработки, ошибки

Когда использовать

  • Асинхронная обработка (email, SMS отправка)
  • Микросервисная архитектура
  • Гарантированная доставка сообщений
  • Балансировка нагрузки между воркерами

RabbitMQ — выбор enterprise приложений с требованиями к надёжности и масштабируемости.

Как хорошо знаешь RabbitMQ? | PrepBro