← Назад к вопросам

Что в RabbitMQ между продюсером и консьюмером?

2.7 Senior🔥 121 комментариев
#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

RabbitMQ: архитектура между продюсером и консьюмером

RabbitMQ — это message broker, система очередей, которая передаёт сообщения между продюсером (отправитель) и консьюмером (получатель). Это критическая часть асинхронной архитектуры. Объясню все компоненты.

Основная архитектура

Продюсер → Exchange → Queue → Консьюмер
   ↓          ↓        ↓        ↓
 приложение  маршрут  буфер   другое приложение

Компонент 1: Exchange (Обменник)

Exchange — это маршрутизатор сообщений. Он решает, какую очередь выбрать.

from pika import BlockingConnection, ConnectionParameters
import pika

# Продюсер публикует сообщение в Exchange
connection = BlockingConnection(ConnectionParameters('localhost'))
channel = connection.channel()

# Объявляем Exchange
channel.exchange_declare(
    exchange='orders_exchange',
    exchange_type='direct',  # или 'topic', 'fanout', 'headers'
    durable=True  # Сохранится при перезагрузке
)

# Публикуем сообщение
channel.basic_publish(
    exchange='orders_exchange',
    routing_key='order.created',  # ← Это ключ маршрутизации
    body=b'{"order_id": 123, "user_id": 456}',
    properties=pika.BasicProperties(
        content_type='application/json',
        delivery_mode=2  # persistent — не потеряется при crash
    )
)

Типы Exchange

# 1. Direct Exchange — точное совпадение
channel.exchange_declare(exchange='direct_ex', exchange_type='direct')
# routing_key='user.created' → попадёт в очередь с binding_key='user.created'
# routing_key='user.deleted' → не попадёт в очередь с binding_key='user.created'

# 2. Topic Exchange — с wildcard
channel.exchange_declare(exchange='topic_ex', exchange_type='topic')
# routing_key='user.created' → совпадает с 'user.*' и 'user.#'
# routing_key='order.payment.success' → совпадает с 'order.*.success' и 'order.#'

# 3. Fanout Exchange — broadcast всем очередям
channel.exchange_declare(exchange='fanout_ex', exchange_type='fanout')
# Игнорирует routing_key, просто отправляет всем очередям

# 4. Headers Exchange — по заголовкам сообщения
channel.exchange_declare(exchange='headers_ex', exchange_type='headers')
# Маршрутизация по properties сообщения

Компонент 2: Queue (Очередь)

Очередь — это буфер сообщений в памяти/на диске.

# Консьюмер создаёт очередь
channel.queue_declare(
    queue='order_processing_queue',
    durable=True,  # Сохранится при перезагрузке
    exclusive=False,  # Может быть использована несколькими консьюмерами
    auto_delete=False,  # Не удаляется, когда нет консьюмеров
    arguments={
        'x-message-ttl': 3600000,  # 1 час — TTL сообщения
        'x-max-length': 10000,  # Max 10000 сообщений в очереди
        'x-dead-letter-exchange': 'dlx_exchange',  # DLX для ошибок
    }
)

Компонент 3: Binding (Привязка)

Binding связывает Exchange с Queue.

# Привязка: Exchange → Queue с ключом маршрутизации
channel.queue_bind(
    exchange='orders_exchange',
    queue='order_processing_queue',
    routing_key='order.*'  # Очередь получит сообщения с этим паттерном
)

# Можно привязать несколько очередей к одному Exchange
channel.queue_bind(
    exchange='orders_exchange',
    queue='order_analytics_queue',
    routing_key='order.*'
)

# Теперь оба консьюмера получат сообщение

Полный пример: продюсер

import pika
import json

class OrderProducer:
    def __init__(self, host='localhost'):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host)
        )
        self.channel = self.connection.channel()
        
        # Объявляем Exchange
        self.channel.exchange_declare(
            exchange='orders_exchange',
            exchange_type='topic',
            durable=True
        )
    
    def publish_order(self, order_data):
        message = json.dumps(order_data)
        
        self.channel.basic_publish(
            exchange='orders_exchange',
            routing_key='order.created',
            body=message,
            properties=pika.BasicProperties(
                content_type='application/json',
                delivery_mode=2,  # Persistent
                expiration='3600000'  # 1 час
            )
        )
        print(f"Published: {message}")
    
    def close(self):
        self.connection.close()

# Использование
producer = OrderProducer()
producer.publish_order({
    'order_id': 123,
    'user_id': 456,
    'total': 99.99
})
producer.close()

Полный пример: консьюмер

import pika
import json

class OrderConsumer:
    def __init__(self, host='localhost'):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host)
        )
        self.channel = self.connection.channel()
        
        # Объявляем Exchange и Queue
        self.channel.exchange_declare(
            exchange='orders_exchange',
            exchange_type='topic',
            durable=True
        )
        
        self.channel.queue_declare(
            queue='order_processing_queue',
            durable=True
        )
        
        # Привязываем Queue к Exchange
        self.channel.queue_bind(
            exchange='orders_exchange',
            queue='order_processing_queue',
            routing_key='order.*'
        )
        
        # QoS — только 1 сообщение за раз
        self.channel.basic_qos(prefetch_count=1)
    
    def process_message(self, channel, method, properties, body):
        try:
            order = json.loads(body)
            print(f"Processing order: {order}")
            
            # Обработка
            self.process_order(order)
            
            # Acknowledge — подтверждаем обработку
            channel.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            print(f"Error: {e}")
            # Reject — вернули сообщение в очередь
            channel.basic_nack(
                delivery_tag=method.delivery_tag,
                requeue=True  # Вернуть в очередь
            )
    
    def start(self):
        self.channel.basic_consume(
            queue='order_processing_queue',
            on_message_callback=self.process_message
        )
        print("Waiting for messages...")
        self.channel.start_consuming()
    
    def process_order(self, order):
        # Логика обработки заказа
        print(f"Order {order['order_id']} processed")

# Использование
consumer = OrderConsumer()
consumer.start()

Важные концепции

1. Acknowledgement (Подтверждение)

# Auto-acknowledge — опасно (сообщение теряется при crash)
channel.basic_consume(
    queue='queue',
    on_message_callback=callback,
    auto_ack=True  # ❌ Плохо
)

# Manual acknowledge — правильно
def callback(ch, method, properties, body):
    process_message(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)  # ✅ Подтвердили

2. Prefetch (QoS)

# basic_qos(prefetch_count=1) — консьюмер берёт только 1 сообщение
channel.basic_qos(prefetch_count=1)
# Горизонтальное масштабирование: несколько консьюмеров получат работу

3. Dead Letter Queue (DLQ)

# Для сообщений, которые не удалось обработать
channel.exchange_declare(exchange='dlx_exchange', exchange_type='direct')
channel.queue_declare(queue='dlq')
channel.queue_bind(exchange='dlx_exchange', queue='dlq')

# Основная очередь с привязкой к DLX
channel.queue_declare(
    queue='main_queue',
    arguments={'x-dead-letter-exchange': 'dlx_exchange'}
)

Архитектурное изображение

┌──────────────┐
│  Producer 1  │ publish(routing_key='order.created')
└──────┬───────┘
       │
       ▼
┌─────────────────────────┐
│   orders_exchange       │
│   (type: topic)         │
└──────┬──────────────────┘
       │
       │ binding(routing_key='order.*')
       │
       ├─────────────────────────┬──────────────────────────┐
       ▼                         ▼                          ▼
  ┌──────────────┐          ┌────────────────┐       ┌──────────────┐
  │   Queue 1    │          │   Queue 2      │       │   Queue 3    │
  │   (main)     │          │   (analytics)  │       │   (dlq)      │
  └──────┬───────┘          └────────┬───────┘       └──────────────┘
         │                           │
         ▼                           ▼
  ┌──────────────┐          ┌────────────────┐
  │ Consumer 1   │          │  Consumer 2    │
  │ Consumer 2   │          │  Consumer 3    │
  │ Consumer 3   │          └────────────────┘
  └──────────────┘
  (load balance)

Заключение

Между продюсером и консьюмером в RabbitMQ находятся: Exchange (маршрутизатор), Queue (буфер), и Binding (связь). Этот паттерн обеспечивает асинхронную, надёжную доставку сообщений с поддержкой масштабирования и обработки ошибок.

Что в RabbitMQ между продюсером и консьюмером? | PrepBro