Как устроены очереди в RabbitMQ?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Как устроены очереди в RabbitMQ
Основная архитектура
RabbitMQ — это message broker, который основан на AMQP (Advanced Message Queuing Protocol). Сообщения переправляются из producer (отправитель) через exchanges и queues к consumer (получатель).
Producer → Exchange → Queue → Consumer
Ключевые компоненты
1. Exchange (обменник)
Exchange маршрутизирует сообщения в очереди на основе правил (bindings). Существует 4 типа:
Direct Exchange — маршрутизирует по точному совпадению routing key:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
# Создаем direct exchange
channel.exchange_declare(exchange="orders", exchange_type="direct")
# Создаем очередь
channel.queue_declare(queue="orders_queue")
# Привязываем очередь к exchange с routing_key
channel.queue_bind(exchange="orders", queue="orders_queue", routing_key="order.created")
# Отправляем сообщение
channel.basic_publish(
exchange="orders",
routing_key="order.created", # Должна совпадать с binding
body="New order #123"
)
Topic Exchange — маршрутизирует по маске (., #):
channel.exchange_declare(exchange="logs", exchange_type="topic")
channel.queue_declare(queue="error_logs")
# Привязываем с маской
channel.queue_bind(exchange="logs", queue="error_logs", routing_key="logs.error.*")
# Сообщения с routing_key logs.error.database попадут в эту очередь
channel.basic_publish(
exchange="logs",
routing_key="logs.error.database",
body="Database connection failed"
)
Fanout Exchange — отправляет копию всем привязанным очередям:
channel.exchange_declare(exchange="notifications", exchange_type="fanout")
# Все сообщения пойдут во все очереди
channel.basic_publish(
exchange="notifications",
routing_key="", # Игнорируется
body="Server is going down for maintenance"
)
Headers Exchange — маршрутизирует по заголовкам сообщения:
channel.exchange_declare(exchange="data", exchange_type="headers")
channel.queue_bind(
exchange="data",
queue="high_priority",
arguments={"x-match": "all", "priority": "high", "type": "alert"}
)
# Отправляем с заголовками
channel.basic_publish(
exchange="data",
routing_key="",
body="Alert message",
properties=pika.BasicProperties(
headers={"priority": "high", "type": "alert"}
)
)
2. Queue (очередь)
Очередь хранит сообщения до их обработки:
# Декларируем очередь с параметрами
channel.queue_declare(
queue="tasks",
durable=True, # Сохранится при перезагрузке RabbitMQ
exclusive=False, # Доступна всем
auto_delete=False # Не удаляется при отключении
)
# С параметрами TTL и max-length
channel.queue_declare(
queue="temporary_tasks",
arguments={
"x-message-ttl": 3600000, # Сообщение живет 1 час (ms)
"x-max-length": 1000, # Максимум 1000 сообщений
"x-dead-letter-exchange": "dlx" # Куда отправить при ошибке
}
)
3. Routing и Binding
Binding связывает exchange с очередью через routing key:
# Binding с routing key
channel.queue_bind(
exchange="tasks",
queue="email_queue",
routing_key="send.email"
)
# Producer отправляет с routing key
channel.basic_publish(
exchange="tasks",
routing_key="send.email",
body="Send email to user@example.com"
)
Consumer (потребитель)
Базовое потребление
def callback(ch, method, properties, body):
try:
message = body.decode()
print(f"Получено сообщение: {message}")
# Обработка сообщения
process_message(message)
# Подтверждаем обработку
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"Ошибка: {e}")
# Отправляем обратно в очередь
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
channel.basic_qos(prefetch_count=1) # Consumer обрабатывает по 1 сообщению
channel.basic_consume(queue="tasks", on_message_callback=callback)
print("Ожидаю сообщений...")
channel.start_consuming()
С обработкой ошибок и Dead Letter Queue
# Создаем Dead Letter Exchange для ошибок
channel.exchange_declare(exchange="dlx", exchange_type="direct")
channel.queue_declare(queue="dead_letters")
channel.queue_bind(exchange="dlx", queue="dead_letters", routing_key="failed")
# Создаем основную очередь с DLX
channel.queue_declare(
queue="tasks",
arguments={
"x-dead-letter-exchange": "dlx",
"x-dead-letter-routing-key": "failed"
}
)
def process_with_retry(ch, method, properties, body, max_retries=3):
retry_count = properties.headers.get("x-retry-count", 0) if properties.headers else 0
try:
message = body.decode()
process_message(message)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
if retry_count < max_retries:
# Переотправляем в очередь с увеличенным счетчиком
new_headers = properties.headers or {}
new_headers["x-retry-count"] = retry_count + 1
ch.basic_publish(
exchange="tasks",
routing_key="",
body=body,
properties=pika.BasicProperties(headers=new_headers)
)
ch.basic_ack(delivery_tag=method.delivery_tag)
else:
# Отправляем в dead letter queue
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
channel.basic_consume(queue="tasks", on_message_callback=process_with_retry)
channel.start_consuming()
Гарантии доставки
# Producer отправляет с гарантией доставки
channel.confirm_delivery() # Включаем publisher confirm
try:
channel.basic_publish(
exchange="orders",
routing_key="order.created",
body="New order",
properties=pika.BasicProperties(
delivery_mode=2 # Persisten message - сохранится на диск
)
)
print("Сообщение доставлено")
except pika.exceptions.UnroutableError:
print("Сообщение не маршрутизируется")
Полный пример приложения
# producer.py
import pika
import json
class OrderProducer:
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange="orders", exchange_type="direct")
def send_order(self, order_id, items):
message = json.dumps({"order_id": order_id, "items": items})
self.channel.basic_publish(
exchange="orders",
routing_key="order.created",
body=message,
properties=pika.BasicProperties(delivery_mode=2)
)
print(f"Заказ {order_id} отправлен")
def close(self):
self.connection.close()
# consumer.py
import pika
import json
class OrderConsumer:
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange="orders", exchange_type="direct")
self.channel.queue_declare(queue="orders_queue")
self.channel.queue_bind(exchange="orders", queue="orders_queue", routing_key="order.created")
def process_orders(self):
def callback(ch, method, properties, body):
order = json.loads(body.decode())
print(f"Обрабатываю заказ {order[order_id]}")
# Здесь обработка заказа
ch.basic_ack(delivery_tag=method.delivery_tag)
self.channel.basic_consume(queue="orders_queue", on_message_callback=callback)
print("Ожидаю заказов...")
self.channel.start_consuming()
Ключевые особенности
- Асинхронность — producer и consumer независимы
- Надежность — persistent messages сохраняются на диск
- Масштабируемость — несколько consumer могут работать в параллель
- Маршрутизация — гибкая доставка сообщений
- Dead Letter Queue — обработка ошибок
RabbitMQ идеально подходит для асинхронных задач: отправка писем, обработка платежей, логирование, уведомления.