Какие знаешь важные роли у RabbitMQ?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Важные роли и концепции RabbitMQ
RabbitMQ — это message broker на основе AMQP. Он использует уникальную модель с роллями сообщений.
1. Producer (Отправитель)
Процесс, который создаёт и отправляет сообщения:
import pika
# Producer
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Объявляем exchange
channel.exchange_declare(exchange='my_exchange', exchange_type='direct')
# Отправляем сообщение
channel.basic_publish(
exchange='my_exchange',
routing_key='user.created',
body='User John created'
)
connection.close()
Характеристики:
- Не зависит от Consumer
- Может отправлять без Consumer
- Может быть несколько Producers
2. Consumer (Получатель)
Процесс, который получает и обрабатывает сообщения:
import pika
def callback(ch, method, properties, body):
print(f"Received message: {body}")
ch.basic_ack(delivery_tag=method.delivery_tag)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Объявляем queue
channel.queue_declare(queue='user_events')
# Привязываем queue к exchange
channel.queue_bind(
exchange='my_exchange',
queue='user_events',
routing_key='user.created'
)
# Слушаем сообщения
channel.basic_consume(
queue='user_events',
on_message_callback=callback
)
print('Waiting for messages...')
channel.start_consuming()
Характеристики:
- Может работать без Producer (очередь сохранит сообщения)
- Может быть несколько Consumers для одной queue
- Acknowledgement гарантирует обработку
3. Exchange (Маршрутизатор)
Обмен принимает сообщения от Producer и маршрутизирует к Queues:
a) Direct Exchange
# Exchange маршрутизирует по точному routing_key
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
# Producer
channel.basic_publish(
exchange='direct_logs',
routing_key='error', # Точное совпадение
body='Error occurred'
)
# Consumers
channel.queue_bind(
exchange='direct_logs',
queue='error_queue',
routing_key='error'
)
b) Fanout Exchange
# Рассылает сообщение ВСЕ queues
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# Producer
channel.basic_publish(
exchange='logs',
routing_key='', # Игнорируется
body='Broadcast message'
)
# Все queues получат сообщение
channel.queue_bind(exchange='logs', queue='queue1')
channel.queue_bind(exchange='logs', queue='queue2')
c) Topic Exchange
# Маршрутизирует по pattern
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
# Producer
channel.basic_publish(
exchange='topic_logs',
routing_key='user.created.verified',
body='User created and verified'
)
# Consumer 1: слушает всех пользователей
channel.queue_bind(
exchange='topic_logs',
queue='user_queue',
routing_key='user.*'
)
# Consumer 2: слушает только verified
channel.queue_bind(
exchange='topic_logs',
queue='verified_queue',
routing_key='user.*.verified'
)
4. Queue (Очередь)
Хранилище сообщений, связанная с одним Consumer:
# Объявляем queue с параметрами
channel.queue_declare(
queue='task_queue',
durable=True, # Сохраняется при перезагрузке RabbitMQ
exclusive=False, # Может использовать несколько Consumers
auto_delete=False # Не удаляется при отключении Consumer
)
Типы:
- Durable Queue — сохраняет сообщения
- Exclusive Queue — используется одним Consumer
- Auto-delete — удаляется при отключении Consumer
5. Binding (Привязка)
Связывает Queue с Exchange через routing_key:
channel.queue_bind(
exchange='my_exchange',
queue='my_queue',
routing_key='events.user.*'
)
Одна Queue может быть привязана к нескольким Exchanges!
6. Routing Key
Строка, которая используется для маршрутизации:
# Direct: точное совпадение
routing_key = 'error'
# Topic: pattern matching
routing_key = 'user.created.verified' # Можно использовать *, #
routing_key_pattern = 'user.*.verified' # * = одно слово
routing_key_pattern = 'user.#' # # = любое количество слов
Полный пример: Email уведомления
# email_producer.py — создание заказа
import pika
import json
class OrderProducer:
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel = self.connection.channel()
self.channel.exchange_declare(
exchange='orders',
exchange_type='topic',
durable=True
)
def publish_order(self, order_id, email):
message = json.dumps({
'order_id': order_id,
'email': email,
'timestamp': '2024-01-15'
})
self.channel.basic_publish(
exchange='orders',
routing_key='order.created',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # Persistent
)
)
# email_consumer.py — отправка email
import pika
import json
class EmailConsumer:
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel = self.connection.channel()
self.channel.exchange_declare(
exchange='orders',
exchange_type='topic',
durable=True
)
# Объявляем queue
result = self.channel.queue_declare(
queue='email_queue',
durable=True
)
# Привязываем к exchange
self.channel.queue_bind(
exchange='orders',
queue='email_queue',
routing_key='order.#'
)
def start(self):
def callback(ch, method, properties, body):
data = json.loads(body)
self.send_email(data['email'], data['order_id'])
ch.basic_ack(delivery_tag=method.delivery_tag)
self.channel.basic_qos(prefetch_count=1) # One task per consumer
self.channel.basic_consume(
queue='email_queue',
on_message_callback=callback
)
self.channel.start_consuming()
def send_email(self, email, order_id):
print(f'Sending email to {email} for order {order_id}')
Важные параметры
Delivery Mode
pika.BasicProperties(
delivery_mode=1, # Transient (теряется при перезагрузке)
delivery_mode=2, # Persistent (сохраняется)
)
Acknowledgement
# Auto ack (опасно)
channel.basic_consume(queue='q', auto_ack=True)
# Manual ack (безопасно)
ch.basic_ack(delivery_tag=method.delivery_tag)
# Negative ack (вернуть в очередь)
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
QoS (Quality of Service)
# Ограничиваем количество необработанных сообщений
channel.basic_qos(prefetch_count=1) # Одно сообщение за раз
Dead Letter Queue (DLQ)
Для обработки сообщений, которые не удалось обработать:
# Основная queue с DLX (Dead Letter Exchange)
channel.queue_declare(
queue='main_queue',
arguments={
'x-dead-letter-exchange': 'dlx_exchange',
'x-dead-letter-routing-key': 'dead_letter'
}
)
# DLX exchange
channel.exchange_declare(
exchange='dlx_exchange',
exchange_type='direct'
)
# DLQ
channel.queue_declare(queue='dead_letter_queue')
channel.queue_bind(
exchange='dlx_exchange',
queue='dead_letter_queue',
routing_key='dead_letter'
)
Best Practices
1. Используй Durable — не теряй сообщения
2. Используй Manual Ack — гарантируй обработку
3. Устанавливай QoS — не перегружай Worker
4. Используй DLQ — ловите ошибки
5. Версионируй сообщения — добавляй version в JSON
6. Обрабатывай exception — не давай Task умирать
try:
# Обработка
process_message(data)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
logger.error(f"Failed: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
RabbitMQ — мощный инструмент для асинхронной коммуникации между микросервисами.