← Назад к вопросам

В какой ситуации сообщение брокера может продублироваться

2.7 Senior🔥 141 комментариев
#Архитектура и паттерны#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Дублирование сообщений в брокере сообщений

В распределённых системах дублирование сообщений в брокере — это серьёзная проблема, которая может привести к потере денег, несогласованности данных или неправильным уведомлениям. Я столкнулся с этой проблемой при работе с RabbitMQ и Apache Kafka.

Основные сценарии дублирования

1. Сбой и перезагрузка brokers

Когда broker падает после подтверждения сообщения (ACK), но до сохранения этого ACK в persistent storage, при перезагрузке сообщение остаётся в очереди и будет переобработано.

# Пример: RabbitMQ
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='tasks', durable=True)

def callback(ch, method, properties, body):
    try:
        process_message(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)  # ACK отправляется
    except Exception as e:
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

channel.basic_consume(queue='tasks', on_message_callback=callback)
channel.start_consuming()

# Если процесс упадёт между process_message() и basic_ack(),
# сообщение будет переобработано

2. Сбой consumer'а после обработки, но до ACK

Consumer обработал сообщение (изменил БД, отправил письмо), но упал до отправки подтверждения брокеру. Брокер будет переслать сообщение другому consumer'у.

# ❌ Уязвимый код
def process_payment(message):
    payment_id = message['payment_id']
    charge_card(payment_id)  # Платёж прошёл!
    # Consumer падает здесь
    return ACK  # Никогда не дойдёт

# ✅ Правильный подход: идемпотентность
def process_payment(message):
    payment_id = message['payment_id']
    
    # Проверяем: может быть, это переобработка?
    if database.payment_exists(payment_id):
        return ACK  # Уже обработано
    
    charge_card(payment_id)
    database.save_payment(payment_id)
    return ACK

3. Таймаут heartbeat'а

Если consumer долго обрабатывает сообщение, broker может думать, что consumer умер, и переслать сообщение другому consumer'у.

4. Сбой сети между brokers в кластере

При сбое сети между узлами кластера могут быть replicas на разных узлах, и при failover может произойти переобработка.

5. Consumer отправит ACK, но сам упадёт до завершения операции

# ❌ Плохо: ACK перед операцией
def process():
    ack_message()  # Подтверждаем
    save_to_database()  # Падаем здесь → данные потеряны

# ✅ Хорошо: ACK после операции
def process():
    save_to_database()
    ack_message()  # Подтверждаем только после успеха

Стратегии борьбы с дублированием

1. Идемпотентные обработчики

Обработчик должен быть идемпотентным — повторная обработка того же сообщения не вызывает побочных эффектов.

def add_to_cart(message):
    user_id = message['user_id']
    product_id = message['product_id']
    quantity = message['quantity']
    
    # Проверяем уникальность
    existing_item = db.query(CartItem).filter(
        CartItem.user_id == user_id,
        CartItem.product_id == product_id
    ).first()
    
    if existing_item:
        existing_item.quantity = quantity  # Обновляем, не удваиваем
    else:
        db.add(CartItem(user_id, product_id, quantity))
    
    db.commit()

2. Deduplication с помощью message ID

Сохраняем ID обработанных сообщений и пропускаем дубли.

class MessageProcessor:
    def process(self, message):
        message_id = message['id']
        
        # Проверяем: это дубль?
        if self.redis.exists(f"processed:{message_id}"):
            return  # Пропускаем
        
        # Обрабатываем
        self.handle_message(message)
        
        # Помечаем как обработанное (с TTL)
        self.redis.setex(f"processed:{message_id}", 86400, 1)

3. Exactly-once delivery

# Kafka с точно-одной доставкой
consumer = KafkaConsumer(
    'topic',
    isolation_level='read_committed',
    enable_auto_commit=False
)

for message in consumer:
    try:
        process_message(message)
        consumer.commit()  # Коммитим только после успеха
    except Exception:
        pass

4. Правильная конфигурация broker'а

# RabbitMQ: гарантии доставки
channel.queue_declare(
    queue='reliable_queue',
    durable=True,
    arguments={'x-max-length': 100000}
)

channel.basic_qos(
    prefetch_count=10
)

Главное правило: предполагай, что дублирование может случиться, и проектируй систему так, чтобы это было безопасно.

В какой ситуации сообщение брокера может продублироваться | PrepBro