← Назад к вопросам

Какие знаешь важные роли у RabbitMQ?

1.7 Middle🔥 131 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Важные роли и концепции RabbitMQ

RabbitMQ — это message broker на основе AMQP. Он использует уникальную модель с роллями сообщений.

1. Producer (Отправитель)

Процесс, который создаёт и отправляет сообщения:

import pika

# Producer
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Объявляем exchange
channel.exchange_declare(exchange='my_exchange', exchange_type='direct')

# Отправляем сообщение
channel.basic_publish(
    exchange='my_exchange',
    routing_key='user.created',
    body='User John created'
)
connection.close()

Характеристики:

  • Не зависит от Consumer
  • Может отправлять без Consumer
  • Может быть несколько Producers

2. Consumer (Получатель)

Процесс, который получает и обрабатывает сообщения:

import pika

def callback(ch, method, properties, body):
    print(f"Received message: {body}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Объявляем queue
channel.queue_declare(queue='user_events')

# Привязываем queue к exchange
channel.queue_bind(
    exchange='my_exchange',
    queue='user_events',
    routing_key='user.created'
)

# Слушаем сообщения
channel.basic_consume(
    queue='user_events',
    on_message_callback=callback
)

print('Waiting for messages...')
channel.start_consuming()

Характеристики:

  • Может работать без Producer (очередь сохранит сообщения)
  • Может быть несколько Consumers для одной queue
  • Acknowledgement гарантирует обработку

3. Exchange (Маршрутизатор)

Обмен принимает сообщения от Producer и маршрутизирует к Queues:

a) Direct Exchange

# Exchange маршрутизирует по точному routing_key
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# Producer
channel.basic_publish(
    exchange='direct_logs',
    routing_key='error',  # Точное совпадение
    body='Error occurred'
)

# Consumers
channel.queue_bind(
    exchange='direct_logs',
    queue='error_queue',
    routing_key='error'
)

b) Fanout Exchange

# Рассылает сообщение ВСЕ queues
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# Producer
channel.basic_publish(
    exchange='logs',
    routing_key='',  # Игнорируется
    body='Broadcast message'
)

# Все queues получат сообщение
channel.queue_bind(exchange='logs', queue='queue1')
channel.queue_bind(exchange='logs', queue='queue2')

c) Topic Exchange

# Маршрутизирует по pattern
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# Producer
channel.basic_publish(
    exchange='topic_logs',
    routing_key='user.created.verified',
    body='User created and verified'
)

# Consumer 1: слушает всех пользователей
channel.queue_bind(
    exchange='topic_logs',
    queue='user_queue',
    routing_key='user.*'
)

# Consumer 2: слушает только verified
channel.queue_bind(
    exchange='topic_logs',
    queue='verified_queue',
    routing_key='user.*.verified'
)

4. Queue (Очередь)

Хранилище сообщений, связанная с одним Consumer:

# Объявляем queue с параметрами
channel.queue_declare(
    queue='task_queue',
    durable=True,  # Сохраняется при перезагрузке RabbitMQ
    exclusive=False,  # Может использовать несколько Consumers
    auto_delete=False  # Не удаляется при отключении Consumer
)

Типы:

  • Durable Queue — сохраняет сообщения
  • Exclusive Queue — используется одним Consumer
  • Auto-delete — удаляется при отключении Consumer

5. Binding (Привязка)

Связывает Queue с Exchange через routing_key:

channel.queue_bind(
    exchange='my_exchange',
    queue='my_queue',
    routing_key='events.user.*'
)

Одна Queue может быть привязана к нескольким Exchanges!

6. Routing Key

Строка, которая используется для маршрутизации:

# Direct: точное совпадение
routing_key = 'error'

# Topic: pattern matching
routing_key = 'user.created.verified'  # Можно использовать *, #
routing_key_pattern = 'user.*.verified'  # * = одно слово
routing_key_pattern = 'user.#'  # # = любое количество слов

Полный пример: Email уведомления

# email_producer.py — создание заказа
import pika
import json

class OrderProducer:
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()
        self.channel.exchange_declare(
            exchange='orders',
            exchange_type='topic',
            durable=True
        )
    
    def publish_order(self, order_id, email):
        message = json.dumps({
            'order_id': order_id,
            'email': email,
            'timestamp': '2024-01-15'
        })
        self.channel.basic_publish(
            exchange='orders',
            routing_key='order.created',
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2,  # Persistent
            )
        )

# email_consumer.py — отправка email
import pika
import json

class EmailConsumer:
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()
        
        self.channel.exchange_declare(
            exchange='orders',
            exchange_type='topic',
            durable=True
        )
        
        # Объявляем queue
        result = self.channel.queue_declare(
            queue='email_queue',
            durable=True
        )
        
        # Привязываем к exchange
        self.channel.queue_bind(
            exchange='orders',
            queue='email_queue',
            routing_key='order.#'
        )
    
    def start(self):
        def callback(ch, method, properties, body):
            data = json.loads(body)
            self.send_email(data['email'], data['order_id'])
            ch.basic_ack(delivery_tag=method.delivery_tag)
        
        self.channel.basic_qos(prefetch_count=1)  # One task per consumer
        self.channel.basic_consume(
            queue='email_queue',
            on_message_callback=callback
        )
        self.channel.start_consuming()
    
    def send_email(self, email, order_id):
        print(f'Sending email to {email} for order {order_id}')

Важные параметры

Delivery Mode

pika.BasicProperties(
    delivery_mode=1,  # Transient (теряется при перезагрузке)
    delivery_mode=2,  # Persistent (сохраняется)
)

Acknowledgement

# Auto ack (опасно)
channel.basic_consume(queue='q', auto_ack=True)

# Manual ack (безопасно)
ch.basic_ack(delivery_tag=method.delivery_tag)

# Negative ack (вернуть в очередь)
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

QoS (Quality of Service)

# Ограничиваем количество необработанных сообщений
channel.basic_qos(prefetch_count=1)  # Одно сообщение за раз

Dead Letter Queue (DLQ)

Для обработки сообщений, которые не удалось обработать:

# Основная queue с DLX (Dead Letter Exchange)
channel.queue_declare(
    queue='main_queue',
    arguments={
        'x-dead-letter-exchange': 'dlx_exchange',
        'x-dead-letter-routing-key': 'dead_letter'
    }
)

# DLX exchange
channel.exchange_declare(
    exchange='dlx_exchange',
    exchange_type='direct'
)

# DLQ
channel.queue_declare(queue='dead_letter_queue')
channel.queue_bind(
    exchange='dlx_exchange',
    queue='dead_letter_queue',
    routing_key='dead_letter'
)

Best Practices

1. Используй Durable — не теряй сообщения

2. Используй Manual Ack — гарантируй обработку

3. Устанавливай QoS — не перегружай Worker

4. Используй DLQ — ловите ошибки

5. Версионируй сообщения — добавляй version в JSON

6. Обрабатывай exception — не давай Task умирать

try:
    # Обработка
    process_message(data)
    ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
    logger.error(f"Failed: {e}")
    ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

RabbitMQ — мощный инструмент для асинхронной коммуникации между микросервисами.

Какие знаешь важные роли у RabbitMQ? | PrepBro