← Назад к вопросам

Какие знаешь типы гарантии доставки сообщений?

3.0 Senior🔥 71 комментариев
#Архитектура и паттерны#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Типы гарантии доставки сообщений

При работе с системами обмена сообщениями (message brokers: RabbitMQ, Kafka, AWS SQS и т.д.) критически важно понимать гарантии доставки. Они определяют, гарантировано ли, что сообщение будет обработано.

1. At-Most-Once (максимум один раз)

Сообщение может быть потеряно, но никогда не будет обработано больше одного раза.

Производитель отправляет сообщение M
                          ↓
            Message Broker получил
                          ↓
Потребитель получил и начал обрабатывать
                          ↓
Ошибка! Потребитель упал
                          ↓
Сообщение M ПОТЕРЯНО (не будет переотправлено)

Поток исполнения:

  1. Брокер получает сообщение
  2. Отправляет потребителю
  3. Потребитель НЕ отправляет ACK (подтверждение)
  4. Если потребитель упадёт, сообщение потеряется

Пример с RabbitMQ:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Потребитель НЕ отправляет ACK
def callback(ch, method, properties, body):
    print(f"Получено: {body}")
    # ❌ НЕ отправляем ch.basic_ack(method.delivery_tag)
    # Если здесь будет ошибка, сообщение потеряется
    if random.random() > 0.9:
        raise Exception("Ошибка обработки!")
    print("Обработано")

channel.basic_consume(
    queue='my_queue',
    on_message_callback=callback,
    auto_ack=False  # Требуем ручной ACK
)

channel.start_consuming()

Плюсы: очень быстро, низкая latency Минусы: потеря данных Когда использовать: метрики, логи, некритичные события

2. At-Least-Once (минимум один раз)

Сообщение гарантированно будет доставлено минимум один раз, но может быть дублировано.

Производитель отправляет сообщение M
                          ↓
            Message Broker получил
                          ↓
Потребитель получил и обработал
                          ↓
Ошибка! Потребитель не смог отправить ACK
                          ↓
Брокер переотправляет M (дубль!)
                          ↓
Сообщение M обработано ДВАЖДЫ

Поток исполнения:

  1. Брокер получает сообщение
  2. Отправляет потребителю
  3. Потребитель обрабатывает
  4. Потребитель отправляет ACK
  5. Если ACK не пришла (timeout), брокер переотправляет

Пример с RabbitMQ:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

results = []  # Отслеживаем обработанные ID

def callback(ch, method, properties, body):
    message_id = properties.message_id or str(time.time())
    
    # Проверка дубликата (idempotency)
    if message_id in results:
        print(f"Дубликат! {message_id}")
        ch.basic_ack(method.delivery_tag)
        return
    
    print(f"Обработка: {body}")
    # Обработка сообщения
    results.append(message_id)
    
    # Отправляем ACK
    ch.basic_ack(method.delivery_tag)

channel.basic_qos(prefetch_count=1)  # Один сегмент за раз
channel.basic_consume(
    queue='my_queue',
    on_message_callback=callback,
    auto_ack=False  # Требуем ручной ACK
)

channel.start_consuming()

Плюсы: надёжность, сообщения не потеряются Минусы: может быть дублирование, нужна idempotency Когда использовать: финансовые операции, заказы (с проверкой дубликатов)

3. Exactly-Once (ровно один раз)

Сообщение гарантированно будет обработано ровно один раз, без потерь и дубликатов.

Это ОЧЕНЬ сложно достичь в распределённых системах (см. Two-Phase Commit).

Производитель → Брокер → Потребитель → База данных
    ↓              ↓           ↓              ↓
   ACK ←---------- ACK ←------ ACK ←--------- ACK
   
Все должны подтвердить успех или все откатиться

Пример с Kafka (транзакции):

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import json

# Использование транзакций в Kafka
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    enable_idempotence=True,  # Идемпотентные отправки
    acks='all',  # Ждём подтверждения от всех репликаций
    retries=3
)

# Отправка с гарантией
future = producer.send('my_topic', b'message')
try:
    record_metadata = future.get(timeout=10)
    print(f"Topic: {record_metadata.topic}")
    print(f"Partition: {record_metadata.partition}")
    print(f"Offset: {record_metadata.offset}")
except KafkaError:
    print("Ошибка отправки")

# Потребитель
consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers=['localhost:9092'],
    enable_auto_commit=False,  # Ручное управление офсетом
    isolation_level='read_committed'  # Читай только committed сообщения
)

for message in consumer:
    try:
        # Обработка
        print(f"Получено: {message.value}")
        # Только после успешной обработки
        consumer.commit()
    except Exception as e:
        print(f"Ошибка: {e}")
        # Не коммитим, сообщение будет переобработано

Плюсы: идеальная надёжность Минусы: очень сложно, высокая latency, требует поддержки брокера Когда использовать: критичные финансовые операции (но даже здесь сложно)

Сравнение

ГарантияПотеряДубликатыСкоростьСложность
At-Most-OnceВозможнаНетОчень высокаяНизкая
At-Least-OnceНетВозможныСредняяСредняя
Exactly-OnceНетНетНизкаяОчень высокая

Практический пример: RabbitMQ с Exactly-Once эмуляцией

import pika
import hashlib
from datetime import datetime

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# База данных для отслеживания обработанных
processed_messages = set()  # В реальности использовать Redis/DB

def idempotent_callback(ch, method, properties, body):
    # Вычисляем уникальный ID сообщения
    message_hash = hashlib.md5(body).hexdigest()
    
    # Проверяем, не обработали ли уже
    if message_hash in processed_messages:
        print(f"Дубликат! {message_hash}")
        ch.basic_ack(method.delivery_tag)
        return
    
    try:
        # Обработка сообщения
        print(f"Обработка: {body}")
        # ... ваша логика ...
        
        # Отмечаем как обработанное
        processed_messages.add(message_hash)
        
        # Отправляем ACK
        ch.basic_ack(method.delivery_tag)
    except Exception as e:
        print(f"Ошибка: {e}")
        # НЕ отправляем ACK, сообщение будет переобработано
        ch.basic_nack(method.delivery_tag, requeue=True)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(
    queue='my_queue',
    on_message_callback=idempotent_callback,
    auto_ack=False
)

channel.start_consuming()

Рекомендации

  • Используй At-Least-Once по умолчанию с проверкой дубликатов (idempotency)
  • Для критичных операций: реализуй Exactly-Once на уровне приложения
  • Всегда отслеживай обработанные ID (в Redis/DB)
  • Мониторь DLQ (Dead Letter Queue) для не обработанных сообщений
  • Документируй выбранный уровень гарантии для каждого очереди

Гарантии доставки — ключевой архитектурный выбор, влияющий на надёжность всей системы.

Какие знаешь типы гарантии доставки сообщений? | PrepBro