Что такое подписчики в RabbitMQ?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Подписчики (Subscribers) в RabbitMQ
Подписчики — это потребители (consumers) сообщений, которые подключаются к RabbitMQ для получения и обработки сообщений из очередей. Они являются неотъемлемой частью архитектуры обмена сообщениями и работают в паре с издателями (publishers) — компонентами, отправляющими сообщения.
Основные концепции
В RabbitMQ используется модель pub-sub (издатель-подписчик):
- Exchange (обменник) — точка входа для сообщений от издателей
- Queue (очередь) — буфер, где хранятся сообщения
- Binding (привязка) — связь между exchange и queue
- Consumer (подписчик) — процесс, получающий сообщения из очереди
Как подписчик получает сообщения
import pika
import json
# Подключение к RabbitMQ
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
# Убеждаемся, что очередь существует
channel.queue_declare(queue='notifications', durable=True)
# Колбэк для обработки сообщения
def process_message(ch, method, properties, body):
try:
message = json.loads(body)
print(f'Получено сообщение: {message}')
# Обработка сообщения
ch.basic_ack(delivery_tag=method.delivery_tag) # Подтверждение
except Exception as e:
print(f'Ошибка: {e}')
ch.basic_nack(delivery_tag=method.delivery_tag) # Отклонение
# Подписываемся на очередь
channel.basic_consume(
queue='notifications',
on_message_callback=process_message
)
print('Подписчик ждёт сообщений...')
channel.start_consuming()
Архитектура с несколькими подписчиками
# publisher.py — издатель
import pika
import json
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
# Объявляем exchange
channel.exchange_declare(exchange='notifications', exchange_type='fanout')
# Отправляем сообщение
message = {'type': 'order_created', 'order_id': 123, 'amount': 99.99}
channel.basic_publish(
exchange='notifications',
routing_key='',
body=json.dumps(message),
properties=pika.BasicProperties(delivery_mode=2) # Persistent
)
print('Сообщение отправлено')
connection.close()
# consumer.py — подписчик
import pika
import json
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
# Объявляем exchange и очередь
channel.exchange_declare(exchange='notifications', exchange_type='fanout')
result = channel.queue_declare(queue='email_queue', exclusive=True)
queue_name = result.method.queue
# Привязываем очередь к exchange
channel.queue_bind(exchange='notifications', queue=queue_name)
def send_email(ch, method, properties, body):
order = json.loads(body)
print(f'Отправляем email для заказа {order["order_id"]}')
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue=queue_name, on_message_callback=send_email)
print('Email-подписчик запущен')
channel.start_consuming()
Типы маршрутизации (Exchange Types)
1. Fanout — все подписчики получают копию
# Exchange type: fanout
# Все подписчики получат ОДИНАКОВЫЕ сообщения
channel.exchange_declare(exchange='broadcast', exchange_type='fanout')
# Несколько подписчиков могут получить одно сообщение
2. Direct — маршрутизация по ключу
# Exchange type: direct
# Только подписчики с matching routing_key получат сообщение
channel.exchange_declare(exchange='logs', exchange_type='direct')
# Издатель отправляет
channel.basic_publish(
exchange='logs',
routing_key='error', # Ключ маршрутизации
body=b'Critical error!'
)
# Подписчик слушает только 'error' сообщения
channel.queue_bind(exchange='logs', queue='error_queue', routing_key='error')
3. Topic — маршрутизация по шаблону
# Exchange type: topic
# Подписчики получают сообщения по шаблону routing_key
channel.exchange_declare(exchange='events', exchange_type='topic')
# Издатель
channel.basic_publish(
exchange='events',
routing_key='user.created.premium', # Иерархический ключ
body=b'New premium user'
)
# Подписчик 1: все события о пользователях
channel.queue_bind(
exchange='events',
queue='user_events',
routing_key='user.*' # Подстановочный символ
)
# Подписчик 2: только события создания
channel.queue_bind(
exchange='events',
queue='creation_events',
routing_key='*.created.*' # Любой тип создания
)
Управление подписчиками
QoS (Quality of Service) — количество одновременных сообщений:
# Подписчик обрабатывает по одному сообщению за раз
channel.basic_qos(prefetch_count=1)
def process(ch, method, properties, body):
print(f'Обработка: {body}')
time.sleep(2) # Долгая операция
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='tasks', on_message_callback=process)
channel.start_consuming()
Множественные подписчики для масштабирования:
# Несколько процессов с QoS=1 распределяют нагрузку
# Если запустить 5 подписчиков, RabbitMQ распределит сообщения между ними
for i in range(5):
# Запустить consumer.py 5 раз в разных процессах
subprocess.Popen(['python', 'consumer.py'])
Подтверждение обработки (Acknowledgments)
def callback(ch, method, properties, body):
try:
# Обработка сообщения
process_order(json.loads(body))
# Успешное подтверждение
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
# Отклонение и повторная отправка
ch.basic_nack(
delivery_tag=method.delivery_tag,
requeue=True # Вернуть сообщение в очередь
)
# Auto-ack опасен (сообщение теряется при краше)
channel.basic_consume(
queue='orders',
on_message_callback=callback,
auto_ack=False # Явное подтверждение
)
Практический пример: микросервисная архитектура
# order_service.py — издатель
import pika
import json
import uuid
def publish_order_event(event_type, order_data):
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.exchange_declare(exchange='orders', exchange_type='topic')
routing_key = f'order.{event_type}' # order.created, order.cancelled
channel.basic_publish(
exchange='orders',
routing_key=routing_key,
body=json.dumps(order_data),
properties=pika.BasicProperties(
delivery_mode=2, # Persistent
message_id=str(uuid.uuid4())
)
)
connection.close()
# Использование
publish_order_event('created', {
'order_id': 123,
'customer_id': 456,
'amount': 99.99
})
# payment_service.py — подписчик
import pika
import json
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.exchange_declare(exchange='orders', exchange_type='topic')
result = channel.queue_declare(queue='payment_queue')
# Слушаем новые заказы
channel.queue_bind(
exchange='orders',
queue='payment_queue',
routing_key='order.created'
)
def process_payment(ch, method, properties, body):
order = json.loads(body)
print(f'Обработка платежа для заказа {order["order_id"]}')
# Логика платежа
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(
queue='payment_queue',
on_message_callback=process_payment
)
print('Payment-подписчик запущен')
channel.start_consuming()
Преимущества использования подписчиков
- Асинхронность — издатель не ждёт обработки
- Масштабируемость — добавьте подписчиков для распределения нагрузки
- Развязка — сервисы не знают друг о друге
- Надёжность — сообщения сохраняются в очередях
- Множественная обработка — одно сообщение может быть обработано несколькими подписчиками
Вывод
Подписчики в RabbitMQ — это основной компонент асинхронной обработки сообщений в микросервисной архитектуре. Они позволяют строить масштабируемые и надёжные системы, где сервисы взаимодействуют через очереди, а не через синхронные вызовы API.