← Назад к вопросам

Что такое подписчики в RabbitMQ?

2.3 Middle🔥 111 комментариев
#Python Core#Soft Skills

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Подписчики (Subscribers) в RabbitMQ

Подписчики — это потребители (consumers) сообщений, которые подключаются к RabbitMQ для получения и обработки сообщений из очередей. Они являются неотъемлемой частью архитектуры обмена сообщениями и работают в паре с издателями (publishers) — компонентами, отправляющими сообщения.

Основные концепции

В RabbitMQ используется модель pub-sub (издатель-подписчик):

  • Exchange (обменник) — точка входа для сообщений от издателей
  • Queue (очередь) — буфер, где хранятся сообщения
  • Binding (привязка) — связь между exchange и queue
  • Consumer (подписчик) — процесс, получающий сообщения из очереди

Как подписчик получает сообщения

import pika
import json

# Подключение к RabbitMQ
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# Убеждаемся, что очередь существует
channel.queue_declare(queue='notifications', durable=True)

# Колбэк для обработки сообщения
def process_message(ch, method, properties, body):
    try:
        message = json.loads(body)
        print(f'Получено сообщение: {message}')
        # Обработка сообщения
        ch.basic_ack(delivery_tag=method.delivery_tag)  # Подтверждение
    except Exception as e:
        print(f'Ошибка: {e}')
        ch.basic_nack(delivery_tag=method.delivery_tag)  # Отклонение

# Подписываемся на очередь
channel.basic_consume(
    queue='notifications',
    on_message_callback=process_message
)

print('Подписчик ждёт сообщений...')
channel.start_consuming()

Архитектура с несколькими подписчиками

# publisher.py — издатель
import pika
import json

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# Объявляем exchange
channel.exchange_declare(exchange='notifications', exchange_type='fanout')

# Отправляем сообщение
message = {'type': 'order_created', 'order_id': 123, 'amount': 99.99}
channel.basic_publish(
    exchange='notifications',
    routing_key='',
    body=json.dumps(message),
    properties=pika.BasicProperties(delivery_mode=2)  # Persistent
)

print('Сообщение отправлено')
connection.close()
# consumer.py — подписчик
import pika
import json

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# Объявляем exchange и очередь
channel.exchange_declare(exchange='notifications', exchange_type='fanout')
result = channel.queue_declare(queue='email_queue', exclusive=True)
queue_name = result.method.queue

# Привязываем очередь к exchange
channel.queue_bind(exchange='notifications', queue=queue_name)

def send_email(ch, method, properties, body):
    order = json.loads(body)
    print(f'Отправляем email для заказа {order["order_id"]}')
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue=queue_name, on_message_callback=send_email)
print('Email-подписчик запущен')
channel.start_consuming()

Типы маршрутизации (Exchange Types)

1. Fanout — все подписчики получают копию

# Exchange type: fanout
# Все подписчики получат ОДИНАКОВЫЕ сообщения

channel.exchange_declare(exchange='broadcast', exchange_type='fanout')
# Несколько подписчиков могут получить одно сообщение

2. Direct — маршрутизация по ключу

# Exchange type: direct
# Только подписчики с matching routing_key получат сообщение

channel.exchange_declare(exchange='logs', exchange_type='direct')

# Издатель отправляет
channel.basic_publish(
    exchange='logs',
    routing_key='error',  # Ключ маршрутизации
    body=b'Critical error!'
)

# Подписчик слушает только 'error' сообщения
channel.queue_bind(exchange='logs', queue='error_queue', routing_key='error')

3. Topic — маршрутизация по шаблону

# Exchange type: topic
# Подписчики получают сообщения по шаблону routing_key

channel.exchange_declare(exchange='events', exchange_type='topic')

# Издатель
channel.basic_publish(
    exchange='events',
    routing_key='user.created.premium',  # Иерархический ключ
    body=b'New premium user'
)

# Подписчик 1: все события о пользователях
channel.queue_bind(
    exchange='events',
    queue='user_events',
    routing_key='user.*'  # Подстановочный символ
)

# Подписчик 2: только события создания
channel.queue_bind(
    exchange='events',
    queue='creation_events',
    routing_key='*.created.*'  # Любой тип создания
)

Управление подписчиками

QoS (Quality of Service) — количество одновременных сообщений:

# Подписчик обрабатывает по одному сообщению за раз
channel.basic_qos(prefetch_count=1)

def process(ch, method, properties, body):
    print(f'Обработка: {body}')
    time.sleep(2)  # Долгая операция
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='tasks', on_message_callback=process)
channel.start_consuming()

Множественные подписчики для масштабирования:

# Несколько процессов с QoS=1 распределяют нагрузку
# Если запустить 5 подписчиков, RabbitMQ распределит сообщения между ними

for i in range(5):
    # Запустить consumer.py 5 раз в разных процессах
    subprocess.Popen(['python', 'consumer.py'])

Подтверждение обработки (Acknowledgments)

def callback(ch, method, properties, body):
    try:
        # Обработка сообщения
        process_order(json.loads(body))
        # Успешное подтверждение
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        # Отклонение и повторная отправка
        ch.basic_nack(
            delivery_tag=method.delivery_tag,
            requeue=True  # Вернуть сообщение в очередь
        )

# Auto-ack опасен (сообщение теряется при краше)
channel.basic_consume(
    queue='orders',
    on_message_callback=callback,
    auto_ack=False  # Явное подтверждение
)

Практический пример: микросервисная архитектура

# order_service.py — издатель
import pika
import json
import uuid

def publish_order_event(event_type, order_data):
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    channel = connection.channel()
    channel.exchange_declare(exchange='orders', exchange_type='topic')
    
    routing_key = f'order.{event_type}'  # order.created, order.cancelled
    
    channel.basic_publish(
        exchange='orders',
        routing_key=routing_key,
        body=json.dumps(order_data),
        properties=pika.BasicProperties(
            delivery_mode=2,  # Persistent
            message_id=str(uuid.uuid4())
        )
    )
    connection.close()

# Использование
publish_order_event('created', {
    'order_id': 123,
    'customer_id': 456,
    'amount': 99.99
})
# payment_service.py — подписчик
import pika
import json

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

channel.exchange_declare(exchange='orders', exchange_type='topic')
result = channel.queue_declare(queue='payment_queue')

# Слушаем новые заказы
channel.queue_bind(
    exchange='orders',
    queue='payment_queue',
    routing_key='order.created'
)

def process_payment(ch, method, properties, body):
    order = json.loads(body)
    print(f'Обработка платежа для заказа {order["order_id"]}')
    # Логика платежа
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(
    queue='payment_queue',
    on_message_callback=process_payment
)
print('Payment-подписчик запущен')
channel.start_consuming()

Преимущества использования подписчиков

  • Асинхронность — издатель не ждёт обработки
  • Масштабируемость — добавьте подписчиков для распределения нагрузки
  • Развязка — сервисы не знают друг о друге
  • Надёжность — сообщения сохраняются в очередях
  • Множественная обработка — одно сообщение может быть обработано несколькими подписчиками

Вывод

Подписчики в RabbitMQ — это основной компонент асинхронной обработки сообщений в микросервисной архитектуре. Они позволяют строить масштабируемые и надёжные системы, где сервисы взаимодействуют через очереди, а не через синхронные вызовы API.