Что такое принцип гарантированной доставки в Kafka?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Принцип гарантированной доставки в Apache Kafka
Гарантированная доставка (Message Delivery Guarantees) в Kafka — это фундаментальный принцип, который определяет, как Kafka обрабатывает сообщения и гарантирует их доставку между продьюсерами и консьюмерами. Это критично для надёжности распределённых систем.
Три уровня гарантии доставки
1. At-Most-Once (максимум один раз)
Сообщение может быть потеряно, но никогда не будет обработано дважды.
@Configuration
public class AtMostOnceKafkaConfig {
@Bean
public ProducerFactory<String, Message> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// At-Most-Once: не ждём подтверждения
configProps.put(ProducerConfig.ACKS_CONFIG, "0");
configProps.put(ProducerConfig.RETRIES_CONFIG, 0);
return new DefaultProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Message> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Component
public class AtMostOnceProducer {
private final KafkaTemplate<String, Message> kafkaTemplate;
public AtMostOnceProducer(KafkaTemplate<String, Message> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(Message message) {
// Не ждём подтверждения — может потеряться
kafkaTemplate.send("events", message);
}
}
Применение: Телеметрия, логи, неважные события.
2. At-Least-Once (минимум один раз)
Сообщение гарантированно будет обработано, но может быть обработано несколько раз.
@Configuration
public class AtLeastOnceKafkaConfig {
@Bean
public ProducerFactory<String, Message> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// At-Least-Once: ждём подтверждения
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
return new DefaultProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Message> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConsumerFactory<String, Message> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// At-Least-Once: вручную коммитим офсет после обработки
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultConsumerFactory<>(configProps);
}
}
@Component
public class AtLeastOnceConsumer {
private final OrderService orderService;
private final KafkaTemplate<String, Message> kafkaTemplate;
public AtLeastOnceConsumer(
OrderService orderService,
KafkaTemplate<String, Message> kafkaTemplate) {
this.orderService = orderService;
this.kafkaTemplate = kafkaTemplate;
}
@KafkaListener(topics = "orders", groupId = "order-service")
public void consumeOrder(Order order, Acknowledgment ack) {
try {
// Обработка заказа
orderService.process(order);
// Коммитим офсет только после успешной обработки
ack.acknowledge();
} catch (Exception e) {
// При ошибке не коммитим — сообщение будет переобработано
System.out.println("Failed to process order: " + e.getMessage());
}
}
}
Применение: Обработка платежей, критичные операции.
3. Exactly-Once (ровно один раз)
Сообщение обрабатывается ровно один раз — идеальный сценарий.
@Configuration
@EnableKafka
public class ExactlyOnceKafkaConfig {
@Bean
public ProducerFactory<String, Message> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
// Exactly-Once: используем транзакции
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-1");
return new DefaultProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Message> kafkaTemplate() {
KafkaTemplate<String, Message> template =
new KafkaTemplate<>(producerFactory());
template.setDefaultTopic("events");
return template;
}
@Bean
public ConsumerFactory<String, Message> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Isolation level: читаем только committed сообщения
configProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
return new DefaultConsumerFactory<>(configProps);
}
}
@Service
public class ExactlyOnceOrderService {
private final OrderRepository orderRepository;
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
private final JdbcTemplate jdbcTemplate;
public ExactlyOnceOrderService(
OrderRepository orderRepository,
KafkaTemplate<String, OrderEvent> kafkaTemplate,
JdbcTemplate jdbcTemplate) {
this.orderRepository = orderRepository;
this.kafkaTemplate = kafkaTemplate;
this.jdbcTemplate = jdbcTemplate;
}
@Transactional
public void processOrderWithExactlyOnce(Order order) {
// 1. Сохранить заказ в БД
Order savedOrder = orderRepository.save(order);
// 2. Отправить событие в Kafka в той же транзакции
// (идеоматически с помощью Transactional Outbox)
kafkaTemplate.send("order-events",
new OrderEvent(savedOrder.getId(), "CREATED"));
// Если возникает исключение, вся транзакция откатывается
}
// Идемпотентная обработка: можно безопасно переобработать
@Transactional
public void processOrderEvent(OrderEvent event) {
// Проверить, не обработали ли уже это событие
if (isAlreadyProcessed(event.getId())) {
System.out.println("Event already processed: " + event.getId());
return;
}
// Обработать
Order order = orderRepository.findById(event.getOrderId())
.orElseThrow();
order.setStatus("CONFIRMED");
orderRepository.save(order);
// Записать, что обработали
markAsProcessed(event.getId());
}
private boolean isAlreadyProcessed(String eventId) {
Integer count = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM processed_events WHERE event_id = ?",
Integer.class,
eventId
);
return count != null && count > 0;
}
private void markAsProcessed(String eventId) {
jdbcTemplate.update(
"INSERT INTO processed_events (event_id, processed_at) VALUES (?, NOW())",
eventId
);
}
}
@Component
public class ExactlyOnceConsumer {
private final ExactlyOnceOrderService orderService;
public ExactlyOnceConsumer(ExactlyOnceOrderService orderService) {
this.orderService = orderService;
}
@KafkaListener(
topics = "order-events",
groupId = "order-service",
containerFactory = "kafkaListenerContainerFactory"
)
public void consumeOrderEvent(
OrderEvent event,
Acknowledgment ack,
Consumer<?, ?> consumer) {
try {
// Обработка гарантирует идемпотентность
orderService.processOrderEvent(event);
ack.acknowledge();
} catch (Exception e) {
// Не коммитим — переобработаем
System.out.println("Failed: " + e.getMessage());
}
}
}
Практическая реализация
Transactional Outbox Pattern (лучшая практика):
@Component
public class TransactionalOutbox {
private final JdbcTemplate jdbcTemplate;
private final KafkaTemplate<String, String> kafkaTemplate;
@Transactional
public void saveOrderWithOutbox(Order order) {
// 1. Сохранить заказ
String orderId = generateId();
jdbcTemplate.update(
"INSERT INTO orders (id, status) VALUES (?, ?)",
orderId, "PENDING"
);
// 2. Записать событие в outbox (в той же транзакции)
jdbcTemplate.update(
"INSERT INTO outbox (event_type, payload) VALUES (?, ?)",
"OrderCreated",
"{\"orderId\": \"" + orderId + "\"}"
);
// Если транзакция успешна — оба записаны
}
@Scheduled(fixedDelay = 1000)
public void publishOutboxEvents() {
// Периодически отправляем неотправленные события
List<OutboxEvent> events = jdbcTemplate.query(
"SELECT * FROM outbox WHERE published = false",
this::mapToEvent
);
events.forEach(event -> {
kafkaTemplate.send("order-events", event.getPayload());
markAsPublished(event.getId());
});
}
}
Таблица сравнения гарантий
| Уровень | Потери | Дубли | Использование |
|---|---|---|---|
| At-Most-Once | Возможны | Нет | Метрики, логи |
| At-Least-Once | Нет | Возможны | Платежи |
| Exactly-Once | Нет | Нет | Критичные операции |
Ключевые параметры Kafka
// Producer
acks = "all" // Ждём подтверждения от всех реплик
retries = 3 // Количество попыток
enable.idempotence = true // Идемпотентность
transactional.id = "id" // Включить транзакции
// Consumer
enable.auto.commit = false // Вручную коммитим
isolation.level = "read_committed" // Читаем только committed
Заключение
Выбор уровня гарантии доставки зависит от характера данных:
- Телеметрия: At-Most-Once (быстро, но потери допустимы)
- Бизнес-операции: At-Least-Once + идемпотентность
- Финансовые операции: Exactly-Once + Transactional Outbox
Это критическое знание для разработки надёжных распределённых систем.