← Назад к вопросам
Как можно достичь определенный вид доставки в Kafka?
1.0 Junior🔥 151 комментариев
#Брокеры сообщений
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
# Гарантии доставки в Apache Kafka
Три основных уровня гарантии доставки
Kafka предоставляет три уровня гарантии доставки сообщений, каждый с разными требованиями к надежности и производительности.
1. At-most-once (максимум один раз)
Сообщение может быть потеряно, но дублирования гарантированно не будет.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "0");
props.put(ProducerConfig.RETRIES_CONFIG, "0");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
producer.send(record); // Fire and forget - не ждем ответ
producer.close();
Характеристики:
acks = 0: producer не ждет подтверждения от брокера- Наивысшая производительность
- Риск потери данных при сбое брокера
- Случай: логирование некритичных данных
2. At-least-once (минимум один раз)
Сообщение гарантированно будет доставлено, но может быть дублировано.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all"); // или "1" для быстрее
props.put(ProducerConfig.RETRIES_CONFIG, "3");
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
try {
RecordMetadata metadata = producer.send(record).get(); // Блокирующий вызов
System.out.println("Message sent to partition " + metadata.partition()
+ " with offset " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
producer.close();
Характеристики:
acks = allилиacks = 1: ждем подтверждениеretries > 0: переотправляем при ошибке- Гарантия доставки со средней производительностью
- Возможны дубликаты при сбое
- Случай: критичные финансовые операции
3. Exactly-once (ровно один раз)
Сообщение доставляется ровно один раз без дублирования.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // КЛЮЧЕВОЙ параметр
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "unique-id-123");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
producer.beginTransaction();
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
producer.send(record);
producer.commitTransaction();
} catch (ProducerFenced | OutOfOrderSequenceException | AuthorizationException e) {
producer.abortTransaction();
} finally {
producer.close();
}
Параметры exactly-once:
enable.idempotence = true: убирает дубликаты на уровне producertransactional.id: уникальный ID для трансакцииacks = all: ждем всех брокеров- Изоляция: READ_COMMITTED на consumer
Consumer: настройка гарантий
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // Ручной коммит
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // Для exactly-once
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
consumer.commitSync(); // Коммитим только после обработки
}
Spring Kafka для exactly-once
@Configuration
public class KafkaConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
configProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
return new DefaultProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
}
@Service
public class MessageService {
@KafkaListener(topics = "my-topic")
public void consumeMessage(String message, Acknowledgment ack) {
try {
processMessage(message);
ack.acknowledge(); // Коммитим только после успешной обработки
} catch (Exception e) {
// Сообщение будет переобработано
e.printStackTrace();
}
}
}
Сравнение уровней доставки
| Параметр | At-most-once | At-least-once | Exactly-once |
|---|---|---|---|
| acks | 0 | 1 или all | all |
| retries | 0 | > 0 | MAX_VALUE |
| enable.idempotence | нет | нет | да |
| consumer auto commit | true | false | false |
| Потеря данных | возможна | нет | нет |
| Дубликаты | нет | возможны | нет |
| Производительность | наивысшая | средняя | низшая |
Рекомендации
- Для финансовых операций: exactly-once с трансакциями
- Для критичных данных: at-least-once + идемпотентная обработка
- Для логирования: at-most-once для максимальной скорости
- Обрабатывай дубликаты на уровне приложения: используй unique key или версионирование
- Мониторь lag consumer groups:
kafka-consumer-groups --describe