← Назад к вопросам

Что будет в Kafka если сообщение не доставлено?

2.0 Middle🔥 121 комментариев
#Основы Java

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Что происходит в Kafka при недоставке сообщения

Общий ответ

В Kafka сообщение ВСЕГДА остаётся в топике, независимо от того, доставлено ли оно или нет. Это ключевое отличие Kafka от классических message queues. Проблема не в потере сообщения, а в том, как обработает потребитель этот сценарий.

Модель доставки в Kafka

Kafka гарантирует:

  1. Persistence — сообщение сохранено на диске
  2. Durability — при нормальной работе кластера потеря исключена
  3. Immutability — сообщение никогда не изменится

Kafka НЕ удаляет сообщение из топика просто так — оно остаётся согласно policy:

# Конфиг топика
retention.ms=604800000      # 7 дней
retention.bytes=1073741824  # или 1GB (в зависимости от того, что раньше)
log.segment.bytes=1073741824

Гарантии доставки (Delivery Semantics)

1. At-Most-Once (максимум один раз)

// Продюсер
ProducerRecord<String, String> record = new ProducerRecord<>(
    "my-topic", 
    "key", 
    "value"
);

producer.send(record);  // Fire and forget

// Проблема: если сеть упадёт, не знаем, доставилось ли
// Потребитель может никогда это не получить

Это ненадёжно для критичных данных.

2. At-Least-Once (минимум один раз)

// Продюсер с подтверждением
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // Ошибка! Переотправим
            retryLogic();
        } else {
            // Успешно
            log.info("Message sent to partition " + metadata.partition());
        }
    }
});

// или современный подход
try {
    RecordMetadata metadata = producer.send(record).get();
    log.info("Sent to partition " + metadata.partition());
} catch (Exception e) {
    retryLogic();  // Переотправляем
}

Проблема: Если сеть упадёт после отправки, но до получения ответа, мы переотправим дубликат.

// Потребитель должен обработать дубликаты
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    // Обработка может быть вызвана дважды с одним сообщением
    processMessage(record.value());
}

3. Exactly-Once (ровно один раз) — самая сложная

Это требует специальной настройки:

// Продюсер конфиг
properties.put("acks", "all");        // Ждём подтверждения от всех replicas
properties.put("retries", Integer.MAX_VALUE);
properties.put("enable.idempotence", true);  // Включаем идемпотентность

Properties producerProps = new Properties();
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

// Потребитель конфиг
properties.put("isolation.level", "read_committed");  // Только committed messages

Что происходит при недоставке (детально)

Сценарий 1: Брокер Kafka упал

Продюсер отправляет сообщение → Брокер 1 сохраняет → Отправляет в replicas (Брокер 2, 3)
↓ (Брокер 1 падает после сохранения, до репликации)

Что произойдёт:
1. Если acks=1: Продюсер получит ошибку (не доставлено на все replicas)
2. Если acks=all: Продюсер получит ошибку (не получил ack от всех)

В обоих случаях: СООБЩЕНИЕ ОСТАЁТСЯ В ЛОГЕ, потребители смогут его получить

Сценарий 2: Потребитель не обработал сообщение

// Потребитель
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {
    try {
        processMessage(record.value());
    } catch (Exception e) {
        // Ошибка при обработке!
        log.error("Failed to process", e);
        
        // КЛЮЧЕВОЙ МОМЕНТ: сообщение остаётся в топике
        // Можно либо:
        // 1. Коммитить offset (потеряем возможность переобработки)
        // 2. Не коммитить (переобработаем при рестарте)
    }
}

// Явный коммит
consumer.commitSync();  // Обновляем offset

Сценарий 3: Недоставка из-за сетевой ошибки

ProducerRecord<String, String> record = new ProducerRecord<>(
    "orders", 
    "order123", 
    "Order data"
);

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // Сетевая ошибка!
        exception.printStackTrace();
        
        // Что произойдёт:
        // 1. Сообщение может остаться в producer buffer
        // 2. Можем переотправить (retry)
        // 3. Или отправить в DLQ (Dead Letter Queue)
        
        handleError(record);  // Специальная обработка
    }
});

Гарантии Kafka и что они означают

Гарантия 1: Если сообщение записано на диск, оно не потеряется

# Брокер конфиг
min.insync.replicas=2  # Минимум 2 репликаза для "all" acks

Если ты отправляешь с acks=all и min.insync.replicas=2, то сообщение будет на 2+ брокерах.

Гарантия 2: Порядок сообщений в партиции

// Все сообщения с одним key'ом идут в одну партицию
// Порядок ГАРАНТИРОВАН в одной партиции

ProducerRecord<String, String> record = new ProducerRecord<>(
    "orders",
    "user123",  // Key определяет партицию
    "Order#1"
);

Как организовать reliable processing

Pattern 1: Commit offset ПОСЛЕ успешной обработки

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        try {
            // 1. Обработаем
            processMessage(record.value());
            
            // 2. ПОТОМ коммитим
            consumer.commitSync();
        } catch (Exception e) {
            // Если ошибка: НЕ коммитим, переобработаем при рестарте
            log.error("Failed to process message", e);
        }
    }
}

Pattern 2: Dead Letter Queue (DLQ) для невозвратимых ошибок

for (ConsumerRecord<String, String> record : records) {
    try {
        processMessage(record.value());
        consumer.commitSync();
    } catch (NonRecoverableException e) {
        // Ошибка, которая не поправится при переобработке
        // Отправляем в DLQ
        sendToDLQ(record);
        consumer.commitSync();
    }
}

Pattern 3: Идемпотентная обработка (для дубликатов)

public void processOrder(OrderMessage order) {
    // Проверяем: этот order уже обработан?
    if (isOrderProcessed(order.getId())) {
        log.info("Order already processed: " + order.getId());
        return;  // Дубликат, пропускаем
    }
    
    // Обрабатываем
    saveOrder(order);
    markAsProcessed(order.getId());
}

Резюме: Что происходит при недоставке

СценарийЧто в топикеЧто делать
Брокер упалСообщение естьПереотправить с retry
Потребитель упалСообщение естьРестартовать (offset восстановится)
Сеть упалаМожет быть дубликатИдемпотентная обработка
Потребитель не обработалСообщение остаётсяНе коммитить offset
Невозвратимая ошибкаСообщение естьОтправить в DLQ

Ключевой вывод

Сообщение в Kafka НИКОГДА не исчезает просто так

Проблема не в потере, а в гарантиях доставки потребителю

Используй acks=all + min.insync.replicas=2 для критичных данных

Коммитии offset ТОЛЬКО после успешной обработки

Реализуй идемпотентную обработку для дубликатов

Используй DLQ для невозвратимых ошибок

Что будет в Kafka если сообщение не доставлено? | PrepBro