← Назад к вопросам
Как хорошо знаешь RabbitMQ?
1.0 Junior🔥 111 комментариев
#Soft skills и опыт работы#Брокеры сообщений и очереди
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI29 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
RabbitMQ: Полное руководство для Node.js разработчика
RabbitMQ — это мощный message broker, который я часто использую в production системах для асинхронной обработки, распределения нагрузки и гарантированной доставки сообщений.
Архитектура RabbitMQ
Основные компоненты:
- Exchange — точка входа для сообщений (маршрутизатор)
- Queue — буфер хранения сообщений
- Binding — связь между Exchange и Queue
- Consumer — приложение, которое обрабатывает сообщения
const amqp = require('amqplib');
// Подключение
const connection = await amqp.connect('amqp://guest:guest@localhost');
const channel = await connection.createChannel();
// Объявление exchange
await channel.assertExchange('my-exchange', 'direct', { durable: true });
// Объявление queue
await channel.assertQueue('my-queue', { durable: true });
// Привязка queue к exchange
await channel.bindQueue('my-queue', 'my-exchange', 'my-key');
Типы Exchange
1. Direct Exchange — маршрутизирует по точному совпадению routing key:
await channel.publish(
'my-exchange',
'order.created',
Buffer.from(JSON.stringify({ orderId: 123 }))
);
2. Topic Exchange — маршрутизирует по паттернам (.created, order.):
await channel.assertExchange('events', 'topic', { durable: true });
await channel.publish(
'events',
'order.payment.completed',
Buffer.from(JSON.stringify({ paymentId: 456 }))
);
3. Fanout Exchange — отправляет всем подписчикам (broadcast):
await channel.assertExchange('notifications', 'fanout', { durable: true });
await channel.publish(
'notifications',
'',
Buffer.from(JSON.stringify({ message: 'System update' }))
);
Consumer (обработчик сообщений)
await channel.assertQueue('tasks', { durable: true });
// Ограничение: обрабатывай только 1 сообщение одновременно
await channel.prefetch(1);
await channel.consume('tasks', async (msg) => {
try {
const task = JSON.parse(msg.content.toString());
console.log('Processing:', task);
// Обработка
await processTask(task);
// Подтверждение (ack)
channel.ack(msg);
} catch (error) {
console.error('Task failed:', error);
// Отправка обратно в очередь
channel.nack(msg, false, true);
}
});
Dead Letter Queue (DLQ) для обработки ошибок
// Основная очередь с DLQ
await channel.assertQueue('main-queue', {
durable: true,
arguments: {
'x-dead-letter-exchange': 'dlx-exchange',
'x-message-ttl': 60000, // 60 сек
'x-max-retries': 3
}
});
// Dead Letter Queue
await channel.assertExchange('dlx-exchange', 'direct', { durable: true });
await channel.assertQueue('dlq', { durable: true });
await channel.bindQueue('dlq', 'dlx-exchange', '');
RPC (Request-Reply паттерн)
// Server
await channel.assertQueue('rpc.queue', { durable: true });
await channel.consume('rpc.queue', async (msg) => {
const request = JSON.parse(msg.content);
const result = calculate(request.value);
channel.sendToQueue(
msg.properties.replyTo,
Buffer.from(JSON.stringify(result)),
{ correlationId: msg.properties.correlationId }
);
channel.ack(msg);
});
// Client
const correlationId = Math.random().toString();
const replyQueue = await channel.assertQueue('', { exclusive: true });
await channel.sendToQueue(
'rpc.queue',
Buffer.from(JSON.stringify({ value: 42 })),
{
replyTo: replyQueue.queue,
correlationId
}
);
await channel.consume(replyQueue.queue, (msg) => {
if (msg.properties.correlationId === correlationId) {
console.log('Response:', JSON.parse(msg.content));
}
});
Best Practices
-
Idempotency — сообщение может обработаться несколько раз
- Используй unique ID (idempotency key)
- Сохраняй в БД перед обработкой
-
Circuit Breaker — защита от перегрузки
const CircuitBreaker = require('opossum'); const breaker = new CircuitBreaker(async (msg) => { return await processMessage(msg); }); -
Graceful Shutdown — завершение подписок перед выходом
process.on('SIGTERM', async () => { await channel.close(); await connection.close(); process.exit(0); }); -
Monitoring — слежение за очередями
- Используй RabbitMQ Management UI (http://localhost:15672)
- Вели метрики: длина очереди, время обработки, ошибки
Когда использовать
- Асинхронная обработка (email, SMS отправка)
- Микросервисная архитектура
- Гарантированная доставка сообщений
- Балансировка нагрузки между воркерами
RabbitMQ — выбор enterprise приложений с требованиями к надёжности и масштабируемости.