← Назад к вопросам
Какие знаешь типы exchange в RabbitMQ?
2.0 Middle🔥 121 комментариев
#Брокеры сообщений
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Типы Exchange в RabbitMQ
Exchange (маршрутизатор) в RabbitMQ определяет, как сообщения доставляются из producer в очереди. Существует 4 основных типа exchange с разными механизмами маршрутизации.
1. Direct Exchange
Маршрутизирует сообщения на основе точного совпадения routing key:
import pika
import json
# Подключение
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Объявление exchange
channel.exchange_declare(
exchange='orders',
exchange_type='direct',
durable=True
)
# Объявление очередей
channel.queue_declare(queue='order.created', durable=True)
channel.queue_declare(queue='order.shipped', durable=True)
# Binding очереди к exchange с routing key
channel.queue_bind(
exchange='orders',
queue='order.created',
routing_key='order.created'
)
channel.queue_bind(
exchange='orders',
queue='order.shipped',
routing_key='order.shipped'
)
# Отправка сообщения
channel.basic_publish(
exchange='orders',
routing_key='order.created', # Точный routing key
body=json.dumps({'order_id': 123, 'total': 99.99})
)
# Получение сообщения
def callback(ch, method, properties, body):
print(f'Received: {body.decode()}')
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
queue='order.created',
on_message_callback=callback
)
print('Waiting for messages...')
channel.start_consuming()
Характеристики:
- Используется для точного маршрутинга
- Один routing key = одна очередь обычно
- Идеален для RPC и команд
Примеры: email notifications, SMS alerts, command execution
2. Fanout Exchange
Шлёт сообщение всем очередям, привязанным к этому exchange (broadcast):
# Объявление fanout exchange
channel.exchange_declare(
exchange='notifications',
exchange_type='fanout',
durable=True
)
# Несколько очередей подписаны на один exchange
channel.queue_declare(queue='email.queue', durable=True)
channel.queue_declare(queue='sms.queue', durable=True)
channel.queue_declare(queue='push.queue', durable=True)
# Все очереди получат сообщение независимо от routing key
channel.queue_bind(
exchange='notifications',
queue='email.queue'
)
channel.queue_bind(
exchange='notifications',
queue='sms.queue'
)
channel.queue_bind(
exchange='notifications',
queue='push.queue'
)
# Отправка - routing key игнорируется
channel.basic_publish(
exchange='notifications',
routing_key='', # ignored
body=json.dumps({'message': 'User registered', 'user_id': 42})
)
# Все три очереди получат это сообщение
Характеристики:
- Broadcasting к множеству получателей
- Routing key игнорируется
- Идеален для событий, когда все должны знать
Примеры: User registered, System alert, Config update
3. Topic Exchange
Маршрутизирует по шаблону routing key с wildcard поддержкой:
# Объявление topic exchange
channel.exchange_declare(
exchange='logs',
exchange_type='topic',
durable=True
)
# Очереди с pattern matching
channel.queue_declare(queue='logs.error', durable=True)
channel.queue_declare(queue='logs.all', durable=True)
channel.queue_declare(queue='app.service.debug', durable=True)
# Wildcard patterns:
# * (звёздочка) = ровно одно слово
# # (хеш) = ноль или больше слов
# Получать только ошибки
channel.queue_bind(
exchange='logs',
queue='logs.error',
routing_key='logs.*.error' # logs.app.error, logs.db.error
)
# Получать всё
channel.queue_bind(
exchange='logs',
queue='logs.all',
routing_key='logs.#' # Любые логи
)
# Отправка с разными routing keys
channel.basic_publish(
exchange='logs',
routing_key='logs.app.error',
body=json.dumps({'level': 'ERROR', 'message': 'Database connection failed'})
)
channel.basic_publish(
exchange='logs',
routing_key='logs.api.info',
body=json.dumps({'level': 'INFO', 'message': 'API request received'})
)
channel.basic_publish(
exchange='logs',
routing_key='logs.worker.debug',
body=json.dumps({'level': 'DEBUG', 'message': 'Processing task'})
)
Характеристики:
- Гибкая маршрутизация с шаблонами
*= одно слово (разделённое точкой)#= ноль или больше слов- Мощный и гибкий
Примеры: Логирование (разные уровни), метрики, события
4. Headers Exchange
Маршрутизирует на основе заголовков (headers) сообщения:
# Объявление headers exchange
channel.exchange_declare(
exchange='jobs',
exchange_type='headers',
durable=True
)
channel.queue_declare(queue='priority.high', durable=True)
channel.queue_declare(queue='priority.low', durable=True)
# Binding с условиями на заголовках
# 'x-match' может быть 'all' (все условия) или 'any' (хоть одно)
channel.queue_bind(
exchange='jobs',
queue='priority.high',
arguments={
'x-match': 'all',
'priority': 'high',
'type': 'email'
}
)
channel.queue_bind(
exchange='jobs',
queue='priority.low',
arguments={
'x-match': 'any',
'priority': 'low'
}
)
# Отправка с заголовками
channel.basic_publish(
exchange='jobs',
routing_key='',
properties=pika.BasicProperties(
headers={
'priority': 'high',
'type': 'email',
'user_id': '123'
}
),
body=json.dumps({'job': 'send_email', 'to': 'user@example.com'})
)
channel.basic_publish(
exchange='jobs',
routing_key='',
properties=pika.BasicProperties(
headers={
'priority': 'low',
'type': 'report'
}
),
body=json.dumps({'job': 'generate_report'})
)
Характеристики:
- Маршрутизация по заголовкам, не по routing key
x-match: all= все условия должны совпадатьx-match: any= хоть одно совпадение- Менее распространён, но очень гибкий
Сравнение типов Exchange
| Тип | Маршрутизация | Use Case | Производительность |
|---|---|---|---|
| Direct | Точный match | Commands, RPC | Высокая |
| Fanout | Broadcast | Events, notifications | Высокая |
| Topic | Pattern matching | Logs, metrics | Средняя |
| Headers | Header matching | Complex routing | Низкая |
Лучшие практики
# 1. Используй durable exchanges и queues для надёжности
channel.exchange_declare(exchange='main', durable=True)
channel.queue_declare(queue='my_queue', durable=True)
# 2. Используй message acknowledgment
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='my_queue', on_message_callback=callback)
# 3. Обработка ошибок и dead-letter queue
channel.queue_declare(
queue='my_queue',
arguments={
'x-dead-letter-exchange': 'dlx',
'x-message-ttl': 3600000 # 1 час
}
)
# 4. Retry логика
for attempt in range(3):
try:
# Обработка сообщения
process_message(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
break
except Exception as e:
if attempt < 2:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
else:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
Выбор типа exchange зависит от паттерна обмена сообщениями: Direct для точного маршрутинга, Fanout для broadcast, Topic для гибкой маршрутизации, Headers для сложных условий.