← Назад к вопросам

Что такое точки обмена в RabbitMQ?

1.0 Junior🔥 131 комментариев
#DevOps и инфраструктура#Django

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Точки обмена (Exchange) в RabbitMQ

Точки обмена (Exchange) в RabbitMQ — это компонент маршрутизации, который получает сообщения от издателей (publishers) и маршрутизирует их в очереди (queues) на основе определенных правил. Exchange — это центральный коммутатор сообщений в RabbitMQ.

Архитектура RabbitMQ

Publisher → Exchange → Binding → Queue → Consumer (отправитель) (маршрутизатор) (правила) (очередь) (получатель)

Как это работает

  1. Publisher отправляет сообщение на Exchange
  2. Exchange смотрит на routing key и binding rules
  3. Exchange маршрутизирует сообщение в соответствующую Queue
  4. Consumer получает сообщение из Queue

Типы Exchange

1. Direct Exchange (Прямая маршрутизация)

Сообщение отправляется в очередь, если routing key совпадает с binding key.

import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Объявляем Direct Exchange
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# Объявляем очередь
channel.queue_declare(queue='error_logs', durable=True)
channel.queue_declare(queue='info_logs', durable=True)

# Привязываем очереди к Exchange с определенными routing keys
channel.queue_bind(exchange='direct_logs', queue='error_logs', routing_key='error')
channel.queue_bind(exchange='direct_logs', queue='info_logs', routing_key='info')

# Отправляем сообщение с routing_key='error'
message = {"level": "error", "message": "Something went wrong"}
channel.basic_publish(
    exchange='direct_logs',
    routing_key='error',
    body=json.dumps(message),
    properties=pika.BasicProperties(delivery_mode=2)
)

2. Fanout Exchange (Трансляция)

Отправляет сообщение всем привязанным очередям, игнорируя routing key.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='user_events', exchange_type='fanout')

channel.queue_declare(queue='notification_queue')
channel.queue_declare(queue='analytics_queue')

channel.queue_bind(exchange='user_events', queue='notification_queue')
channel.queue_bind(exchange='user_events', queue='analytics_queue')

event = {"event": "user_registered", "user_id": 123}
channel.basic_publish(
    exchange='user_events',
    routing_key='',
    body=json.dumps(event)
)

3. Topic Exchange (Тематическая маршрутизация)

Маршрутизирует сообщения с использованием pattern matching на routing key.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='topic')

channel.queue_declare(queue='all_logs')
channel.queue_declare(queue='error_logs')
channel.queue_declare(queue='order_logs')

# * совпадает с одним словом, # совпадает со всеми словами
channel.queue_bind(exchange='logs', queue='all_logs', routing_key='*.*')
channel.queue_bind(exchange='logs', queue='error_logs', routing_key='*.error')
channel.queue_bind(exchange='logs', queue='order_logs', routing_key='order.#')

channel.basic_publish(
    exchange='logs',
    routing_key='order.created',
    body=json.dumps({"action": "Order created"})
)

4. Headers Exchange

Маршрутизирует на основе заголовков сообщения, а не routing key.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='tasks', exchange_type='headers')
channel.queue_declare(queue='high_priority')

channel.queue_bind(
    exchange='tasks',
    queue='high_priority',
    arguments={'x-match': 'all', 'priority': 'high'}
)

channel.basic_publish(
    exchange='tasks',
    routing_key='',
    body=json.dumps({"task": "Process payment"}),
    properties=pika.BasicProperties(headers={'priority': 'high'})
)

Сравнение типов Exchange

ТипМаршрутизацияИспользование
DirectПо точному ключуКоманды, обработка
FanoutВсем очередямУведомления, события
TopicПо паттернуЛоги, фильтрация
HeadersПо заголовкамСложная маршрутизация

Практический пример: Order Processing

import pika
import json

class OrderPublisher:
    def __init__(self):
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.channel = connection.channel()
        self.channel.exchange_declare(exchange='orders', exchange_type='topic')
    
    def publish_order_event(self, event_type, order_data):
        routing_key = f"order.{event_type}"
        self.channel.basic_publish(
            exchange='orders',
            routing_key=routing_key,
            body=json.dumps(order_data),
            properties=pika.BasicProperties(delivery_mode=2)
        )

publisher = OrderPublisher()
publisher.publish_order_event('created', {'id': 1, 'amount': 100})

Резюме

Exchange в RabbitMQ — это мощный механизм маршрутизации. Выбор типа Exchange зависит от требований: Direct для точной доставки, Fanout для трансляции, Topic для гибкой фильтрации, Headers для сложной маршрутизации. Понимание Exchange критично для построения надежных распределенных систем с асинхронной обработкой.

Что такое точки обмена в RabbitMQ? | PrepBro