← Назад к вопросам

Как устроены очереди в RabbitMQ?

1.7 Middle🔥 131 комментариев
#Архитектура и паттерны#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как устроены очереди в RabbitMQ

Основная архитектура

RabbitMQ — это message broker, который основан на AMQP (Advanced Message Queuing Protocol). Сообщения переправляются из producer (отправитель) через exchanges и queues к consumer (получатель).

Producer → Exchange → Queue → Consumer

Ключевые компоненты

1. Exchange (обменник)

Exchange маршрутизирует сообщения в очереди на основе правил (bindings). Существует 4 типа:

Direct Exchange — маршрутизирует по точному совпадению routing key:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

# Создаем direct exchange
channel.exchange_declare(exchange="orders", exchange_type="direct")

# Создаем очередь
channel.queue_declare(queue="orders_queue")

# Привязываем очередь к exchange с routing_key
channel.queue_bind(exchange="orders", queue="orders_queue", routing_key="order.created")

# Отправляем сообщение
channel.basic_publish(
    exchange="orders",
    routing_key="order.created",  # Должна совпадать с binding
    body="New order #123"
)

Topic Exchange — маршрутизирует по маске (., #):

channel.exchange_declare(exchange="logs", exchange_type="topic")
channel.queue_declare(queue="error_logs")

# Привязываем с маской
channel.queue_bind(exchange="logs", queue="error_logs", routing_key="logs.error.*")

# Сообщения с routing_key logs.error.database попадут в эту очередь
channel.basic_publish(
    exchange="logs",
    routing_key="logs.error.database",
    body="Database connection failed"
)

Fanout Exchange — отправляет копию всем привязанным очередям:

channel.exchange_declare(exchange="notifications", exchange_type="fanout")

# Все сообщения пойдут во все очереди
channel.basic_publish(
    exchange="notifications",
    routing_key="",  # Игнорируется
    body="Server is going down for maintenance"
)

Headers Exchange — маршрутизирует по заголовкам сообщения:

channel.exchange_declare(exchange="data", exchange_type="headers")

channel.queue_bind(
    exchange="data",
    queue="high_priority",
    arguments={"x-match": "all", "priority": "high", "type": "alert"}
)

# Отправляем с заголовками
channel.basic_publish(
    exchange="data",
    routing_key="",
    body="Alert message",
    properties=pika.BasicProperties(
        headers={"priority": "high", "type": "alert"}
    )
)

2. Queue (очередь)

Очередь хранит сообщения до их обработки:

# Декларируем очередь с параметрами
channel.queue_declare(
    queue="tasks",
    durable=True,  # Сохранится при перезагрузке RabbitMQ
    exclusive=False,  # Доступна всем
    auto_delete=False  # Не удаляется при отключении
)

# С параметрами TTL и max-length
channel.queue_declare(
    queue="temporary_tasks",
    arguments={
        "x-message-ttl": 3600000,  # Сообщение живет 1 час (ms)
        "x-max-length": 1000,  # Максимум 1000 сообщений
        "x-dead-letter-exchange": "dlx"  # Куда отправить при ошибке
    }
)

3. Routing и Binding

Binding связывает exchange с очередью через routing key:

# Binding с routing key
channel.queue_bind(
    exchange="tasks",
    queue="email_queue",
    routing_key="send.email"
)

# Producer отправляет с routing key
channel.basic_publish(
    exchange="tasks",
    routing_key="send.email",
    body="Send email to user@example.com"
)

Consumer (потребитель)

Базовое потребление

def callback(ch, method, properties, body):
    try:
        message = body.decode()
        print(f"Получено сообщение: {message}")
        
        # Обработка сообщения
        process_message(message)
        
        # Подтверждаем обработку
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"Ошибка: {e}")
        # Отправляем обратно в очередь
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

channel.basic_qos(prefetch_count=1)  # Consumer обрабатывает по 1 сообщению
channel.basic_consume(queue="tasks", on_message_callback=callback)

print("Ожидаю сообщений...")
channel.start_consuming()

С обработкой ошибок и Dead Letter Queue

# Создаем Dead Letter Exchange для ошибок
channel.exchange_declare(exchange="dlx", exchange_type="direct")
channel.queue_declare(queue="dead_letters")
channel.queue_bind(exchange="dlx", queue="dead_letters", routing_key="failed")

# Создаем основную очередь с DLX
channel.queue_declare(
    queue="tasks",
    arguments={
        "x-dead-letter-exchange": "dlx",
        "x-dead-letter-routing-key": "failed"
    }
)

def process_with_retry(ch, method, properties, body, max_retries=3):
    retry_count = properties.headers.get("x-retry-count", 0) if properties.headers else 0
    
    try:
        message = body.decode()
        process_message(message)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        if retry_count < max_retries:
            # Переотправляем в очередь с увеличенным счетчиком
            new_headers = properties.headers or {}
            new_headers["x-retry-count"] = retry_count + 1
            
            ch.basic_publish(
                exchange="tasks",
                routing_key="",
                body=body,
                properties=pika.BasicProperties(headers=new_headers)
            )
            ch.basic_ack(delivery_tag=method.delivery_tag)
        else:
            # Отправляем в dead letter queue
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

channel.basic_consume(queue="tasks", on_message_callback=process_with_retry)
channel.start_consuming()

Гарантии доставки

# Producer отправляет с гарантией доставки
channel.confirm_delivery()  # Включаем publisher confirm

try:
    channel.basic_publish(
        exchange="orders",
        routing_key="order.created",
        body="New order",
        properties=pika.BasicProperties(
            delivery_mode=2  # Persisten message - сохранится на диск
        )
    )
    print("Сообщение доставлено")
except pika.exceptions.UnroutableError:
    print("Сообщение не маршрутизируется")

Полный пример приложения

# producer.py
import pika
import json

class OrderProducer:
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange="orders", exchange_type="direct")
    
    def send_order(self, order_id, items):
        message = json.dumps({"order_id": order_id, "items": items})
        self.channel.basic_publish(
            exchange="orders",
            routing_key="order.created",
            body=message,
            properties=pika.BasicProperties(delivery_mode=2)
        )
        print(f"Заказ {order_id} отправлен")
    
    def close(self):
        self.connection.close()

# consumer.py
import pika
import json

class OrderConsumer:
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange="orders", exchange_type="direct")
        self.channel.queue_declare(queue="orders_queue")
        self.channel.queue_bind(exchange="orders", queue="orders_queue", routing_key="order.created")
    
    def process_orders(self):
        def callback(ch, method, properties, body):
            order = json.loads(body.decode())
            print(f"Обрабатываю заказ {order[order_id]}")
            # Здесь обработка заказа
            ch.basic_ack(delivery_tag=method.delivery_tag)
        
        self.channel.basic_consume(queue="orders_queue", on_message_callback=callback)
        print("Ожидаю заказов...")
        self.channel.start_consuming()

Ключевые особенности

  • Асинхронность — producer и consumer независимы
  • Надежность — persistent messages сохраняются на диск
  • Масштабируемость — несколько consumer могут работать в параллель
  • Маршрутизация — гибкая доставка сообщений
  • Dead Letter Queue — обработка ошибок

RabbitMQ идеально подходит для асинхронных задач: отправка писем, обработка платежей, логирование, уведомления.