← Назад к вопросам

Какие знаешь ситуации при которых Kafka теряет данные?

1.7 Middle🔥 161 комментариев
#REST API и микросервисы#Брокеры сообщений#Многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Ситуации, при которых Kafka теряет данные

Kafka позиционируется как надёжная система распределённых очередей, но есть сценарии, когда данные могут быть потеряны. Вот основные:

1. Неправильная конфигурация Producer (acks)

acks=0 — producer не ждёт подтверждения от брокера:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "0");  // ОПАСНО! Данные могут потеряться

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("topic", "key", "value"));
// Producer отправил и сразу вернул управление
// Брокер может не получить сообщение — потеря данных!

Решение:

props.put("acks", "all");  // Жди подтверждения от всех replicas
props.put("retries", Integer.MAX_VALUE);  // Повторяй отправку
props.put("max.in.flight.requests.per.connection", 1);  // Порядок гарантирован

2. Низкая репликация (replication.factor < 2)

Если топик создан с replication.factor = 1:

kafka-topics --create --topic my-topic \
  --partitions 3 \
  --replication-factor 1  # ОПАСНО!

При падении брокера все данные теряются:

Broker 0: Partition 0 (Leader)
Broker 1: Partition 1 (Leader)
Broker 2: Partition 2 (Leader)

# Брокер 0 упал
# Partition 0 ПОТЕРЯНА НАВСЕГДА

Решение:

kafka-topics --create --topic my-topic \
  --partitions 3 \
  --replication-factor 3  # Минимум 2, лучше 3

3. Consumer не коммитит offset

Потребитель обрабатывает сообщения, но не сохраняет прогресс (offset):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("enable.auto.commit", "false");  // Ручной коммит

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processMessage(record);  // Обработали
        // НО offset НЕ сохранили!
    }
    // consumer.commitSync();  // ЭТА СТРОКА ЗАБЫТА
}

// При перезагрузке consumer начнёт с начала и потеряет обработанные сообщения

Решение:

props.put("enable.auto.commit", "true");  // Автоматический коммит
props.put("auto.commit.interval.ms", "1000");  // Коммит каждую секунду

// ИЛИ ручной коммит:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processMessage(record);
    }
    consumer.commitSync();  // Коммит offset
}

4. min.insync.replicas слишком низкий

Когда acks=all без min.insync.replicas, данные могут быть потеряны:

props.put("acks", "all");
props.put("min.insync.replicas", 1);  // НЕПРАВИЛЬНО!

Сценарий потери:

Replication Factor = 3
min.insync.replicas = 1
Broker 0 (Leader): OK
Broker 1 (Replica): OK
Broker 2 (Replica): НЕ ДОСТУПЕН

# Producer отправляет:
ack от Broker 0 = OK
# Producer думает: данные сохранены!
# Но на самом деле записано только на 1 брокер

# Broker 0 падает
# Из Broker 1 и 2 выбирается новый leader
# Данные теряются, если выбран Broker 2

Решение:

props.put("acks", "all");
props.put("min.insync.replicas", 2);  // Минимум 2 репликв должны подтвердить

Конфиг брокера:

min.insync.replicas=2
unclean.leader.election.enable=false

5. unclean.leader.election.enable = true

Когда все in-sync replicas недоступны, Kafka избирает "грязного" лидера (с устаревшими данными):

# Конфиг брокера (ОПАСНО)
unclean.leader.election.enable=true

# Сценарий:
# Leader (Broker 0): последнее сообщение = msg_1000
# Replica 1 (Broker 1): последнее сообщение = msg_999
# Replica 2 (Broker 2): последнее сообщение = msg_998

# Broker 0 падает
# Brokers 1 и 2 недоступны
# Kafka избирает Broker 2 как новый Leader
# Сообщение msg_1000 ПОТЕРЯНО

Решение:

unclean.leader.election.enable=false  # Запрети грязные выборы

6. Производитель не обрабатывает исключения

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

for (String message : messages) {
    producer.send(new ProducerRecord<>("topic", message));
    // Исключение может быть, но мы его не ловим!
}

producer.flush();  // Надеемся на лучшее

Решение:

producer.send(
    new ProducerRecord<>("topic", message),
    (metadata, exception) -> {
        if (exception != null) {
            log.error("Ошибка при отправке: ", exception);
            // Повтори отправку или сохрани в БД
            saveToFailureQueue(message);
        } else {
            log.info("Сообщение отправлено: " + metadata.topic());
        }
    }
);

7. Недостаточно времени на flush

producer.send(new ProducerRecord<>("topic", message));
producer.close();  // Сразу закрываем

// Данные могут быть в буфере, а не отправлены на брокер!

Решение:

producer.flush();  // Ждём отправки всех данных
producer.close(Duration.ofSeconds(10));  // Закрываем с таймаутом

8. Жёсткие ограничения по времени обработки

Потребитель обрабатывает долго и не отправляет heartbeat:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        longProcessing(record);  // Может занять > session.timeout.ms
    }
}

// Kafka считает потребителя мёртвым
// Переназначает partition другому consumer
// Сообщения обрабатываются дважды или теряются

Решение:

props.put("session.timeout.ms", "30000");  // 30 сек
props.put("max.poll.interval.ms", "300000");  // 5 минут
props.put("heartbeat.interval.ms", "10000");  // 10 сек

// Убедись, что обработка < max.poll.interval.ms

Конфигурация Kafka для максимальной надёжности

// Producer
Properties producerProps = new Properties();
producerProps.put("acks", "all");  // Жди всех replicas
producerProps.put("retries", Integer.MAX_VALUE);
producerProps.put("max.in.flight.requests.per.connection", 1);  // Порядок
producerProps.put("compression.type", "snappy");
producerProps.put("linger.ms", 100);  // Батчирование

// Consumer
Properties consumerProps = new Properties();
consumerProps.put("enable.auto.commit", "false");  // Ручной коммит
consumerProps.put("isolation.level", "read_committed");  // Только committed
consumerProps.put("session.timeout.ms", "30000");

// Топик
// --replication-factor 3
// --config min.insync.replicas=2
// --config unclean.leader.election.enable=false

Чеклист надёжности

  • acks=all на producer
  • min.insync.replicas >= 2
  • replication.factor >= 3
  • unclean.leader.election.enable=false
  • ✓ Consumer явно коммитит offset
  • ✓ Consumer обрабатывает исключения
  • ✓ Producer обрабатывает исключения
  • ✓ Timeout конфиги соответствуют обработке

Итог

Kafka теряет данные только при неправильной конфигурации. С правильными настройками это одна из самых надёжных систем очередей.

Какие знаешь ситуации при которых Kafka теряет данные? | PrepBro