← Назад к вопросам
Когда может произойти потеря данных в Kafka?
3.0 Senior🔥 131 комментариев
#Брокеры сообщений
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Сценарии потери данных в Apache Kafka
Kafka — это распределённая система с гарантиями доставки, но потеря данных возможна при неправильной конфигурации. Рассмотрим все критические сценарии.
1. Потеря данных на стороне Producer (Производитель)
Неправильная конфигурация acks
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
// ❌ ОПАСНО — acks=0 (no acknowledgment)
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "0"); // Не ждём подтверждения!
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Отправляем без ожидания подтверждения
producer.send(new ProducerRecord<>("my-topic", "key", "value"));
// Сообщение может быть потеряно, если брокер сразу упадёт!
Проблема: Producer не ждёт подтверждения от брокера. Если сеть упадёт или брокер кросится во время отправки, сообщение будет потеряно.
Правильная конфигурация
// ✓ acks=all (все ISR должны подтвердить)
props.put(ProducerConfig.ACKS_CONFIG, "all"); // или "-1"
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Отправляем с callback для обработки ошибок
producer.send(new ProducerRecord<>("my-topic", "key", "value"),
(metadata, exception) -> {
if (exception != null) {
System.err.println("Ошибка отправки: " + exception.getMessage());
// Логируем для повторной отправки
} else {
System.out.println("Сообщение отправлено: " + metadata.offset());
}
}
);
Параметры acks:
acks=0— Leader не ждёт подтверждения (❌ риск потери)acks=1— Leader подтвердил (⚠️ может быть потеря при crash)acks=all(или-1) — все ISR (In-Sync Replicas) подтвердили (✓ безопасно)
2. Потеря данных на стороне Broker (Брокер)
Репликация и ISR (In-Sync Replicas)
// Конфигурация Topic (server.properties или программно)
// min.insync.replicas=2 — минимум 2 реплики в синхронизации
// replication.factor=3 — всего 3 реплики
// ❌ ОПАСНО — если брокер падает, теряются неподтверждённые сообщения
public class UnsafeScenario {
// Producer отправляет с acks=1 (только leader)
// Leader записывает в log, но не успел репликировать
// Leader кросится → сообщение потеряно!
}
// ✓ БЕЗОПАСНО — ждём подтверждения от replicas
public class SafeScenario {
// Producer отправляет с acks=all
// Leader ждёт подтверждения от min.insync.replicas replicas
// Только потом отправляет ack в Producer
// Теперь даже если leader упадёт, сообщение уже на других брокерах
}
Конфигурация надёжности
# server.properties (конфигурация брокера)
# Риск потери при shutdown
num.network.threads=8
num.io.threads=8
# Репликация
min.insync.replicas=2 # ✓ ВАЖНО! Минимум 2 реплики
default.replication.factor=3
unclean.leader.election.enable=false # ✓ Не выбирать лидера из not-ISR replicas
# Персистентность
log.flush.interval.messages=1000
log.retention.hours=168
log.segment.bytes=1073741824
# Индексирование
log.index.interval.bytes=4096
3. Потеря данных на стороне Consumer (Потребитель)
Проблема с обработкой offset
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
// ❌ ОПАСНО — автоматический commit offset перед обработкой
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // ❌ ОПАСНО!
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// Offset уже committed в Kafka!
for (ConsumerRecord<String, String> record : records) {
try {
processRecord(record); // Если crash здесь → сообщение потеряно!
} catch (Exception e) {
// Уже нельзя переобработать
}
}
}
private void processRecord(ConsumerRecord<String, String> record) {
// Дорогостоящая операция (DB, API)
database.save(record.value());
}
Правильный подход
// ✓ БЕЗОПАСНО — manual commit после успешной обработки
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // ✓ Manual commit
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
// Обрабатываем сообщение
processRecord(record);
} catch (Exception e) {
// Offset не был committed, переобработаем позже
logger.error("Ошибка обработки: " + e.getMessage(), e);
break; // Выходим, offset не committed
}
}
// Commit ПОСЛЕ успешной обработки
consumer.commitSync(); // Синхронный commit (надёжный)
}
private void processRecord(ConsumerRecord<String, String> record)
throws Exception {
database.save(record.value()); // Может выкинуть исключение
}
Транзакционный подход
// ✓ МАКСИМАЛЬНО БЕЗОПАСНО — transactional processing
public class TransactionalConsumer {
public void processWithTransaction() {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
// Обработка в единой транзакции с DB
database.beginTransaction();
database.save(record.value());
// Только после успешной записи в БД
consumer.commitSync();
database.commit();
} catch (Exception e) {
database.rollback();
logger.error("Откатываем сообщение: " + e.getMessage());
}
}
}
}
}
4. Потеря данных при Network Partition
// ❌ ОПАСНО — длинный timeout при сетевом разделении
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 300000); // 5 минут
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 600000); // 10 минут
// Во время partition Producer может потерять сообщения,
// думая что они доставлены, но партиция была недостижима
// ✓ ЛУЧШЕ — реакция на timeout
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); // 30 сек
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // 2 мин
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
Полный чеклист для предотвращения потери данных
Producer Side:
-
acks=all(вместо acks=0 или acks=1) -
enable.idempotence=true(избегаем дублирования) -
max.in.flight.requests.per.connection=1(или малое значение) - Обработка callback для ошибок
- Retries с exponential backoff
Broker Side:
-
min.insync.replicas>=2 -
unclean.leader.election.enable=false -
default.replication.factor>=3 - Мониторинг ISR reductions
Consumer Side:
-
enable.auto.commit=false(manual commit) - Commit ПОСЛЕ успешной обработки
- Idempotent processing (один и тот же offset можно обработать дважды)
- Обработка исключений
- Dead Letter Queue для не обрабатываемых сообщений
Общее:
- Мониторинг репликации lag
- Тестирование сценариев падения
- Логирование при обработке сообщений
Потеря данных в Kafka происходит из-за неправильной конфигурации и недостаточной обработки ошибок, а не из-за самой системы.