← Назад к вопросам
Зачем нужен брокер сообщений?
1.7 Middle🔥 171 комментариев
#Архитектура и паттерны#Брокеры сообщений
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Роль брокера сообщений в современных архитектурах
Брокер сообщений — это промежуточный слой между производителями и потребителями данных. Он решает критические проблемы распределённых систем.
Основные задачи брокера
1. Развязка компонентов (Decoupling)
Производитель и потребитель работают независимо:
# Без брокера — жёсткая связанность
user_service.register(user)
mail_service.send_welcome_email(user)
# С брокером — слабая связанность
broker.publish("user.registered", user)
2. Асинхронность и масштабируемость
from celery import Celery
app = Celery('myapp', broker='redis://localhost:6379')
@app.task
def send_email(email):
return f"Email sent to {email}"
for user_email in users:
send_email.delay(user_email)
3. Буферизация нагрузки
Если потребитель не справляется, сообщения накапливаются в брокере.
Популярные брокеры
RabbitMQ — надёжный, с гарантиями доставки:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='tasks', durable=True)
channel.basic_publish(
exchange='',
routing_key='tasks',
body='Process this',
properties=pika.BasicProperties(delivery_mode=2)
)
def callback(ch, method, properties, body):
print(f"Received: {body}")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='tasks', on_message_callback=callback)
channel.start_consuming()
Kafka — для высоконагруженных систем:
from kafka import KafkaProducer, KafkaConsumer
import json
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
producer.send('events', {"user_id": 123, "action": "login"})
consumer = KafkaConsumer(
'events',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
for message in consumer:
print(f"Event: {message.value}")
Redis — простой и быстрый:
from redis import Redis
redis = Redis(host='localhost', port=6379)
redis.lpush('jobs', '{"task": "send_email"}')
job = redis.rpop('jobs')
Гарантии доставки
At-least-once delivery (RabbitMQ):
- Сообщение доставляется минимум один раз
- Может быть дублирование
- Потребитель должен быть идемпотентным
def process_payment(transaction_id, amount):
if redis.get(f"processed:{transaction_id}"):
return "Already processed"
charge_user(amount)
redis.set(f"processed:{transaction_id}", "1", ex=86400)
Практический пример
from celery import Celery
app = Celery(broker='redis://localhost')
def register_user(user_data):
user = create_user(user_data)
app.send_task('events.user_registered', args=[user.id])
return user
@app.task
def on_user_registered(user_id):
user = get_user(user_id)
send_email(user.email, "Welcome!")
Итоги
Брокер сообщений критичен для масштабируемости, надёжности и разделения ответственности в распределённых системах.