← Назад к вопросам

Что такое принцип гарантированной доставки в Kafka?

3.0 Senior🔥 121 комментариев
#REST API и микросервисы#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Принцип гарантированной доставки в Apache Kafka

Гарантированная доставка (Message Delivery Guarantees) в Kafka — это фундаментальный принцип, который определяет, как Kafka обрабатывает сообщения и гарантирует их доставку между продьюсерами и консьюмерами. Это критично для надёжности распределённых систем.

Три уровня гарантии доставки

1. At-Most-Once (максимум один раз)

Сообщение может быть потеряно, но никогда не будет обработано дважды.

@Configuration
public class AtMostOnceKafkaConfig {
    
    @Bean
    public ProducerFactory<String, Message> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        
        // At-Most-Once: не ждём подтверждения
        configProps.put(ProducerConfig.ACKS_CONFIG, "0");
        configProps.put(ProducerConfig.RETRIES_CONFIG, 0);
        
        return new DefaultProducerFactory<>(configProps);
    }
    
    @Bean
    public KafkaTemplate<String, Message> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

@Component
public class AtMostOnceProducer {
    private final KafkaTemplate<String, Message> kafkaTemplate;
    
    public AtMostOnceProducer(KafkaTemplate<String, Message> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    
    public void sendMessage(Message message) {
        // Не ждём подтверждения — может потеряться
        kafkaTemplate.send("events", message);
    }
}

Применение: Телеметрия, логи, неважные события.

2. At-Least-Once (минимум один раз)

Сообщение гарантированно будет обработано, но может быть обработано несколько раз.

@Configuration
public class AtLeastOnceKafkaConfig {
    
    @Bean
    public ProducerFactory<String, Message> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        
        // At-Least-Once: ждём подтверждения
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
        configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        
        return new DefaultProducerFactory<>(configProps);
    }
    
    @Bean
    public KafkaTemplate<String, Message> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    
    @Bean
    public ConsumerFactory<String, Message> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        
        // At-Least-Once: вручную коммитим офсет после обработки
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        return new DefaultConsumerFactory<>(configProps);
    }
}

@Component
public class AtLeastOnceConsumer {
    private final OrderService orderService;
    private final KafkaTemplate<String, Message> kafkaTemplate;
    
    public AtLeastOnceConsumer(
            OrderService orderService,
            KafkaTemplate<String, Message> kafkaTemplate) {
        this.orderService = orderService;
        this.kafkaTemplate = kafkaTemplate;
    }
    
    @KafkaListener(topics = "orders", groupId = "order-service")
    public void consumeOrder(Order order, Acknowledgment ack) {
        try {
            // Обработка заказа
            orderService.process(order);
            
            // Коммитим офсет только после успешной обработки
            ack.acknowledge();
        } catch (Exception e) {
            // При ошибке не коммитим — сообщение будет переобработано
            System.out.println("Failed to process order: " + e.getMessage());
        }
    }
}

Применение: Обработка платежей, критичные операции.

3. Exactly-Once (ровно один раз)

Сообщение обрабатывается ровно один раз — идеальный сценарий.

@Configuration
@EnableKafka
public class ExactlyOnceKafkaConfig {
    
    @Bean
    public ProducerFactory<String, Message> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
        configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        
        // Exactly-Once: используем транзакции
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-1");
        
        return new DefaultProducerFactory<>(configProps);
    }
    
    @Bean
    public KafkaTemplate<String, Message> kafkaTemplate() {
        KafkaTemplate<String, Message> template = 
            new KafkaTemplate<>(producerFactory());
        template.setDefaultTopic("events");
        return template;
    }
    
    @Bean
    public ConsumerFactory<String, Message> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        // Isolation level: читаем только committed сообщения
        configProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        
        return new DefaultConsumerFactory<>(configProps);
    }
}

@Service
public class ExactlyOnceOrderService {
    private final OrderRepository orderRepository;
    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
    private final JdbcTemplate jdbcTemplate;
    
    public ExactlyOnceOrderService(
            OrderRepository orderRepository,
            KafkaTemplate<String, OrderEvent> kafkaTemplate,
            JdbcTemplate jdbcTemplate) {
        this.orderRepository = orderRepository;
        this.kafkaTemplate = kafkaTemplate;
        this.jdbcTemplate = jdbcTemplate;
    }
    
    @Transactional
    public void processOrderWithExactlyOnce(Order order) {
        // 1. Сохранить заказ в БД
        Order savedOrder = orderRepository.save(order);
        
        // 2. Отправить событие в Kafka в той же транзакции
        // (идеоматически с помощью Transactional Outbox)
        kafkaTemplate.send("order-events", 
            new OrderEvent(savedOrder.getId(), "CREATED"));
        
        // Если возникает исключение, вся транзакция откатывается
    }
    
    // Идемпотентная обработка: можно безопасно переобработать
    @Transactional
    public void processOrderEvent(OrderEvent event) {
        // Проверить, не обработали ли уже это событие
        if (isAlreadyProcessed(event.getId())) {
            System.out.println("Event already processed: " + event.getId());
            return;
        }
        
        // Обработать
        Order order = orderRepository.findById(event.getOrderId())
            .orElseThrow();
        order.setStatus("CONFIRMED");
        orderRepository.save(order);
        
        // Записать, что обработали
        markAsProcessed(event.getId());
    }
    
    private boolean isAlreadyProcessed(String eventId) {
        Integer count = jdbcTemplate.queryForObject(
            "SELECT COUNT(*) FROM processed_events WHERE event_id = ?",
            Integer.class,
            eventId
        );
        return count != null && count > 0;
    }
    
    private void markAsProcessed(String eventId) {
        jdbcTemplate.update(
            "INSERT INTO processed_events (event_id, processed_at) VALUES (?, NOW())",
            eventId
        );
    }
}

@Component
public class ExactlyOnceConsumer {
    private final ExactlyOnceOrderService orderService;
    
    public ExactlyOnceConsumer(ExactlyOnceOrderService orderService) {
        this.orderService = orderService;
    }
    
    @KafkaListener(
        topics = "order-events",
        groupId = "order-service",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void consumeOrderEvent(
            OrderEvent event,
            Acknowledgment ack,
            Consumer<?, ?> consumer) {
        
        try {
            // Обработка гарантирует идемпотентность
            orderService.processOrderEvent(event);
            ack.acknowledge();
        } catch (Exception e) {
            // Не коммитим — переобработаем
            System.out.println("Failed: " + e.getMessage());
        }
    }
}

Практическая реализация

Transactional Outbox Pattern (лучшая практика):

@Component
public class TransactionalOutbox {
    private final JdbcTemplate jdbcTemplate;
    private final KafkaTemplate<String, String> kafkaTemplate;
    
    @Transactional
    public void saveOrderWithOutbox(Order order) {
        // 1. Сохранить заказ
        String orderId = generateId();
        jdbcTemplate.update(
            "INSERT INTO orders (id, status) VALUES (?, ?)",
            orderId, "PENDING"
        );
        
        // 2. Записать событие в outbox (в той же транзакции)
        jdbcTemplate.update(
            "INSERT INTO outbox (event_type, payload) VALUES (?, ?)",
            "OrderCreated",
            "{\"orderId\": \"" + orderId + "\"}"
        );
        // Если транзакция успешна — оба записаны
    }
    
    @Scheduled(fixedDelay = 1000)
    public void publishOutboxEvents() {
        // Периодически отправляем неотправленные события
        List<OutboxEvent> events = jdbcTemplate.query(
            "SELECT * FROM outbox WHERE published = false",
            this::mapToEvent
        );
        
        events.forEach(event -> {
            kafkaTemplate.send("order-events", event.getPayload());
            markAsPublished(event.getId());
        });
    }
}

Таблица сравнения гарантий

УровеньПотериДублиИспользование
At-Most-OnceВозможныНетМетрики, логи
At-Least-OnceНетВозможныПлатежи
Exactly-OnceНетНетКритичные операции

Ключевые параметры Kafka

// Producer
acks = "all"              // Ждём подтверждения от всех реплик
retries = 3               // Количество попыток
enable.idempotence = true // Идемпотентность
transactional.id = "id"   // Включить транзакции

// Consumer
enable.auto.commit = false     // Вручную коммитим
isolation.level = "read_committed"  // Читаем только committed

Заключение

Выбор уровня гарантии доставки зависит от характера данных:

  • Телеметрия: At-Most-Once (быстро, но потери допустимы)
  • Бизнес-операции: At-Least-Once + идемпотентность
  • Финансовые операции: Exactly-Once + Transactional Outbox

Это критическое знание для разработки надёжных распределённых систем.