Как записать партиции в Node без потерь данных
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
# Гарантии записи партиций в Kafka без потерь данных
Безопасность записи (Producer Side)
Параметр acks
Основная защита от потери данных — это установка acks=all (или acks=-1). Это гарантирует, что сообщение записано на лидер и на все in-sync реплики (ISR).
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all"); // Критично для надёжности
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", 5);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Минимальное количество реплик (min.insync.replicas)
На уровне брокера установи min.insync.replicas=2. Это гарантирует, что сообщение записано хотя бы на 2 брокера перед ответом продьюсеру.
# В server.properties
min.insync.replicas=2
Обработка ошибок и retry-логика
Даже с acks=all, нужна правильная обработка исключений:
public void sendMessage(String topic, String key, String value) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
try {
RecordMetadata metadata = producer.send(record, (recordMetadata, exception) -> {
if (exception != null) {
logger.error("Ошибка отправки сообщения: " + exception.getMessage());
// Можно добавить логику повторной отправки или сохранения в БД
} else {
logger.info("Сообщение отправлено в partition " + recordMetadata.partition() +
" с offset " + recordMetadata.offset());
}
}).get(); // Блокирующий вызов для гарантии
} catch (InterruptedException | ExecutionException e) {
logger.error("Критическая ошибка отправки", e);
// Dead Letter Queue или другой механизм обработки
}
}
Идемпотентность продьюсера
Для предотвращения дубликатов при retry установи enable.idempotence=true. Kafka будет отслеживать seq_id и не примет дубликаты:
props.put("enable.idempotence", true);
props.put("max.in.flight.requests.per.connection", 5); // До 5 при идемпотентности
Consumer Side гарантии
У консьюмера тоже есть ответственность:
props.put("isolation.level", "read_committed"); // Читаем только коммиченные данные
props.put("enable.auto.commit", false); // Ручной контроль смещений
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processRecord(record); // Обработка с идемпотентностью
consumer.commitSync(); // Коммит только после успеха
} catch (Exception e) {
logger.error("Ошибка обработки", e);
// Не коммитим — переобработаем при переконнекте
}
}
}
Архитектурные паттерны для 100% надёжности
- Exactly-Once Semantics — используй трансакции Kafka:
props.put("transactional.id", "producer-1");
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
-
Dead Letter Queue — отправляй неудачные сообщения в отдельный топик для анализа.
-
Event Sourcing — храни все изменения как события, восстанавливай состояние из event log.
Мониторинг и тестирование
- Отслеживай метрики:
records-lag-max,consumer-lag,record-error-rate - Симулируй отказы (kill broker, network partitions) в тестах
- Используй
testcontainersдля локального Kafka в тестах
С этой стратегией потери данных исключены на 99.99% в production.