← Назад к вопросам

Какие знаешь способы достижения exactly once в Kafka?

2.7 Senior🔥 91 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Способы достижения Exactly Once в Kafka

Exactly Once — это гарантия, что сообщение будет обработано ровно один раз, без дубликатов и потерь. В распределённых системах это сложно достичь из-за сетевых сбоев, отказов процессов и других проблем. Kafka предоставляет несколько механизмов для этого.

Проблема: Why is Exactly Once Hard?

В распределённой системе есть три точки отказа:

Producer -> Kafka -> Consumer

Producer side:

  • Может отправить сообщение дважды (retry после timeout)
  • Kafka не получит второе сообщение, consumer получит duplicates

Kafka side:

  • Если leader падает, follower может потерять некоммитированные сообщения

Consumer side:

  • Может обработать сообщение, но упасть перед commitom offset'а
  • При перезагрузке переобработает то же сообщение

1. Idempotent Producer

Производитель отправляет каждое сообщение с уникальным producer ID и sequence number. Kafka хранит последний sequence number и отклоняет дубликаты.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Включить idempotent producer
props.put("enable.idempotence", true); // Автоматически устанавливает:
// - acks=all
// - retries=Integer.MAX_VALUE
// - max.in.flight.requests.per.connection=5

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

for (int i = 0; i < 100; i++) {
    ProducerRecord<String, String> record = 
        new ProducerRecord<>("my-topic", "key-" + i, "value-" + i);
    
    // Даже если отправить дважды, Kafka гарантирует, что сообщение 
    // будет записано только один раз
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            System.out.println("Error: " + exception.getMessage());
        } else {
            System.out.println("Sent to offset: " + metadata.offset());
        }
    });
}

producer.close();

Как работает:

  • Producer сохраняет producer ID (PID) и epoch (версия)
  • Каждое сообщение получает sequence number (0, 1, 2, ...)
  • Broker проверяет: если sequence number <= последний сохраненный, это дубликат
  • Дубликат отклоняется

Ограничение: только for одного producer'а в один момент времени.

2. Transactional Producer + Consumer

Это самый надежный способ. Producer атомарно пишет в несколько partitions, consumer читает в transactional mode.

Producer:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Включить транзакции
props.put("transactional.id", "unique-producer-id-123"); // ОБЯЗАТЕЛЬНО уникален
props.put("enable.idempotence", true); // Включить idempotence

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

try {
    producer.initTransactions(); // Инициализировать
    
    // Начать транзакцию
    producer.beginTransaction();
    
    try {
        // Отправить сообщения АТОМАРНО
        producer.send(new ProducerRecord<>("topic1", "key1", "value1"));
        producer.send(new ProducerRecord<>("topic2", "key2", "value2"));
        
        // Коммитить транзакцию (все сообщения или ничего)
        producer.commitTransaction();
    } catch (Exception e) {
        // Откатить все сообщения
        producer.abortTransaction();
        throw e;
    }
} finally {
    producer.close();
}

Consumer:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Читать ТОЛЬКО committed сообщения (transactional)
props.put("isolation.level", "read_committed"); // Вместо read_uncommitted

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1", "topic2"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Received: " + record.value());
        
        // Обработать сообщение
        processMessage(record);
    }
    
    // Consumer автоматически коммитит offset (enabled by default)
    consumer.commitSync(); // Синхронный коммит
}

Гарантии:

  • ✅ Producer: все сообщения в транзакции пишутся атомарно
  • ✅ Consumer: видит только committed сообщения (isolation.level=read_committed)
  • ✅ Exactly Once: дубликаты исключены

3. Offset Management with Database

Если сохранять обработанные offsets в той же базе вместе с результатом обработки, можно гарантировать Exactly Once.

public class ExactlyOnceConsumer {
    private KafkaConsumer<String, String> consumer;
    private Database database; // Ваша БД
    
    public void consume() {
        consumer.subscribe(Arrays.asList("my-topic"));
        
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            
            for (ConsumerRecord<String, String> record : records) {
                // Всё в одной транзакции БД
                database.withTransaction(tx -> {
                    // 1. Проверить, обработано ли уже
                    if (database.isProcessed(record.offset())) {
                        return; // Дубликат, пропустить
                    }
                    
                    // 2. Обработать сообщение
                    processMessage(record);
                    
                    // 3. Сохранить результат + offset АТОМАРНО
                    database.saveResult(record.value(), record.offset());
                    
                    // Коммит транзакции
                });
            }
        }
    }
}

Как это работает:

  • Offset сохраняется в БД вместе с результатом обработки
  • Если consumer падает, при перезагрузке проверяет БД
  • Дубликаты автоматически пропускаются (уже обработаны)
  • Потери исключены (всё в ACID транзакции)

4. Semantic Partitioning

Если каждое сообщение обрабатывается одним потребителем (используя одинаковый ключ для каждого сообщения), можно простить дубликаты.

// Producer: отправить с одинаковым ключом
ProducerRecord<String, String> record = 
    new ProducerRecord<>("topic", "user-123", "process-data");
// Все сообщения с ключом user-123 пойдут в одну partition
// Обработаны одним потребителем

Это не совсем Exactly Once, но если обрабатывать сообщения идемпотентно, дубликаты станут безвредными.

5. Manual Offset Management

Вместо auto-commit, явно коммитить offset только после успешной обработки.

Properties props = new Properties();
props.put("enable.auto.commit", false); // Отключить auto-commit
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        try {
            // Обработать
            processMessage(record);
            
            // Только если успешно, коммитить
            consumer.commitSync();
        } catch (Exception e) {
            // Ошибка: offset НЕ коммитится
            // При перезагрузке переобработаем это сообщение
            System.err.println("Failed to process: " + e.getMessage());
        }
    }
}

Сравнение подходов

СпособСложностьГарантияКогда использовать
Idempotent ProducerНизкаяAt-least-onceПростые случаи
TransactionsВысокаяExactly-onceКритичные данные
Database OffsetСредняяExactly-onceЕсть своя БД
Manual OffsetНизкаяAt-least-onceИдемпотентная обработка
Semantic PartitionНизкаяAt-least-once + idempotenceОбработка по ключам

Лучшие практики для Exactly Once

✅ Всегда включать enable.idempotence=true ✅ Использовать Transactional Producer для критичных операций ✅ Consumer: isolation.level=read_committed ✅ Сохранять offset в БД рядом с результатом (database offset management) ✅ Сделать обработку идемпотентной (обработка дубликата = тот же результат) ✅ Мониторить duplicate metric в Prometheus/Grafana ❌ Не полагаться на auto-commit, если важны данные ❌ Не игнорировать network errors в producer

Точное один раз в распределённых системах требует либо дополнительной сложности (транзакции), либо идемпотентной обработки.