Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Назначение брокеров очередей
Брокеры очередей (message brokers) — это промежуточное ПО, которое управляет асинхронной доставкой сообщений между приложениями. Они критически важны в современной распределённой архитектуре.
Основные задачи
1. Асинхронная коммуникация (Decoupling)
Приложения не ждут ответа друг от друга, работают независимо:
# Без брокера — синхронный, блокирующий код
def create_user(name: str, email: str):
user = save_to_db(name, email) # Ждём
send_email(email) # Ждём
update_analytics(user_id) # Ждём
return user # Только после всего
# С брокером — асинхронный
import aio_pika
async def create_user_async(name: str, email: str):
user = save_to_db(name, email) # Быстро
# Отправляем в очередь и сразу возвращаемся
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
channel = await connection.channel()
queue = await channel.declare_queue('user_events')
exchange = await channel.declare_exchange('users')
message = aio_pika.Message(
body=json.dumps({"user_id": user.id, "email": email}).encode()
)
await exchange.publish(message, routing_key='created')
return user # Вернулись быстро!
# Email отправится асинхронно отдельным воркером
2. Обработка всплесков нагрузки (Load Balancing)
Брокер буферирует сообщения и обрабатывает их постепенно:
# Сценарий: 1000 запросов за секунду, но обработчик медленный
# Без брокера — сервер падает (timeout, OOM)
# POST /orders -> slow_payment_processing() -> response (30 сек ждёт клиент)
# С брокером — scalable
# POST /orders:
# 1. Сохраним в БД
# 2. Отправим в очередь Celery/RabbitMQ
# 3. Вернём ответ клиенту (200 OK за 100ms)
#
# Воркеры обрабатывают из очереди постепенно:
# 4. 5 воркеров параллельно обрабатывают платежи
# 5. Клиент может проверить статус через webhook или polling
from celery import Celery
app = Celery('orders')
@app.task(bind=True, max_retries=3)
def process_payment(self, order_id: int, amount: float):
try:
charge_credit_card(amount)
update_order_status(order_id, "paid")
except PaymentError as exc:
# Exponential backoff retry
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
3. Гарантия доставки (Reliability)
Брокер гарантирует доставку даже при сбоях:
# Сценарий: отправляем критические уведомления
# Без брокера — может потеряться
send_notification(user_email) # Если воркер упадёт — уведомление потеряется
# С брокером — гарантия ACID
# RabbitMQ/Redis хранит сообщение до подтверждения обработки
from kombu import Connection, Exchange, Queue
with Connection('amqp://guest:guest@localhost//') as conn:
exchange = Exchange('notifications', type='direct')
queue = Queue('critical_alerts', exchange=exchange, routing_key='alert')
# Сообщение сохраняется на диск до ACK
with conn.Producer() as producer:
producer.publish(
{'user_id': 123, 'message': 'Action required'},
exchange=exchange,
routing_key='alert',
declare=[queue],
retry=True, # Повторить при ошибке
retry_policy={
'max_retries': 3,
'interval_start': 0.1,
'interval_step': 0.2,
}
)
4. Масштабируемость (Scalability)
Можно добавлять воркеры без изменения кода:
# Архитектура
#
# [API Server] --(отправляет в очередь)-> [RabbitMQ]
# |
# ____________|____________
# / | \
# [Worker 1] [Worker 2] [Worker 3]
# обрабатывает сообщения параллельно
# Consumer (слушает и обрабатывает)
from celery import Celery
app = Celery(
'tasks',
broker='amqp://guest:guest@localhost//',
backend='redis://localhost:6379/0'
)
@app.task(name='send_email')
def send_email(to: str, subject: str, body: str):
# Может запуститься на любом доступном воркере
smtp.send_message(to, subject, body)
# Запускаем столько воркеров, сколько нужно
# celery -A tasks worker --loglevel=info
# celery -A tasks worker --loglevel=info # Второй воркер
# celery -A tasks worker --loglevel=info # Третий воркер
5. Разные типы коммуникации
# 1. Request-Reply (RPC)
@app.task(name='multiply')
def multiply(x, y):
return x * y
result = multiply.delay(5, 3) # Асинхронный
total = result.get(timeout=10) # Ждём результат
# 2. Pub-Sub (один издатель, много подписчиков)
exchange = Exchange('notifications', type='fanout')
publish({"event": "user_created", "id": 123}, exchange=exchange)
# Все подписчики получат сообщение
# 3. Topic-based routing
exchange = Exchange('events', type='topic')
publish(
{"data": "..."},
exchange=exchange,
routing_key='users.created' # Подходит для pattern 'users.*'
)
# 4. FIFO queue (порядок гарантирован)
queue = Queue('sequential_tasks', durable=True)
# Сообщения обрабатываются строго в порядке добавления
Популярные брокеры
| Брокер | Особенности | Когда использовать |
|---|---|---|
| RabbitMQ | Надёжный, AMQP, дорого по памяти | Критичная надёжность, сложная маршрутизация |
| Redis | Быстрый, in-memory, простой | High-load, simple tasks, caching |
| Kafka | Event streaming, масштабируемый | Big data, analytics, event sourcing |
| AWS SQS | Managed, простой, дешёвый | AWS ecosystem, простые задачи |
| Celery | Абстракция над брокерами, DSL | Планирование задач, retry logic |
Реальный пример: обработка заказов
# API обработала и отправила в очередь
from fastapi import FastAPI
from celery import Celery
app = FastAPI()
celery = Celery('orders', broker='redis://localhost')
@app.post('/orders')
async def create_order(order: OrderSchema):
order_id = save_order_to_db(order)
# Отправляем в очередь
process_order.delay(order_id)
return {"order_id": order_id, "status": "pending"}
@celery.task(bind=True, max_retries=5)
def process_order(self, order_id: int):
try:
order = get_order(order_id)
# 1. Проверяем инвентарь
check_inventory(order.items)
# 2. Обрабатываем платёж
charge_payment(order.payment)
# 3. Отправляем уведомление
send_notification(order.customer_email)
# 4. Передаём логистике
create_shipment(order_id)
update_order_status(order_id, "completed")
except Exception as exc:
self.retry(exc=exc, countdown=60 ** self.request.retries)
Вывод
Брокеры очередей — это essential infrastructure для любого высоконагруженного приложения. Они решают проблемы асинхронности, надёжности и масштабируемости в распределённых системах.