← Назад к вопросам

Какие знаешь типы exchange в RabbitMQ?

2.0 Middle🔥 121 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Типы Exchange в RabbitMQ

Exchange (маршрутизатор) в RabbitMQ определяет, как сообщения доставляются из producer в очереди. Существует 4 основных типа exchange с разными механизмами маршрутизации.

1. Direct Exchange

Маршрутизирует сообщения на основе точного совпадения routing key:

import pika
import json

# Подключение
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Объявление exchange
channel.exchange_declare(
    exchange='orders',
    exchange_type='direct',
    durable=True
)

# Объявление очередей
channel.queue_declare(queue='order.created', durable=True)
channel.queue_declare(queue='order.shipped', durable=True)

# Binding очереди к exchange с routing key
channel.queue_bind(
    exchange='orders',
    queue='order.created',
    routing_key='order.created'
)
channel.queue_bind(
    exchange='orders',
    queue='order.shipped',
    routing_key='order.shipped'
)

# Отправка сообщения
channel.basic_publish(
    exchange='orders',
    routing_key='order.created',  # Точный routing key
    body=json.dumps({'order_id': 123, 'total': 99.99})
)

# Получение сообщения
def callback(ch, method, properties, body):
    print(f'Received: {body.decode()}')
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(
    queue='order.created',
    on_message_callback=callback
)

print('Waiting for messages...')
channel.start_consuming()

Характеристики:

  • Используется для точного маршрутинга
  • Один routing key = одна очередь обычно
  • Идеален для RPC и команд

Примеры: email notifications, SMS alerts, command execution

2. Fanout Exchange

Шлёт сообщение всем очередям, привязанным к этому exchange (broadcast):

# Объявление fanout exchange
channel.exchange_declare(
    exchange='notifications',
    exchange_type='fanout',
    durable=True
)

# Несколько очередей подписаны на один exchange
channel.queue_declare(queue='email.queue', durable=True)
channel.queue_declare(queue='sms.queue', durable=True)
channel.queue_declare(queue='push.queue', durable=True)

# Все очереди получат сообщение независимо от routing key
channel.queue_bind(
    exchange='notifications',
    queue='email.queue'
)
channel.queue_bind(
    exchange='notifications',
    queue='sms.queue'
)
channel.queue_bind(
    exchange='notifications',
    queue='push.queue'
)

# Отправка - routing key игнорируется
channel.basic_publish(
    exchange='notifications',
    routing_key='',  # ignored
    body=json.dumps({'message': 'User registered', 'user_id': 42})
)

# Все три очереди получат это сообщение

Характеристики:

  • Broadcasting к множеству получателей
  • Routing key игнорируется
  • Идеален для событий, когда все должны знать

Примеры: User registered, System alert, Config update

3. Topic Exchange

Маршрутизирует по шаблону routing key с wildcard поддержкой:

# Объявление topic exchange
channel.exchange_declare(
    exchange='logs',
    exchange_type='topic',
    durable=True
)

# Очереди с pattern matching
channel.queue_declare(queue='logs.error', durable=True)
channel.queue_declare(queue='logs.all', durable=True)
channel.queue_declare(queue='app.service.debug', durable=True)

# Wildcard patterns:
# * (звёздочка) = ровно одно слово
# # (хеш) = ноль или больше слов

# Получать только ошибки
channel.queue_bind(
    exchange='logs',
    queue='logs.error',
    routing_key='logs.*.error'  # logs.app.error, logs.db.error
)

# Получать всё
channel.queue_bind(
    exchange='logs',
    queue='logs.all',
    routing_key='logs.#'  # Любые логи
)

# Отправка с разными routing keys
channel.basic_publish(
    exchange='logs',
    routing_key='logs.app.error',
    body=json.dumps({'level': 'ERROR', 'message': 'Database connection failed'})
)

channel.basic_publish(
    exchange='logs',
    routing_key='logs.api.info',
    body=json.dumps({'level': 'INFO', 'message': 'API request received'})
)

channel.basic_publish(
    exchange='logs',
    routing_key='logs.worker.debug',
    body=json.dumps({'level': 'DEBUG', 'message': 'Processing task'})
)

Характеристики:

  • Гибкая маршрутизация с шаблонами
  • * = одно слово (разделённое точкой)
  • # = ноль или больше слов
  • Мощный и гибкий

Примеры: Логирование (разные уровни), метрики, события

4. Headers Exchange

Маршрутизирует на основе заголовков (headers) сообщения:

# Объявление headers exchange
channel.exchange_declare(
    exchange='jobs',
    exchange_type='headers',
    durable=True
)

channel.queue_declare(queue='priority.high', durable=True)
channel.queue_declare(queue='priority.low', durable=True)

# Binding с условиями на заголовках
# 'x-match' может быть 'all' (все условия) или 'any' (хоть одно)
channel.queue_bind(
    exchange='jobs',
    queue='priority.high',
    arguments={
        'x-match': 'all',
        'priority': 'high',
        'type': 'email'
    }
)

channel.queue_bind(
    exchange='jobs',
    queue='priority.low',
    arguments={
        'x-match': 'any',
        'priority': 'low'
    }
)

# Отправка с заголовками
channel.basic_publish(
    exchange='jobs',
    routing_key='',
    properties=pika.BasicProperties(
        headers={
            'priority': 'high',
            'type': 'email',
            'user_id': '123'
        }
    ),
    body=json.dumps({'job': 'send_email', 'to': 'user@example.com'})
)

channel.basic_publish(
    exchange='jobs',
    routing_key='',
    properties=pika.BasicProperties(
        headers={
            'priority': 'low',
            'type': 'report'
        }
    ),
    body=json.dumps({'job': 'generate_report'})
)

Характеристики:

  • Маршрутизация по заголовкам, не по routing key
  • x-match: all = все условия должны совпадать
  • x-match: any = хоть одно совпадение
  • Менее распространён, но очень гибкий

Сравнение типов Exchange

ТипМаршрутизацияUse CaseПроизводительность
DirectТочный matchCommands, RPCВысокая
FanoutBroadcastEvents, notificationsВысокая
TopicPattern matchingLogs, metricsСредняя
HeadersHeader matchingComplex routingНизкая

Лучшие практики

# 1. Используй durable exchanges и queues для надёжности
channel.exchange_declare(exchange='main', durable=True)
channel.queue_declare(queue='my_queue', durable=True)

# 2. Используй message acknowledgment
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='my_queue', on_message_callback=callback)

# 3. Обработка ошибок и dead-letter queue
channel.queue_declare(
    queue='my_queue',
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-message-ttl': 3600000  # 1 час
    }
)

# 4. Retry логика
for attempt in range(3):
    try:
        # Обработка сообщения
        process_message(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
        break
    except Exception as e:
        if attempt < 2:
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        else:
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

Выбор типа exchange зависит от паттерна обмена сообщениями: Direct для точного маршрутинга, Fanout для broadcast, Topic для гибкой маршрутизации, Headers для сложных условий.

Какие знаешь типы exchange в RabbitMQ? | PrepBro