← Назад к вопросам

Как работает Rabbit?

2.2 Middle🔥 161 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как работает RabbitMQ?

RabbitMQ — это мощная система обмена сообщениями (message broker), которая реализует протокол AMQP (Advanced Message Queuing Protocol). Это одно из самых популярных решений для асинхронного взаимодействия между компонентами приложения.

1. Архитектура и основные компоненты

RabbitMQ работает по модели 'производитель — посредник — потребитель'.

Основные компоненты:

  • Producer (Издатель) — отправляет сообщения
  • Exchange (Обменник) — маршрутизирует сообщения в очереди
  • Queue (Очередь) — хранит сообщения
  • Consumer (Потребитель) — обрабатывает сообщения
  • Connection и Channel — соединение и логический канал коммуникации

2. Типы Exchange

Видов маршрутизации несколько:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 1. Direct Exchange — по точному ключу маршрутизации
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# 2. Fanout Exchange — все очереди получают копию сообщения
channel.exchange_declare(exchange='logs_fanout', exchange_type='fanout')

# 3. Topic Exchange — по шаблону (wildcard)
channel.exchange_declare(exchange='logs_topic', exchange_type='topic')

# 4. Headers Exchange — по заголовкам сообщения
channel.exchange_declare(exchange='logs_headers', exchange_type='headers')

3. Простой пример: Производитель

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='tasks', exchange_type='direct')
channel.queue_declare(queue='task_queue', durable=True)
channel.queue_bind(exchange='tasks', queue='task_queue', routing_key='process')

channel.basic_publish(
    exchange='tasks',
    routing_key='process',
    body='Process this task',
    properties=pika.BasicProperties(delivery_mode=2)
)

print('Message sent!')
connection.close()

4. Простой пример: Потребитель

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
channel.basic_qos(prefetch_count=1)

def callback(ch, method, properties, body):
    print(f'Received: {body.decode()}')
    try:
        process_task(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

channel.basic_consume(queue='task_queue', on_message_callback=callback)
print('Waiting for messages...')
channel.start_consuming()

5. Используем Celery (рекомендуется для Django/Python)

Целерь — это популярная библиотека для управления асинхронными задачами с RabbitMQ:

from celery import Celery
from celery.result import AsyncResult

app = Celery('tasks', broker='amqp://guest:guest@localhost//')

@app.task
def send_email(email, subject):
    print(f'Sending email to {email}')
    return f'Email sent to {email}'

result = send_email.delay('user@example.com', 'Hello')
print(f'Task ID: {result.id}')

status = AsyncResult(result.id).state
print(f'Status: {status}')

6. Гарантии доставки

RabbitMQ предоставляет несколько уровней гарантий:

  • At most once — может быть потеряно
  • At least once — может быть обработано дважды (с delivery_mode=2)
  • Exactly once — требует собственного контроля идемпотентности

7. Важные концепции

Acknowledgment (Подтверждение):

  • basic_ack() — сообщение обработано успешно
  • basic_nack(requeue=True) — возвращаем в очередь

Prefetch (Предварительная загрузка):

  • prefetch_count=1 — одновременно одно сообщение
  • prefetch_count=10 — до 10 сообщений

Message Persistence:

  • delivery_mode=2 — сообщение сохраняется на диск
  • delivery_mode=1 — только в памяти

8. Dead Letter Exchange

Для обработки ошибок:

channel.exchange_declare(exchange='dlx', exchange_type='direct')
channel.queue_declare(queue='dead_letter_queue')

channel.queue_declare(
    queue='main_queue',
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'failed'
    }
)

Практическое применение

  • Email отправка — асинхронно
  • Обработка изображений — resize, compression
  • Analytics — асинхронная запись метрик
  • Долгие операции — генерация отчётов

RabbitMQ — это мощный инструмент для масштабируемых приложений. Основной принцип: отправь сообщение → брокер маршрутизирует → потребитель обрабатывает.

Как работает Rabbit? | PrepBro