← Назад к вопросам

В чем разница между Queue и Exchange в RabbitMQ?

1.8 Middle🔥 171 комментариев
#Архитектура и паттерны#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# Разница между Queue и Exchange в RabbitMQ

Основная архитектура RabbitMQ

RabbitMQ — это message broker, который доставляет сообщения от производителей к потребителям через exchange и queue.

┌──────────────┐
│  Producer    │ (отправляет сообщение)
└──────┬───────┘
       │ publish(message, routing_key)
       ↓
┌─────────────────────────┐
│    EXCHANGE             │ (маршрутизирует)
│  (фильтр и роутер)      │
└────────────┬────────────┘
             │ binding + routing_key
             ↓
    ┌────────────────┐
    │  QUEUE         │ (хранилище)
    │  (буфер)       │
    └────────┬───────┘
             │
             ↓
    ┌─────────────────┐
    │  Consumer       │ (получает)
    │  (обрабатывает) │
    └─────────────────┘

1. Exchange — маршрутизатор сообщений

Exchange — это точка входа для производителей. Он берёт сообщение и решает, куда его отправить.

import pika

# Подключение к RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Объявляем exchange (маршрутизатор)
channel.exchange_declare(
    exchange='my_exchange',
    exchange_type='direct',  # Тип маршрутизации
    durable=True  # Сохраняется при перезагрузке
)

# Производитель отправляет сообщение в exchange
channel.basic_publish(
    exchange='my_exchange',
    routing_key='user.created',  # Ключ маршрутизации
    body='User ID 123 created',
    properties=pika.BasicProperties(
        delivery_mode=2  # Сделать сообщение persistent
    )
)

print("Сообщение отправлено в exchange 'my_exchange'")

Типы Exchange'ей:

1. Direct Exchange (точная маршрутизация)

Маршрутизирует по точному совпадению routing_key.

# Настройка
channel.exchange_declare(
    exchange='user_events',
    exchange_type='direct'
)

# Binding: queue привязывается к exchange с routing_key
channel.queue_declare(queue='user_created_queue')
channel.queue_bind(
    exchange='user_events',
    queue='user_created_queue',
    routing_key='user.created'  # Ключ маршрутизации
)

# Если routing_key == 'user.created', сообщение идёт в user_created_queue
channel.basic_publish(
    exchange='user_events',
    routing_key='user.created',  # Совпадает!
    body='User created'
)

# Если routing_key == 'user.deleted', сообщение НЕ идёт в user_created_queue
channel.basic_publish(
    exchange='user_events',
    routing_key='user.deleted',  # Не совпадает
    body='User deleted'  # Потеряется, если нет другой queue с таким ключом!
)

2. Fanout Exchange (рассылка всем)

Отправляет сообщение ВСЕм привязанным queue, игнорируя routing_key.

# Настройка
channel.exchange_declare(
    exchange='notifications',
    exchange_type='fanout'
)

# Несколько потребителей подписываются
channel.queue_declare(queue='email_notifications')
channel.queue_declare(queue='sms_notifications')
channel.queue_declare(queue='push_notifications')

channel.queue_bind(exchange='notifications', queue='email_notifications')
channel.queue_bind(exchange='notifications', queue='sms_notifications')
channel.queue_bind(exchange='notifications', queue='push_notifications')

# Производитель отправляет
channel.basic_publish(
    exchange='notifications',
    routing_key='',  # Игнорируется для fanout
    body='Important notification'
)

# ВСЕ три queue получат сообщение!

3. Topic Exchange (подстановка маски)

Маршрутизирует по маскам с wildcards (* и #).

# Настройка
channel.exchange_declare(
    exchange='logs',
    exchange_type='topic'
)

# Queue для всех ошибок
channel.queue_declare(queue='error_logs')
channel.queue_bind(
    exchange='logs',
    queue='error_logs',
    routing_key='logs.error.*'  # * = один уровень
)

# Queue для всех логов приложения
channel.queue_declare(queue='app_logs')
channel.queue_bind(
    exchange='logs',
    queue='app_logs',
    routing_key='logs.#'  # # = любое количество уровней
)

# Отправляем логи
channel.basic_publish(
    exchange='logs',
    routing_key='logs.error.database',  # Совпадает "logs.error.*" и "logs.#"
    body='Database error occurred'
)
# Сообщение идёт ОБОИМ queue

channel.basic_publish(
    exchange='logs',
    routing_key='logs.info.api.request',  # Совпадает только "logs.#"
    body='API request received'
)
# Сообщение идёт только в app_logs

4. Headers Exchange (маршрутизация по заголовкам)

Маршрутизирует по заголовкам сообщения, а не по routing_key.

# Настройка
channel.exchange_declare(
    exchange='async_tasks',
    exchange_type='headers'
)

# Queue для срочных задач
channel.queue_declare(queue='urgent_tasks')
channel.queue_bind(
    exchange='async_tasks',
    queue='urgent_tasks',
    arguments={
        'x-match': 'all',  # ВСЕ заголовки должны совпадать
        'priority': 'high',
        'type': 'computation'
    }
)

# Queue для обычных задач
channel.queue_declare(queue='normal_tasks')
channel.queue_bind(
    exchange='async_tasks',
    queue='normal_tasks',
    arguments={
        'x-match': 'any',  # ЛЮБОЙ из заголовков может совпадать
        'priority': 'low',
        'type': 'logging'
    }
)

# Отправляем сообщение с заголовками
channel.basic_publish(
    exchange='async_tasks',
    routing_key='',
    body='Compute something',
    properties=pika.BasicProperties(
        headers={
            'priority': 'high',
            'type': 'computation'
        }
    )
)
# Идёт в urgent_tasks (ВСЕ заголовки совпадают)

2. Queue — хранилище сообщений

Queue — это буфер, который хранит сообщения, ждущие обработки потребителем.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Объявляем queue
channel.queue_declare(
    queue='my_queue',
    durable=True,  # Сохраняется при перезагрузке
    exclusive=False,  # Может быть использована другими соединениями
    auto_delete=False  # Не удаляется автоматически
)

print(f"Queue объявлена: my_queue")

# Потребитель получает сообщения
def callback(ch, method, properties, body):
    print(f"Получено: {body.decode()}")
    ch.basic_ack(delivery_tag=method.delivery_tag)  # Подтверждаем обработку

# Подписываемся на queue
channel.basic_consume(
    queue='my_queue',
    on_message_callback=callback,
    auto_ack=False  # Ручное подтверждение
)

print("Ожидание сообщений...")
channel.start_consuming()

Важные параметры Queue:

1. Durability (Долговечность)

# durable=True: queue сохраняется при перезагрузке RabbitMQ
channel.queue_declare(queue='important_queue', durable=True)

# durable=False: queue удаляется при перезагрузке RabbitMQ
channel.queue_declare(queue='temporary_queue', durable=False)

2. Exclusive (Исключительность)

# exclusive=True: queue может быть используется только соединением, которое её создало
channel.queue_declare(queue='private_queue', exclusive=True)
# Удаляется автоматически, когда соединение закрывается

# exclusive=False: queue может использоваться несколькими соединениями
channel.queue_declare(queue='shared_queue', exclusive=False)

3. Auto-delete

# auto_delete=True: queue удаляется, когда последний consumer отписывается
channel.queue_declare(queue='auto_cleanup', auto_delete=True)

# auto_delete=False: queue существует до явного удаления
channel.queue_declare(queue='persistent', auto_delete=False)

4. Message TTL (Time To Live)

# Сообщения в queue удаляются через 30 минут
channel.queue_declare(
    queue='timeout_queue',
    arguments={
        'x-message-ttl': 30 * 60 * 1000  # 30 минут в миллисекундах
    }
)

5. Max Length (Максимальный размер)

# Queue может содержать максимум 1000 сообщений
channel.queue_declare(
    queue='limited_queue',
    arguments={
        'x-max-length': 1000
    }
)
# Старые сообщения удаляются, когда достигнут лимит

6. Dead Letter Exchange (очередь для ошибок)

# Queue для обработки ошибок
channel.exchange_declare(exchange='dlx', exchange_type='direct')
channel.queue_declare(queue='dead_letters')
channel.queue_bind(exchange='dlx', queue='dead_letters', routing_key='error')

# Main queue, которая отправляет ошибки в dlx
channel.queue_declare(
    queue='main_queue',
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'error'
    }
)

# Если consumer отклонит сообщение (nack), оно идёт в dead_letters

Exchange vs Queue: полное сравнение

АспектExchangeQueue
РольМаршрутизирует сообщенияХранит сообщения
Входная точкаДа (производитель пишет сюда)Нет (получает из exchange)
Хранит данныеНетДа
ОбязателенНет (можно писать прямо в queue)Да (нужна как минимум одна)
Удаляет сообщенияСразу после маршрутизацииПосле подтверждения consumer'а
КоличествоНесколько (для разных маршрутов)Несколько (для разных потребителей)

Практический пример: E-commerce система

import pika
import json
from datetime import datetime

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# ========== SETUP ==========

# Объявляем exchange для заказов
channel.exchange_declare(
    exchange='orders',
    exchange_type='topic',
    durable=True
)

# Объявляем queue для отправки email
channel.queue_declare(queue='email_queue', durable=True)
channel.queue_bind(
    exchange='orders',
    queue='email_queue',
    routing_key='order.*'  # Все события заказов
)

# Объявляем queue для обновления инвентаря
channel.queue_declare(queue='inventory_queue', durable=True)
channel.queue_bind(
    exchange='orders',
    queue='inventory_queue',
    routing_key='order.created'  # Только новые заказы
)

# Объявляем queue для аналитики
channel.queue_declare(queue='analytics_queue', durable=True)
channel.queue_bind(
    exchange='orders',
    queue='analytics_queue',
    routing_key='order.#'  # Все события
)

# ========== PRODUCER ==========

def place_order(order_data):
    """Размещаем заказ"""
    
    # Публикуем событие "order.created"
    channel.basic_publish(
        exchange='orders',
        routing_key='order.created',
        body=json.dumps(order_data),
        properties=pika.BasicProperties(
            delivery_mode=2,  # Persistent
            content_type='application/json'
        )
    )
    
    print(f"Заказ {order_data['id']} создан")

def cancel_order(order_id):
    """Отменяем заказ"""
    
    channel.basic_publish(
        exchange='orders',
        routing_key='order.cancelled',
        body=json.dumps({'order_id': order_id}),
        properties=pika.BasicProperties(delivery_mode=2)
    )
    
    print(f"Заказ {order_id} отменён")

# ========== CONSUMERS ==========

def email_consumer():
    """Consumer для отправки email'ов"""
    
    def callback(ch, method, properties, body):
        data = json.loads(body)
        print(f"EMAIL: Отправляем письмо для заказа {data['id']}")
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    channel.basic_consume(queue='email_queue', on_message_callback=callback)
    print("Email consumer запущен")
    channel.start_consuming()

def inventory_consumer():
    """Consumer для обновления инвентаря"""
    
    def callback(ch, method, properties, body):
        data = json.loads(body)
        print(f"INVENTORY: Резервируем товары для заказа {data['id']}")
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    channel.basic_consume(queue='inventory_queue', on_message_callback=callback)
    print("Inventory consumer запущен")
    channel.start_consuming()

def analytics_consumer():
    """Consumer для аналитики"""
    
    def callback(ch, method, properties, body):
        data = json.loads(body)
        print(f"ANALYTICS: Записываем событие {method.routing_key}")
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    channel.basic_consume(queue='analytics_queue', on_message_callback=callback)
    print("Analytics consumer запущен")
    channel.start_consuming()

# ========== USAGE ==========

if __name__ == '__main__':
    # Производитель размещает заказ
    order = {
        'id': '12345',
        'user_id': 'user_001',
        'items': [{'product_id': 'p1', 'quantity': 2}],
        'timestamp': datetime.now().isoformat()
    }
    
    place_order(order)
    
    # Потребители обрабатывают событие:
    # email_queue     ← order.created (отправить письмо)
    # inventory_queue ← order.created (зарезервировать товары)
    # analytics_queue ← order.created (записать в аналитику)

Лучшие практики

✅ Делай:

# 1. Используй durable queues для важных данных
channel.queue_declare(queue='important', durable=True)

# 2. Используй persistent сообщения
pika.BasicProperties(delivery_mode=2)

# 3. Используй ручное подтверждение (basic_ack)
channel.basic_consume(..., auto_ack=False)

# 4. Обрабатывай ошибки gracefully
try:
    process_message(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception:
    ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

# 5. Используй topic/direct exchange для маршрутизации

❌ Не делай:

# Не используй auto_ack
channel.basic_consume(..., auto_ack=True)  # Опасно!

# Не теряй сообщения
channel.exchange_declare(exchange='temp', durable=False)  # Может потеряться

# Не используй временные queue для важных данных
channel.queue_declare(queue='data', auto_delete=True)

Заключение

  • Exchange — маршрутизирует сообщения в правильные queue'ы
  • Queue — хранит сообщения и ждёт потребителей
  • Binding — связывает exchange и queue с routing_key
  • Для надёжности используй durable=True на обоих
  • Topic exchange — лучший выбор для большинства случаев
В чем разница между Queue и Exchange в RabbitMQ? | PrepBro