← Назад к вопросам
Что в RabbitMQ между продюсером и консьюмером?
2.7 Senior🔥 121 комментариев
#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
RabbitMQ: архитектура между продюсером и консьюмером
RabbitMQ — это message broker, система очередей, которая передаёт сообщения между продюсером (отправитель) и консьюмером (получатель). Это критическая часть асинхронной архитектуры. Объясню все компоненты.
Основная архитектура
Продюсер → Exchange → Queue → Консьюмер
↓ ↓ ↓ ↓
приложение маршрут буфер другое приложение
Компонент 1: Exchange (Обменник)
Exchange — это маршрутизатор сообщений. Он решает, какую очередь выбрать.
from pika import BlockingConnection, ConnectionParameters
import pika
# Продюсер публикует сообщение в Exchange
connection = BlockingConnection(ConnectionParameters('localhost'))
channel = connection.channel()
# Объявляем Exchange
channel.exchange_declare(
exchange='orders_exchange',
exchange_type='direct', # или 'topic', 'fanout', 'headers'
durable=True # Сохранится при перезагрузке
)
# Публикуем сообщение
channel.basic_publish(
exchange='orders_exchange',
routing_key='order.created', # ← Это ключ маршрутизации
body=b'{"order_id": 123, "user_id": 456}',
properties=pika.BasicProperties(
content_type='application/json',
delivery_mode=2 # persistent — не потеряется при crash
)
)
Типы Exchange
# 1. Direct Exchange — точное совпадение
channel.exchange_declare(exchange='direct_ex', exchange_type='direct')
# routing_key='user.created' → попадёт в очередь с binding_key='user.created'
# routing_key='user.deleted' → не попадёт в очередь с binding_key='user.created'
# 2. Topic Exchange — с wildcard
channel.exchange_declare(exchange='topic_ex', exchange_type='topic')
# routing_key='user.created' → совпадает с 'user.*' и 'user.#'
# routing_key='order.payment.success' → совпадает с 'order.*.success' и 'order.#'
# 3. Fanout Exchange — broadcast всем очередям
channel.exchange_declare(exchange='fanout_ex', exchange_type='fanout')
# Игнорирует routing_key, просто отправляет всем очередям
# 4. Headers Exchange — по заголовкам сообщения
channel.exchange_declare(exchange='headers_ex', exchange_type='headers')
# Маршрутизация по properties сообщения
Компонент 2: Queue (Очередь)
Очередь — это буфер сообщений в памяти/на диске.
# Консьюмер создаёт очередь
channel.queue_declare(
queue='order_processing_queue',
durable=True, # Сохранится при перезагрузке
exclusive=False, # Может быть использована несколькими консьюмерами
auto_delete=False, # Не удаляется, когда нет консьюмеров
arguments={
'x-message-ttl': 3600000, # 1 час — TTL сообщения
'x-max-length': 10000, # Max 10000 сообщений в очереди
'x-dead-letter-exchange': 'dlx_exchange', # DLX для ошибок
}
)
Компонент 3: Binding (Привязка)
Binding связывает Exchange с Queue.
# Привязка: Exchange → Queue с ключом маршрутизации
channel.queue_bind(
exchange='orders_exchange',
queue='order_processing_queue',
routing_key='order.*' # Очередь получит сообщения с этим паттерном
)
# Можно привязать несколько очередей к одному Exchange
channel.queue_bind(
exchange='orders_exchange',
queue='order_analytics_queue',
routing_key='order.*'
)
# Теперь оба консьюмера получат сообщение
Полный пример: продюсер
import pika
import json
class OrderProducer:
def __init__(self, host='localhost'):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host)
)
self.channel = self.connection.channel()
# Объявляем Exchange
self.channel.exchange_declare(
exchange='orders_exchange',
exchange_type='topic',
durable=True
)
def publish_order(self, order_data):
message = json.dumps(order_data)
self.channel.basic_publish(
exchange='orders_exchange',
routing_key='order.created',
body=message,
properties=pika.BasicProperties(
content_type='application/json',
delivery_mode=2, # Persistent
expiration='3600000' # 1 час
)
)
print(f"Published: {message}")
def close(self):
self.connection.close()
# Использование
producer = OrderProducer()
producer.publish_order({
'order_id': 123,
'user_id': 456,
'total': 99.99
})
producer.close()
Полный пример: консьюмер
import pika
import json
class OrderConsumer:
def __init__(self, host='localhost'):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host)
)
self.channel = self.connection.channel()
# Объявляем Exchange и Queue
self.channel.exchange_declare(
exchange='orders_exchange',
exchange_type='topic',
durable=True
)
self.channel.queue_declare(
queue='order_processing_queue',
durable=True
)
# Привязываем Queue к Exchange
self.channel.queue_bind(
exchange='orders_exchange',
queue='order_processing_queue',
routing_key='order.*'
)
# QoS — только 1 сообщение за раз
self.channel.basic_qos(prefetch_count=1)
def process_message(self, channel, method, properties, body):
try:
order = json.loads(body)
print(f"Processing order: {order}")
# Обработка
self.process_order(order)
# Acknowledge — подтверждаем обработку
channel.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"Error: {e}")
# Reject — вернули сообщение в очередь
channel.basic_nack(
delivery_tag=method.delivery_tag,
requeue=True # Вернуть в очередь
)
def start(self):
self.channel.basic_consume(
queue='order_processing_queue',
on_message_callback=self.process_message
)
print("Waiting for messages...")
self.channel.start_consuming()
def process_order(self, order):
# Логика обработки заказа
print(f"Order {order['order_id']} processed")
# Использование
consumer = OrderConsumer()
consumer.start()
Важные концепции
1. Acknowledgement (Подтверждение)
# Auto-acknowledge — опасно (сообщение теряется при crash)
channel.basic_consume(
queue='queue',
on_message_callback=callback,
auto_ack=True # ❌ Плохо
)
# Manual acknowledge — правильно
def callback(ch, method, properties, body):
process_message(body)
ch.basic_ack(delivery_tag=method.delivery_tag) # ✅ Подтвердили
2. Prefetch (QoS)
# basic_qos(prefetch_count=1) — консьюмер берёт только 1 сообщение
channel.basic_qos(prefetch_count=1)
# Горизонтальное масштабирование: несколько консьюмеров получат работу
3. Dead Letter Queue (DLQ)
# Для сообщений, которые не удалось обработать
channel.exchange_declare(exchange='dlx_exchange', exchange_type='direct')
channel.queue_declare(queue='dlq')
channel.queue_bind(exchange='dlx_exchange', queue='dlq')
# Основная очередь с привязкой к DLX
channel.queue_declare(
queue='main_queue',
arguments={'x-dead-letter-exchange': 'dlx_exchange'}
)
Архитектурное изображение
┌──────────────┐
│ Producer 1 │ publish(routing_key='order.created')
└──────┬───────┘
│
▼
┌─────────────────────────┐
│ orders_exchange │
│ (type: topic) │
└──────┬──────────────────┘
│
│ binding(routing_key='order.*')
│
├─────────────────────────┬──────────────────────────┐
▼ ▼ ▼
┌──────────────┐ ┌────────────────┐ ┌──────────────┐
│ Queue 1 │ │ Queue 2 │ │ Queue 3 │
│ (main) │ │ (analytics) │ │ (dlq) │
└──────┬───────┘ └────────┬───────┘ └──────────────┘
│ │
▼ ▼
┌──────────────┐ ┌────────────────┐
│ Consumer 1 │ │ Consumer 2 │
│ Consumer 2 │ │ Consumer 3 │
│ Consumer 3 │ └────────────────┘
└──────────────┘
(load balance)
Заключение
Между продюсером и консьюмером в RabbitMQ находятся: Exchange (маршрутизатор), Queue (буфер), и Binding (связь). Этот паттерн обеспечивает асинхронную, надёжную доставку сообщений с поддержкой масштабирования и обработки ошибок.