← Назад к вопросам
Как отправляются сообщения в RabbitMQ?
1.7 Middle🔥 81 комментариев
#Брокеры сообщений
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Как отправляются сообщения в RabbitMQ?
RabbitMQ — это открытый message broker, реализующий протокол AMQP (Advanced Message Queuing Protocol). Отправка сообщений в RabbitMQ включает несколько ключевых этапов и концепций.
1. Основная архитектура
RabbitMQ использует модель Publisher → Exchange → Queue → Consumer:
- Publisher — отправляет сообщение
- Exchange — получает сообщение и маршрутизирует его в очереди
- Queue — хранит сообщения до обработки
- Consumer — читает сообщения из очереди
2. Подключение и отправка сообщения
import pika
import json
# Подключение к RabbitMQ
connection = pika.BlockingConnection(
pika.ConnectionParameters(host="localhost")
)
channel = connection.channel()
# Объявление exchange
channel.exchange_declare(
exchange="my_exchange",
exchange_type="direct", # direct, fanout, topic, headers
durable=True
)
# Объявление очереди
channel.queue_declare(
queue="my_queue",
durable=True
)
# Привязка очереди к exchange с routing key
channel.queue_bind(
exchange="my_exchange",
queue="my_queue",
routing_key="my_key"
)
# Отправка сообщения
message = {"user_id": 123, "action": "login"}
channel.basic_publish(
exchange="my_exchange",
routing_key="my_key",
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2 # Делает сообщение persistent
)
)
connection.close()
3. Типы Exchange
Маршрутизация зависит от типа exchange:
Direct Exchange
Маршрутизирует сообщение в очередь, если routing_key совпадает:
channel.exchange_declare(
exchange="tasks",
exchange_type="direct",
durable=True
)
# Одна очередь для critical
channel.queue_declare(queue="critical_queue")
channel.queue_bind(
exchange="tasks",
queue="critical_queue",
routing_key="critical"
)
# Другая очередь для normal
channel.queue_declare(queue="normal_queue")
channel.queue_bind(
exchange="tasks",
queue="normal_queue",
routing_key="normal"
)
# Отправка
channel.basic_publish(
exchange="tasks",
routing_key="critical", # Попадёт в critical_queue
body=b"Critical task"
)
Fanout Exchange
Отправляет сообщение во все привязанные очереди, игнорируя routing_key:
channel.exchange_declare(
exchange="notifications",
exchange_type="fanout",
durable=True
)
# Создаём несколько очередей
for queue_name in ["email_queue", "sms_queue", "push_queue"]:
channel.queue_declare(queue=queue_name)
channel.queue_bind(
exchange="notifications",
queue=queue_name
)
# Сообщение попадёт во все три очереди
channel.basic_publish(
exchange="notifications",
routing_key="",
body=b"User registered"
)
Topic Exchange
Маршрутизирует по pattern в routing_key (с подстановочными символами):
channel.exchange_declare(
exchange="logs",
exchange_type="topic",
durable=True
)
# Очередь для всех ошибок
channel.queue_declare(queue="all_errors")
channel.queue_bind(
exchange="logs",
queue="all_errors",
routing_key="*.error" # Совпадает: auth.error, payment.error
)
# Очередь только для authentication
channel.queue_declare(queue="auth_logs")
channel.queue_bind(
exchange="logs",
queue="auth_logs",
routing_key="auth.*" # Совпадает: auth.error, auth.info
)
# Отправка
channel.basic_publish(
exchange="logs",
routing_key="auth.error",
body=b"Authentication failed"
)
4. Persistence и reliability
# Persistent сообщения (выживают перезагрузку сервера)
channel.basic_publish(
exchange="my_exchange",
routing_key="my_key",
body=b"Important message",
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE,
content_type="application/json"
)
)
# Подтверждение доставки (acknowledgment)
def callback(ch, method, properties, body):
try:
# Обработка сообщения
process_message(body)
# Подтверждение успешной обработки
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
# Отклонение и повторная отправка в очередь
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
channel.basic_consume(
queue="my_queue",
on_message_callback=callback,
auto_ack=False # Требует ручного подтверждения
)
5. Пример полного workflow
import pika
import json
import time
class RabbitMQPublisher:
def __init__(self, host="localhost"):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=host)
)
self.channel = self.connection.channel()
def send_message(self, exchange, routing_key, message, persistent=True):
delivery_mode = 2 if persistent else 1
self.channel.basic_publish(
exchange=exchange,
routing_key=routing_key,
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=delivery_mode,
content_type="application/json"
)
)
print(f"Sent: {message}")
def close(self):
self.connection.close()
class RabbitMQConsumer:
def __init__(self, host="localhost"):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=host)
)
self.channel = self.connection.channel()
def consume_messages(self, queue, callback):
self.channel.basic_consume(
queue=queue,
on_message_callback=callback,
auto_ack=False
)
print(f"Waiting for messages in {queue}...")
self.channel.start_consuming()
# Использование
if __name__ == "__main__":
# Отправка
publisher = RabbitMQPublisher()
publisher.send_message(
exchange="tasks",
routing_key="process",
message={"task_id": 1, "data": "process this"}
)
publisher.close()
# Приём
def process_callback(ch, method, properties, body):
message = json.loads(body)
print(f"Processing: {message}")
ch.basic_ack(delivery_tag=method.delivery_tag)
consumer = RabbitMQConsumer()
consumer.consume_messages("my_queue", process_callback)
6. Ключевые параметры отправки
| Параметр | Описание |
|---|---|
| exchange | Имя exchange для маршрутизации |
| routing_key | Ключ маршрутизации (зависит от типа exchange) |
| body | Тело сообщения (bytes) |
| delivery_mode | 1 = transient, 2 = persistent |
| content_type | Тип содержимого (обычно application/json) |
| priority | Приоритет сообщения (0-10) |
| expiration | TTL сообщения в миллисекундах |
Итог: отправка в RabbitMQ включает создание соединения, объявление exchange и queues, привязку маршрутов и публикацию сообщения с нужными свойствами для гарантии доставки и обработки.