← Назад к вопросам

Как записать партиции в Node без потерь данных

2.4 Senior🔥 111 комментариев
#Базы данных и SQL#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# Гарантии записи партиций в Kafka без потерь данных

Безопасность записи (Producer Side)

Параметр acks

Основная защита от потери данных — это установка acks=all (или acks=-1). Это гарантирует, что сообщение записано на лидер и на все in-sync реплики (ISR).

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all"); // Критично для надёжности
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", 5);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

Минимальное количество реплик (min.insync.replicas)

На уровне брокера установи min.insync.replicas=2. Это гарантирует, что сообщение записано хотя бы на 2 брокера перед ответом продьюсеру.

# В server.properties
min.insync.replicas=2

Обработка ошибок и retry-логика

Даже с acks=all, нужна правильная обработка исключений:

public void sendMessage(String topic, String key, String value) {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
    
    try {
        RecordMetadata metadata = producer.send(record, (recordMetadata, exception) -> {
            if (exception != null) {
                logger.error("Ошибка отправки сообщения: " + exception.getMessage());
                // Можно добавить логику повторной отправки или сохранения в БД
            } else {
                logger.info("Сообщение отправлено в partition " + recordMetadata.partition() + 
                           " с offset " + recordMetadata.offset());
            }
        }).get(); // Блокирующий вызов для гарантии
    } catch (InterruptedException | ExecutionException e) {
        logger.error("Критическая ошибка отправки", e);
        // Dead Letter Queue или другой механизм обработки
    }
}

Идемпотентность продьюсера

Для предотвращения дубликатов при retry установи enable.idempotence=true. Kafka будет отслеживать seq_id и не примет дубликаты:

props.put("enable.idempotence", true);
props.put("max.in.flight.requests.per.connection", 5); // До 5 при идемпотентности

Consumer Side гарантии

У консьюмера тоже есть ответственность:

props.put("isolation.level", "read_committed"); // Читаем только коммиченные данные
props.put("enable.auto.commit", false); // Ручной контроль смещений

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        try {
            processRecord(record); // Обработка с идемпотентностью
            consumer.commitSync(); // Коммит только после успеха
        } catch (Exception e) {
            logger.error("Ошибка обработки", e);
            // Не коммитим — переобработаем при переконнекте
        }
    }
}

Архитектурные паттерны для 100% надёжности

  1. Exactly-Once Semantics — используй трансакции Kafka:
props.put("transactional.id", "producer-1");
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}
  1. Dead Letter Queue — отправляй неудачные сообщения в отдельный топик для анализа.

  2. Event Sourcing — храни все изменения как события, восстанавливай состояние из event log.

Мониторинг и тестирование

  • Отслеживай метрики: records-lag-max, consumer-lag, record-error-rate
  • Симулируй отказы (kill broker, network partitions) в тестах
  • Используй testcontainers для локального Kafka в тестах

С этой стратегией потери данных исключены на 99.99% в production.