← Назад к вопросам

Как отправляются сообщения в RabbitMQ?

1.7 Middle🔥 81 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как отправляются сообщения в RabbitMQ?

RabbitMQ — это открытый message broker, реализующий протокол AMQP (Advanced Message Queuing Protocol). Отправка сообщений в RabbitMQ включает несколько ключевых этапов и концепций.

1. Основная архитектура

RabbitMQ использует модель Publisher → Exchange → Queue → Consumer:

  • Publisher — отправляет сообщение
  • Exchange — получает сообщение и маршрутизирует его в очереди
  • Queue — хранит сообщения до обработки
  • Consumer — читает сообщения из очереди

2. Подключение и отправка сообщения

import pika
import json

# Подключение к RabbitMQ
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host="localhost")
)
channel = connection.channel()

# Объявление exchange
channel.exchange_declare(
    exchange="my_exchange",
    exchange_type="direct",  # direct, fanout, topic, headers
    durable=True
)

# Объявление очереди
channel.queue_declare(
    queue="my_queue",
    durable=True
)

# Привязка очереди к exchange с routing key
channel.queue_bind(
    exchange="my_exchange",
    queue="my_queue",
    routing_key="my_key"
)

# Отправка сообщения
message = {"user_id": 123, "action": "login"}
channel.basic_publish(
    exchange="my_exchange",
    routing_key="my_key",
    body=json.dumps(message),
    properties=pika.BasicProperties(
        delivery_mode=2  # Делает сообщение persistent
    )
)

connection.close()

3. Типы Exchange

Маршрутизация зависит от типа exchange:

Direct Exchange

Маршрутизирует сообщение в очередь, если routing_key совпадает:

channel.exchange_declare(
    exchange="tasks",
    exchange_type="direct",
    durable=True
)

# Одна очередь для critical
channel.queue_declare(queue="critical_queue")
channel.queue_bind(
    exchange="tasks",
    queue="critical_queue",
    routing_key="critical"
)

# Другая очередь для normal
channel.queue_declare(queue="normal_queue")
channel.queue_bind(
    exchange="tasks",
    queue="normal_queue",
    routing_key="normal"
)

# Отправка
channel.basic_publish(
    exchange="tasks",
    routing_key="critical",  # Попадёт в critical_queue
    body=b"Critical task"
)

Fanout Exchange

Отправляет сообщение во все привязанные очереди, игнорируя routing_key:

channel.exchange_declare(
    exchange="notifications",
    exchange_type="fanout",
    durable=True
)

# Создаём несколько очередей
for queue_name in ["email_queue", "sms_queue", "push_queue"]:
    channel.queue_declare(queue=queue_name)
    channel.queue_bind(
        exchange="notifications",
        queue=queue_name
    )

# Сообщение попадёт во все три очереди
channel.basic_publish(
    exchange="notifications",
    routing_key="",
    body=b"User registered"
)

Topic Exchange

Маршрутизирует по pattern в routing_key (с подстановочными символами):

channel.exchange_declare(
    exchange="logs",
    exchange_type="topic",
    durable=True
)

# Очередь для всех ошибок
channel.queue_declare(queue="all_errors")
channel.queue_bind(
    exchange="logs",
    queue="all_errors",
    routing_key="*.error"  # Совпадает: auth.error, payment.error
)

# Очередь только для authentication
channel.queue_declare(queue="auth_logs")
channel.queue_bind(
    exchange="logs",
    queue="auth_logs",
    routing_key="auth.*"  # Совпадает: auth.error, auth.info
)

# Отправка
channel.basic_publish(
    exchange="logs",
    routing_key="auth.error",
    body=b"Authentication failed"
)

4. Persistence и reliability

# Persistent сообщения (выживают перезагрузку сервера)
channel.basic_publish(
    exchange="my_exchange",
    routing_key="my_key",
    body=b"Important message",
    properties=pika.BasicProperties(
        delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE,
        content_type="application/json"
    )
)

# Подтверждение доставки (acknowledgment)
def callback(ch, method, properties, body):
    try:
        # Обработка сообщения
        process_message(body)
        # Подтверждение успешной обработки
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        # Отклонение и повторная отправка в очередь
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

channel.basic_consume(
    queue="my_queue",
    on_message_callback=callback,
    auto_ack=False  # Требует ручного подтверждения
)

5. Пример полного workflow

import pika
import json
import time

class RabbitMQPublisher:
    def __init__(self, host="localhost"):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=host)
        )
        self.channel = self.connection.channel()
    
    def send_message(self, exchange, routing_key, message, persistent=True):
        delivery_mode = 2 if persistent else 1
        
        self.channel.basic_publish(
            exchange=exchange,
            routing_key=routing_key,
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=delivery_mode,
                content_type="application/json"
            )
        )
        print(f"Sent: {message}")
    
    def close(self):
        self.connection.close()

class RabbitMQConsumer:
    def __init__(self, host="localhost"):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=host)
        )
        self.channel = self.connection.channel()
    
    def consume_messages(self, queue, callback):
        self.channel.basic_consume(
            queue=queue,
            on_message_callback=callback,
            auto_ack=False
        )
        print(f"Waiting for messages in {queue}...")
        self.channel.start_consuming()

# Использование
if __name__ == "__main__":
    # Отправка
    publisher = RabbitMQPublisher()
    publisher.send_message(
        exchange="tasks",
        routing_key="process",
        message={"task_id": 1, "data": "process this"}
    )
    publisher.close()
    
    # Приём
    def process_callback(ch, method, properties, body):
        message = json.loads(body)
        print(f"Processing: {message}")
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    consumer = RabbitMQConsumer()
    consumer.consume_messages("my_queue", process_callback)

6. Ключевые параметры отправки

ПараметрОписание
exchangeИмя exchange для маршрутизации
routing_keyКлюч маршрутизации (зависит от типа exchange)
bodyТело сообщения (bytes)
delivery_mode1 = transient, 2 = persistent
content_typeТип содержимого (обычно application/json)
priorityПриоритет сообщения (0-10)
expirationTTL сообщения в миллисекундах

Итог: отправка в RabbitMQ включает создание соединения, объявление exchange и queues, привязку маршрутов и публикацию сообщения с нужными свойствами для гарантии доставки и обработки.

Как отправляются сообщения в RabbitMQ? | PrepBro