← Назад к вопросам

Какие знаешь способы достижения at-least-once в Kafka?

2.7 Senior🔥 141 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

At-Least-Once доставка в Apache Kafka

At-Least-Once гарантия означает, что каждое сообщение будет доставлено потребителю хотя бы один раз. Возможны дубликаты, но нельзя потерять сообщения. Это один из трёх уровней гарантии в Kafka, наряду с At-Most-Once и Exactly-Once.

Три уровня гарантии в Kafka

  • At-Most-Once — каждое сообщение доставляется максимум один раз (возможна потеря)
  • At-Least-Once — каждое сообщение доставляется минимум один раз (возможны дубликаты)
  • Exactly-Once — каждое сообщение доставляется ровно один раз (нет потерь и дубликатов)

1. Отключение автоматического коммита offset

Первый и самый важный шаг — отключить автоматический коммит offset. Это дает контроль над тем, когда сообщение считается обработанным:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", false);  // ВАЖНО: отключить автокоммит
props.put("auto.offset.reset", "earliest");  // начать с начала, если нет offset

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

2. Обработка сообщений и ручной коммит

После успешной обработки сообщения, коммитим offset вручную:

consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        try {
            // Обработка сообщения
            processMessage(record.value());
            System.out.printf("Processed: %s%n", record.value());
            
            // После успешной обработки — коммитим
            consumer.commitSync();  // Синхронный коммит
            // или
            // consumer.commitAsync();  // Асинхронный коммит
        } catch (Exception e) {
            // Если обработка неудачна — НЕ коммитим
            System.err.printf("Error processing message: %s%n", e.getMessage());
            // В следующей итерации получим то же сообщение
        }
    }
}

3. Синхронный vs Асинхронный коммит

Синхронный коммит (commitSync):

  • Блокирует, пока offset не будет закоммичен
  • Безопаснее — гарантирует коммит
  • Медленнее
try {
    processMessage(record.value());
    consumer.commitSync();  // Ждем подтверждения
} catch (CommitFailedException e) {
    System.err.println("Commit failed: " + e.getMessage());
    // Переобработаем сообщение
}

Асинхронный коммит (commitAsync):

  • Не блокирует
  • Быстрее
  • Нужен callback для обработки ошибок
processMessage(record.value());
consumer.commitAsync(new OffsetCommitCallback() {
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (exception != null) {
            System.err.println("Async commit failed: " + exception.getMessage());
            // Переобработаем позже
        } else {
            System.out.println("Async commit succeeded");
        }
    }
});

4. Обработка перезагрузок и сбоев

At-Least-Once гарантия работает правильно, если потребитель перезагружается или падает:

public void startConsumer() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "my-group");
    props.put("enable.auto.commit", false);
    props.put("session.timeout.ms", 30000);  // Timeout для обнаружения сбоя
    
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("my-topic"));
    
    while (true) {
        try {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            
            for (ConsumerRecord<String, String> record : records) {
                processMessage(record.value());
                consumer.commitSync();
            }
        } catch (KafkaException e) {
            // При любом сбое — offset не закоммичена
            // В следующей попытке получим те же сообщения
            System.err.println("Kafka error: " + e.getMessage());
            // Переподключаемся
        }
    }
}

5. Идемпотентная обработка для дубликатов

Поскольку at-least-once допускает дубликаты, нужна идемпотентная обработка:

// Плохо — не идемпотентно
public void processPayment(String paymentId, double amount) {
    Account account = getAccount();
    account.setBalance(account.getBalance() + amount);  // Может добавить дважды!
    saveAccount(account);
}

// Хорошо — идемпотентно
public void processPayment(String paymentId, double amount) {
    // Проверяем, не обработано ли уже
    if (paymentProcessed(paymentId)) {
        System.out.println("Payment already processed: " + paymentId);
        return;
    }
    
    Account account = getAccount();
    account.setBalance(account.getBalance() + amount);
    saveAccount(account);
    
    // Записываем, что обработано
    markPaymentProcessed(paymentId);
}

// Или с использованием database transaction
public void processPayment(String paymentId, double amount) {
    try (Connection conn = getConnection()) {
        conn.setAutoCommit(false);
        
        // Atomic check-and-insert
        if (isPaymentProcessed(conn, paymentId)) {
            conn.commit();
            return;
        }
        
        updateBalance(conn, amount);
        insertPaymentRecord(conn, paymentId);
        
        conn.commit();
    }
}

6. Конфигурация producer для гарантии доставки

Для полноты at-least-once гарантии, producer тоже должен быть правильно настроен:

Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Важные настройки
producerProps.put("acks", "all");  // Ждет подтверждения от всех replicas
producerProps.put("retries", Integer.MAX_VALUE);  // Повторять попытки
producerProps.put("max.in.flight.requests.per.connection", 1);  // Гарантия порядка

KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

// Отправляем с callback
producer.send(new ProducerRecord<>("my-topic", "key", "value"), 
    new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                System.err.println("Failed to send: " + exception.getMessage());
                // Повторяем или логируем
            } else {
                System.out.println("Sent to partition " + metadata.partition() + 
                    " with offset " + metadata.offset());
            }
        }
    });

7. Мониторинг и обработка ошибок

public class AtLeastOnceConsumer {
    private static final Logger logger = LoggerFactory.getLogger(AtLeastOnceConsumer.class);
    private final int maxRetries = 3;
    
    public void consume() {
        KafkaConsumer<String, String> consumer = createConsumer();
        consumer.subscribe(Arrays.asList("my-topic"));
        
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                
                for (ConsumerRecord<String, String> record : records) {
                    boolean processed = false;
                    int retryCount = 0;
                    
                    while (!processed && retryCount < maxRetries) {
                        try {
                            processMessage(record);
                            consumer.commitSync();
                            processed = true;
                            logger.info("Message processed: " + record.key());
                        } catch (Exception e) {
                            retryCount++;
                            logger.warn("Error processing message (retry " + retryCount + "): " + e.getMessage());
                            if (retryCount >= maxRetries) {
                                logger.error("Max retries exceeded for message: " + record.key(), e);
                                // Отправляем в dead letter queue
                                sendToDeadLetterQueue(record);
                                consumer.commitSync();  // Коммитим, чтобы не повторять
                                processed = true;
                            }
                        }
                    }
                }
            } catch (Exception e) {
                logger.error("Consumer error: " + e.getMessage(), e);
            }
        }
    }
}

Сравнение подходов

ПараметрAt-Least-OnceAt-Most-OnceExactly-Once
Потеря сообщенийНетВозможнаНет
ДубликатыВозможныНетНет
ПроизводительностьВысокаяОчень высокаяСредняя
Сложность реализацииСредняяНизкаяВысокая
Когда использоватьОбычноНекритичные данныеФинансовые операции

Заключение

At-Least-Once достигается через:

  1. Отключение автокоммита offset
  2. Ручной коммит после успешной обработки
  3. Идемпотентная обработка для дубликатов
  4. Правильная обработка ошибок и retries
  5. Корректная конфигурация producer с acks=all

Это баланс между надежностью и производительностью. At-Least-Once часто используется в production системах, потому что гарантирует, что сообщения не будут потеряны, а обработка дубликатов может быть легко реализована через идемпотентность.

Какие знаешь способы достижения at-least-once в Kafka? | PrepBro