← Назад к вопросам

Как можно достичь определенный вид доставки в Kafka?

1.0 Junior🔥 151 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# Гарантии доставки в Apache Kafka

Три основных уровня гарантии доставки

Kafka предоставляет три уровня гарантии доставки сообщений, каждый с разными требованиями к надежности и производительности.

1. At-most-once (максимум один раз)

Сообщение может быть потеряно, но дублирования гарантированно не будет.

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "0");
props.put(ProducerConfig.RETRIES_CONFIG, "0");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
producer.send(record); // Fire and forget - не ждем ответ
producer.close();

Характеристики:

  • acks = 0: producer не ждет подтверждения от брокера
  • Наивысшая производительность
  • Риск потери данных при сбое брокера
  • Случай: логирование некритичных данных

2. At-least-once (минимум один раз)

Сообщение гарантированно будет доставлено, но может быть дублировано.

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all"); // или "1" для быстрее
props.put(ProducerConfig.RETRIES_CONFIG, "3");
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");

try {
    RecordMetadata metadata = producer.send(record).get(); // Блокирующий вызов
    System.out.println("Message sent to partition " + metadata.partition() 
        + " with offset " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
producer.close();

Характеристики:

  • acks = all или acks = 1: ждем подтверждение
  • retries > 0: переотправляем при ошибке
  • Гарантия доставки со средней производительностью
  • Возможны дубликаты при сбое
  • Случай: критичные финансовые операции

3. Exactly-once (ровно один раз)

Сообщение доставляется ровно один раз без дублирования.

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // КЛЮЧЕВОЙ параметр
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "unique-id-123");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

try {
    producer.beginTransaction();
    ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
    producer.send(record);
    producer.commitTransaction();
} catch (ProducerFenced | OutOfOrderSequenceException | AuthorizationException e) {
    producer.abortTransaction();
} finally {
    producer.close();
}

Параметры exactly-once:

  • enable.idempotence = true: убирает дубликаты на уровне producer
  • transactional.id: уникальный ID для трансакции
  • acks = all: ждем всех брокеров
  • Изоляция: READ_COMMITTED на consumer

Consumer: настройка гарантий

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // Ручной коммит
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // Для exactly-once

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
    }
    consumer.commitSync(); // Коммитим только после обработки
}

Spring Kafka для exactly-once

@Configuration
public class KafkaConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        configProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        return new DefaultProducerFactory<>(configProps);
    }
    
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultConsumerFactory<>(props);
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

@Service
public class MessageService {
    @KafkaListener(topics = "my-topic")
    public void consumeMessage(String message, Acknowledgment ack) {
        try {
            processMessage(message);
            ack.acknowledge(); // Коммитим только после успешной обработки
        } catch (Exception e) {
            // Сообщение будет переобработано
            e.printStackTrace();
        }
    }
}

Сравнение уровней доставки

ПараметрAt-most-onceAt-least-onceExactly-once
acks01 или allall
retries0> 0MAX_VALUE
enable.idempotenceнетнетда
consumer auto committruefalsefalse
Потеря данныхвозможнанетнет
Дубликатынетвозможнынет
Производительностьнаивысшаясредняянизшая

Рекомендации

  1. Для финансовых операций: exactly-once с трансакциями
  2. Для критичных данных: at-least-once + идемпотентная обработка
  3. Для логирования: at-most-once для максимальной скорости
  4. Обрабатывай дубликаты на уровне приложения: используй unique key или версионирование
  5. Мониторь lag consumer groups: kafka-consumer-groups --describe
Как можно достичь определенный вид доставки в Kafka? | PrepBro