Что будет в Kafka если сообщение не доставлено?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Что происходит в Kafka при недоставке сообщения
Общий ответ
В Kafka сообщение ВСЕГДА остаётся в топике, независимо от того, доставлено ли оно или нет. Это ключевое отличие Kafka от классических message queues. Проблема не в потере сообщения, а в том, как обработает потребитель этот сценарий.
Модель доставки в Kafka
Kafka гарантирует:
- Persistence — сообщение сохранено на диске
- Durability — при нормальной работе кластера потеря исключена
- Immutability — сообщение никогда не изменится
Kafka НЕ удаляет сообщение из топика просто так — оно остаётся согласно policy:
# Конфиг топика
retention.ms=604800000 # 7 дней
retention.bytes=1073741824 # или 1GB (в зависимости от того, что раньше)
log.segment.bytes=1073741824
Гарантии доставки (Delivery Semantics)
1. At-Most-Once (максимум один раз)
// Продюсер
ProducerRecord<String, String> record = new ProducerRecord<>(
"my-topic",
"key",
"value"
);
producer.send(record); // Fire and forget
// Проблема: если сеть упадёт, не знаем, доставилось ли
// Потребитель может никогда это не получить
Это ненадёжно для критичных данных.
2. At-Least-Once (минимум один раз)
// Продюсер с подтверждением
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
// Ошибка! Переотправим
retryLogic();
} else {
// Успешно
log.info("Message sent to partition " + metadata.partition());
}
}
});
// или современный подход
try {
RecordMetadata metadata = producer.send(record).get();
log.info("Sent to partition " + metadata.partition());
} catch (Exception e) {
retryLogic(); // Переотправляем
}
Проблема: Если сеть упадёт после отправки, но до получения ответа, мы переотправим дубликат.
// Потребитель должен обработать дубликаты
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// Обработка может быть вызвана дважды с одним сообщением
processMessage(record.value());
}
3. Exactly-Once (ровно один раз) — самая сложная
Это требует специальной настройки:
// Продюсер конфиг
properties.put("acks", "all"); // Ждём подтверждения от всех replicas
properties.put("retries", Integer.MAX_VALUE);
properties.put("enable.idempotence", true); // Включаем идемпотентность
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Потребитель конфиг
properties.put("isolation.level", "read_committed"); // Только committed messages
Что происходит при недоставке (детально)
Сценарий 1: Брокер Kafka упал
Продюсер отправляет сообщение → Брокер 1 сохраняет → Отправляет в replicas (Брокер 2, 3)
↓ (Брокер 1 падает после сохранения, до репликации)
Что произойдёт:
1. Если acks=1: Продюсер получит ошибку (не доставлено на все replicas)
2. Если acks=all: Продюсер получит ошибку (не получил ack от всех)
В обоих случаях: СООБЩЕНИЕ ОСТАЁТСЯ В ЛОГЕ, потребители смогут его получить
Сценарий 2: Потребитель не обработал сообщение
// Потребитель
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processMessage(record.value());
} catch (Exception e) {
// Ошибка при обработке!
log.error("Failed to process", e);
// КЛЮЧЕВОЙ МОМЕНТ: сообщение остаётся в топике
// Можно либо:
// 1. Коммитить offset (потеряем возможность переобработки)
// 2. Не коммитить (переобработаем при рестарте)
}
}
// Явный коммит
consumer.commitSync(); // Обновляем offset
Сценарий 3: Недоставка из-за сетевой ошибки
ProducerRecord<String, String> record = new ProducerRecord<>(
"orders",
"order123",
"Order data"
);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// Сетевая ошибка!
exception.printStackTrace();
// Что произойдёт:
// 1. Сообщение может остаться в producer buffer
// 2. Можем переотправить (retry)
// 3. Или отправить в DLQ (Dead Letter Queue)
handleError(record); // Специальная обработка
}
});
Гарантии Kafka и что они означают
Гарантия 1: Если сообщение записано на диск, оно не потеряется
# Брокер конфиг
min.insync.replicas=2 # Минимум 2 репликаза для "all" acks
Если ты отправляешь с acks=all и min.insync.replicas=2, то сообщение будет на 2+ брокерах.
Гарантия 2: Порядок сообщений в партиции
// Все сообщения с одним key'ом идут в одну партицию
// Порядок ГАРАНТИРОВАН в одной партиции
ProducerRecord<String, String> record = new ProducerRecord<>(
"orders",
"user123", // Key определяет партицию
"Order#1"
);
Как организовать reliable processing
Pattern 1: Commit offset ПОСЛЕ успешной обработки
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
// 1. Обработаем
processMessage(record.value());
// 2. ПОТОМ коммитим
consumer.commitSync();
} catch (Exception e) {
// Если ошибка: НЕ коммитим, переобработаем при рестарте
log.error("Failed to process message", e);
}
}
}
Pattern 2: Dead Letter Queue (DLQ) для невозвратимых ошибок
for (ConsumerRecord<String, String> record : records) {
try {
processMessage(record.value());
consumer.commitSync();
} catch (NonRecoverableException e) {
// Ошибка, которая не поправится при переобработке
// Отправляем в DLQ
sendToDLQ(record);
consumer.commitSync();
}
}
Pattern 3: Идемпотентная обработка (для дубликатов)
public void processOrder(OrderMessage order) {
// Проверяем: этот order уже обработан?
if (isOrderProcessed(order.getId())) {
log.info("Order already processed: " + order.getId());
return; // Дубликат, пропускаем
}
// Обрабатываем
saveOrder(order);
markAsProcessed(order.getId());
}
Резюме: Что происходит при недоставке
| Сценарий | Что в топике | Что делать |
|---|---|---|
| Брокер упал | Сообщение есть | Переотправить с retry |
| Потребитель упал | Сообщение есть | Рестартовать (offset восстановится) |
| Сеть упала | Может быть дубликат | Идемпотентная обработка |
| Потребитель не обработал | Сообщение остаётся | Не коммитить offset |
| Невозвратимая ошибка | Сообщение есть | Отправить в DLQ |
Ключевой вывод
✓ Сообщение в Kafka НИКОГДА не исчезает просто так
✓ Проблема не в потере, а в гарантиях доставки потребителю
✓ Используй acks=all + min.insync.replicas=2 для критичных данных
✓ Коммитии offset ТОЛЬКО после успешной обработки
✓ Реализуй идемпотентную обработку для дубликатов
✓ Используй DLQ для невозвратимых ошибок