← Назад к вопросам

Зачем нужен брокер сообщений?

1.7 Middle🔥 171 комментариев
#Архитектура и паттерны#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Роль брокера сообщений в современных архитектурах

Брокер сообщений — это промежуточный слой между производителями и потребителями данных. Он решает критические проблемы распределённых систем.

Основные задачи брокера

1. Развязка компонентов (Decoupling)

Производитель и потребитель работают независимо:

# Без брокера — жёсткая связанность
user_service.register(user)
mail_service.send_welcome_email(user)

# С брокером — слабая связанность
broker.publish("user.registered", user)

2. Асинхронность и масштабируемость

from celery import Celery

app = Celery('myapp', broker='redis://localhost:6379')

@app.task
def send_email(email):
    return f"Email sent to {email}"

for user_email in users:
    send_email.delay(user_email)

3. Буферизация нагрузки

Если потребитель не справляется, сообщения накапливаются в брокере.

Популярные брокеры

RabbitMQ — надёжный, с гарантиями доставки:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='tasks', durable=True)

channel.basic_publish(
    exchange='',
    routing_key='tasks',
    body='Process this',
    properties=pika.BasicProperties(delivery_mode=2)
)

def callback(ch, method, properties, body):
    print(f"Received: {body}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='tasks', on_message_callback=callback)
channel.start_consuming()

Kafka — для высоконагруженных систем:

from kafka import KafkaProducer, KafkaConsumer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
producer.send('events', {"user_id": 123, "action": "login"})

consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
for message in consumer:
    print(f"Event: {message.value}")

Redis — простой и быстрый:

from redis import Redis

redis = Redis(host='localhost', port=6379)

redis.lpush('jobs', '{"task": "send_email"}')
job = redis.rpop('jobs')

Гарантии доставки

At-least-once delivery (RabbitMQ):

  • Сообщение доставляется минимум один раз
  • Может быть дублирование
  • Потребитель должен быть идемпотентным
def process_payment(transaction_id, amount):
    if redis.get(f"processed:{transaction_id}"):
        return "Already processed"
    
    charge_user(amount)
    redis.set(f"processed:{transaction_id}", "1", ex=86400)

Практический пример

from celery import Celery

app = Celery(broker='redis://localhost')

def register_user(user_data):
    user = create_user(user_data)
    app.send_task('events.user_registered', args=[user.id])
    return user

@app.task
def on_user_registered(user_id):
    user = get_user(user_id)
    send_email(user.email, "Welcome!")

Итоги

Брокер сообщений критичен для масштабируемости, надёжности и разделения ответственности в распределённых системах.