Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Как работает RabbitMQ?
RabbitMQ — это мощная система обмена сообщениями (message broker), которая реализует протокол AMQP (Advanced Message Queuing Protocol). Это одно из самых популярных решений для асинхронного взаимодействия между компонентами приложения.
1. Архитектура и основные компоненты
RabbitMQ работает по модели 'производитель — посредник — потребитель'.
Основные компоненты:
- Producer (Издатель) — отправляет сообщения
- Exchange (Обменник) — маршрутизирует сообщения в очереди
- Queue (Очередь) — хранит сообщения
- Consumer (Потребитель) — обрабатывает сообщения
- Connection и Channel — соединение и логический канал коммуникации
2. Типы Exchange
Видов маршрутизации несколько:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 1. Direct Exchange — по точному ключу маршрутизации
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
# 2. Fanout Exchange — все очереди получают копию сообщения
channel.exchange_declare(exchange='logs_fanout', exchange_type='fanout')
# 3. Topic Exchange — по шаблону (wildcard)
channel.exchange_declare(exchange='logs_topic', exchange_type='topic')
# 4. Headers Exchange — по заголовкам сообщения
channel.exchange_declare(exchange='logs_headers', exchange_type='headers')
3. Простой пример: Производитель
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='tasks', exchange_type='direct')
channel.queue_declare(queue='task_queue', durable=True)
channel.queue_bind(exchange='tasks', queue='task_queue', routing_key='process')
channel.basic_publish(
exchange='tasks',
routing_key='process',
body='Process this task',
properties=pika.BasicProperties(delivery_mode=2)
)
print('Message sent!')
connection.close()
4. Простой пример: Потребитель
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_qos(prefetch_count=1)
def callback(ch, method, properties, body):
print(f'Received: {body.decode()}')
try:
process_task(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
print('Waiting for messages...')
channel.start_consuming()
5. Используем Celery (рекомендуется для Django/Python)
Целерь — это популярная библиотека для управления асинхронными задачами с RabbitMQ:
from celery import Celery
from celery.result import AsyncResult
app = Celery('tasks', broker='amqp://guest:guest@localhost//')
@app.task
def send_email(email, subject):
print(f'Sending email to {email}')
return f'Email sent to {email}'
result = send_email.delay('user@example.com', 'Hello')
print(f'Task ID: {result.id}')
status = AsyncResult(result.id).state
print(f'Status: {status}')
6. Гарантии доставки
RabbitMQ предоставляет несколько уровней гарантий:
- At most once — может быть потеряно
- At least once — может быть обработано дважды (с delivery_mode=2)
- Exactly once — требует собственного контроля идемпотентности
7. Важные концепции
Acknowledgment (Подтверждение):
- basic_ack() — сообщение обработано успешно
- basic_nack(requeue=True) — возвращаем в очередь
Prefetch (Предварительная загрузка):
- prefetch_count=1 — одновременно одно сообщение
- prefetch_count=10 — до 10 сообщений
Message Persistence:
- delivery_mode=2 — сообщение сохраняется на диск
- delivery_mode=1 — только в памяти
8. Dead Letter Exchange
Для обработки ошибок:
channel.exchange_declare(exchange='dlx', exchange_type='direct')
channel.queue_declare(queue='dead_letter_queue')
channel.queue_declare(
queue='main_queue',
arguments={
'x-dead-letter-exchange': 'dlx',
'x-dead-letter-routing-key': 'failed'
}
)
Практическое применение
- Email отправка — асинхронно
- Обработка изображений — resize, compression
- Analytics — асинхронная запись метрик
- Долгие операции — генерация отчётов
RabbitMQ — это мощный инструмент для масштабируемых приложений. Основной принцип: отправь сообщение → брокер маршрутизирует → потребитель обрабатывает.