← Назад к вопросам

Когда может произойти потеря данных в Kafka?

3.0 Senior🔥 131 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Сценарии потери данных в Apache Kafka

Kafka — это распределённая система с гарантиями доставки, но потеря данных возможна при неправильной конфигурации. Рассмотрим все критические сценарии.

1. Потеря данных на стороне Producer (Производитель)

Неправильная конфигурация acks

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

// ❌ ОПАСНО — acks=0 (no acknowledgment)
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "0");  // Не ждём подтверждения!
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
    "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Отправляем без ожидания подтверждения
producer.send(new ProducerRecord<>("my-topic", "key", "value"));
// Сообщение может быть потеряно, если брокер сразу упадёт!

Проблема: Producer не ждёт подтверждения от брокера. Если сеть упадёт или брокер кросится во время отправки, сообщение будет потеряно.

Правильная конфигурация

// ✓ acks=all (все ISR должны подтвердить)
props.put(ProducerConfig.ACKS_CONFIG, "all");  // или "-1"
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Отправляем с callback для обработки ошибок
producer.send(new ProducerRecord<>("my-topic", "key", "value"),
    (metadata, exception) -> {
        if (exception != null) {
            System.err.println("Ошибка отправки: " + exception.getMessage());
            // Логируем для повторной отправки
        } else {
            System.out.println("Сообщение отправлено: " + metadata.offset());
        }
    }
);

Параметры acks:

  • acks=0 — Leader не ждёт подтверждения (❌ риск потери)
  • acks=1 — Leader подтвердил (⚠️ может быть потеря при crash)
  • acks=all (или -1) — все ISR (In-Sync Replicas) подтвердили (✓ безопасно)

2. Потеря данных на стороне Broker (Брокер)

Репликация и ISR (In-Sync Replicas)

// Конфигурация Topic (server.properties или программно)
// min.insync.replicas=2  — минимум 2 реплики в синхронизации
// replication.factor=3   — всего 3 реплики

// ❌ ОПАСНО — если брокер падает, теряются неподтверждённые сообщения
public class UnsafeScenario {
    // Producer отправляет с acks=1 (только leader)
    // Leader записывает в log, но не успел репликировать
    // Leader кросится → сообщение потеряно!
}

// ✓ БЕЗОПАСНО — ждём подтверждения от replicas
public class SafeScenario {
    // Producer отправляет с acks=all
    // Leader ждёт подтверждения от min.insync.replicas replicas
    // Только потом отправляет ack в Producer
    // Теперь даже если leader упадёт, сообщение уже на других брокерах
}

Конфигурация надёжности

# server.properties (конфигурация брокера)

# Риск потери при shutdown
num.network.threads=8
num.io.threads=8

# Репликация
min.insync.replicas=2  # ✓ ВАЖНО! Минимум 2 реплики
default.replication.factor=3
unclean.leader.election.enable=false  # ✓ Не выбирать лидера из not-ISR replicas

# Персистентность
log.flush.interval.messages=1000
log.retention.hours=168
log.segment.bytes=1073741824

# Индексирование
log.index.interval.bytes=4096

3. Потеря данных на стороне Consumer (Потребитель)

Проблема с обработкой offset

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// ❌ ОПАСНО — автоматический commit offset перед обработкой
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);  // ❌ ОПАСНО!
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    // Offset уже committed в Kafka!
    for (ConsumerRecord<String, String> record : records) {
        try {
            processRecord(record);  // Если crash здесь → сообщение потеряно!
        } catch (Exception e) {
            // Уже нельзя переобработать
        }
    }
}

private void processRecord(ConsumerRecord<String, String> record) {
    // Дорогостоящая операция (DB, API)
    database.save(record.value());
}

Правильный подход

// ✓ БЕЗОПАСНО — manual commit после успешной обработки
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // ✓ Manual commit
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        try {
            // Обрабатываем сообщение
            processRecord(record);
        } catch (Exception e) {
            // Offset не был committed, переобработаем позже
            logger.error("Ошибка обработки: " + e.getMessage(), e);
            break;  // Выходим, offset не committed
        }
    }
    
    // Commit ПОСЛЕ успешной обработки
    consumer.commitSync();  // Синхронный commit (надёжный)
}

private void processRecord(ConsumerRecord<String, String> record) 
    throws Exception {
    database.save(record.value());  // Может выкинуть исключение
}

Транзакционный подход

// ✓ МАКСИМАЛЬНО БЕЗОПАСНО — transactional processing
public class TransactionalConsumer {
    public void processWithTransaction() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            
            for (ConsumerRecord<String, String> record : records) {
                try {
                    // Обработка в единой транзакции с DB
                    database.beginTransaction();
                    database.save(record.value());
                    // Только после успешной записи в БД
                    consumer.commitSync();  
                    database.commit();
                } catch (Exception e) {
                    database.rollback();
                    logger.error("Откатываем сообщение: " + e.getMessage());
                }
            }
        }
    }
}

4. Потеря данных при Network Partition

// ❌ ОПАСНО — длинный timeout при сетевом разделении
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 300000);  // 5 минут
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 600000); // 10 минут

// Во время partition Producer может потерять сообщения,
// думая что они доставлены, но партиция была недостижима

// ✓ ЛУЧШЕ — реакция на timeout
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);   // 30 сек
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);  // 2 мин
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

Полный чеклист для предотвращения потери данных

Producer Side:

  • acks=all (вместо acks=0 или acks=1)
  • enable.idempotence=true (избегаем дублирования)
  • max.in.flight.requests.per.connection=1 (или малое значение)
  • Обработка callback для ошибок
  • Retries с exponential backoff

Broker Side:

  • min.insync.replicas>=2
  • unclean.leader.election.enable=false
  • default.replication.factor>=3
  • Мониторинг ISR reductions

Consumer Side:

  • enable.auto.commit=false (manual commit)
  • Commit ПОСЛЕ успешной обработки
  • Idempotent processing (один и тот же offset можно обработать дважды)
  • Обработка исключений
  • Dead Letter Queue для не обрабатываемых сообщений

Общее:

  • Мониторинг репликации lag
  • Тестирование сценариев падения
  • Логирование при обработке сообщений

Потеря данных в Kafka происходит из-за неправильной конфигурации и недостаточной обработки ошибок, а не из-за самой системы.