← Назад к вопросам

Для чего нужны брокеры очередей?

2.0 Middle🔥 251 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Назначение брокеров очередей

Брокеры очередей (message brokers) — это промежуточное ПО, которое управляет асинхронной доставкой сообщений между приложениями. Они критически важны в современной распределённой архитектуре.

Основные задачи

1. Асинхронная коммуникация (Decoupling)

Приложения не ждут ответа друг от друга, работают независимо:

# Без брокера — синхронный, блокирующий код
def create_user(name: str, email: str):
    user = save_to_db(name, email)  # Ждём
    send_email(email)               # Ждём
    update_analytics(user_id)       # Ждём
    return user  # Только после всего

# С брокером — асинхронный
import aio_pika

async def create_user_async(name: str, email: str):
    user = save_to_db(name, email)  # Быстро
    
    # Отправляем в очередь и сразу возвращаемся
    connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
    channel = await connection.channel()
    
    queue = await channel.declare_queue('user_events')
    exchange = await channel.declare_exchange('users')
    
    message = aio_pika.Message(
        body=json.dumps({"user_id": user.id, "email": email}).encode()
    )
    
    await exchange.publish(message, routing_key='created')
    return user  # Вернулись быстро!
    # Email отправится асинхронно отдельным воркером

2. Обработка всплесков нагрузки (Load Balancing)

Брокер буферирует сообщения и обрабатывает их постепенно:

# Сценарий: 1000 запросов за секунду, но обработчик медленный

# Без брокера — сервер падает (timeout, OOM)
# POST /orders -> slow_payment_processing() -> response (30 сек ждёт клиент)

# С брокером — scalable
# POST /orders:
#   1. Сохраним в БД
#   2. Отправим в очередь Celery/RabbitMQ
#   3. Вернём ответ клиенту (200 OK за 100ms)
# 
# Воркеры обрабатывают из очереди постепенно:
#   4. 5 воркеров параллельно обрабатывают платежи
#   5. Клиент может проверить статус через webhook или polling

from celery import Celery

app = Celery('orders')

@app.task(bind=True, max_retries=3)
def process_payment(self, order_id: int, amount: float):
    try:
        charge_credit_card(amount)
        update_order_status(order_id, "paid")
    except PaymentError as exc:
        # Exponential backoff retry
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)

3. Гарантия доставки (Reliability)

Брокер гарантирует доставку даже при сбоях:

# Сценарий: отправляем критические уведомления

# Без брокера — может потеряться
send_notification(user_email)  # Если воркер упадёт — уведомление потеряется

# С брокером — гарантия ACID
# RabbitMQ/Redis хранит сообщение до подтверждения обработки

from kombu import Connection, Exchange, Queue

with Connection('amqp://guest:guest@localhost//') as conn:
    exchange = Exchange('notifications', type='direct')
    queue = Queue('critical_alerts', exchange=exchange, routing_key='alert')
    
    # Сообщение сохраняется на диск до ACK
    with conn.Producer() as producer:
        producer.publish(
            {'user_id': 123, 'message': 'Action required'},
            exchange=exchange,
            routing_key='alert',
            declare=[queue],
            retry=True,  # Повторить при ошибке
            retry_policy={
                'max_retries': 3,
                'interval_start': 0.1,
                'interval_step': 0.2,
            }
        )

4. Масштабируемость (Scalability)

Можно добавлять воркеры без изменения кода:

# Архитектура
#
# [API Server] --(отправляет в очередь)-> [RabbitMQ]
#                                              |
#                                  ____________|____________
#                                 /            |            \
#                            [Worker 1]  [Worker 2]  [Worker 3]
#                            обрабатывает сообщения параллельно

# Consumer (слушает и обрабатывает)
from celery import Celery

app = Celery(
    'tasks',
    broker='amqp://guest:guest@localhost//',
    backend='redis://localhost:6379/0'
)

@app.task(name='send_email')
def send_email(to: str, subject: str, body: str):
    # Может запуститься на любом доступном воркере
    smtp.send_message(to, subject, body)

# Запускаем столько воркеров, сколько нужно
# celery -A tasks worker --loglevel=info
# celery -A tasks worker --loglevel=info  # Второй воркер
# celery -A tasks worker --loglevel=info  # Третий воркер

5. Разные типы коммуникации

# 1. Request-Reply (RPC)
@app.task(name='multiply')
def multiply(x, y):
    return x * y

result = multiply.delay(5, 3)  # Асинхронный
total = result.get(timeout=10)  # Ждём результат

# 2. Pub-Sub (один издатель, много подписчиков)
exchange = Exchange('notifications', type='fanout')
publish({"event": "user_created", "id": 123}, exchange=exchange)
# Все подписчики получат сообщение

# 3. Topic-based routing
exchange = Exchange('events', type='topic')
publish(
    {"data": "..."}, 
    exchange=exchange, 
    routing_key='users.created'  # Подходит для pattern 'users.*'
)

# 4. FIFO queue (порядок гарантирован)
queue = Queue('sequential_tasks', durable=True)
# Сообщения обрабатываются строго в порядке добавления

Популярные брокеры

БрокерОсобенностиКогда использовать
RabbitMQНадёжный, AMQP, дорого по памятиКритичная надёжность, сложная маршрутизация
RedisБыстрый, in-memory, простойHigh-load, simple tasks, caching
KafkaEvent streaming, масштабируемыйBig data, analytics, event sourcing
AWS SQSManaged, простой, дешёвыйAWS ecosystem, простые задачи
CeleryАбстракция над брокерами, DSLПланирование задач, retry logic

Реальный пример: обработка заказов

# API обработала и отправила в очередь
from fastapi import FastAPI
from celery import Celery

app = FastAPI()
celery = Celery('orders', broker='redis://localhost')

@app.post('/orders')
async def create_order(order: OrderSchema):
    order_id = save_order_to_db(order)
    
    # Отправляем в очередь
    process_order.delay(order_id)
    
    return {"order_id": order_id, "status": "pending"}

@celery.task(bind=True, max_retries=5)
def process_order(self, order_id: int):
    try:
        order = get_order(order_id)
        
        # 1. Проверяем инвентарь
        check_inventory(order.items)
        
        # 2. Обрабатываем платёж
        charge_payment(order.payment)
        
        # 3. Отправляем уведомление
        send_notification(order.customer_email)
        
        # 4. Передаём логистике
        create_shipment(order_id)
        
        update_order_status(order_id, "completed")
    except Exception as exc:
        self.retry(exc=exc, countdown=60 ** self.request.retries)

Вывод

Брокеры очередей — это essential infrastructure для любого высоконагруженного приложения. Они решают проблемы асинхронности, надёжности и масштабируемости в распределённых системах.