← Назад к вопросам

Какие знаешь гарантии по чтению со стороны Consumer в Kafka?

2.0 Middle🔥 131 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Гарантии по чтению (Read Guarantees) в Kafka

Гарантии в Kafka определяют, как система обрабатывает сообщения при наличии ошибок, падений и отказов. Это критично для надежности обработки данных.

1. At-most-once (0 или 1 раз)

At-most-once гарантирует, что сообщение будет обработано максимум один раз. Может быть потеряно при сбое.

@Component
public class AtMostOnceConsumer {
    @KafkaListener(topics = "orders", 
                   groupId = "order-group",
                   properties = {
                       "enable.auto.commit=true",
                       "auto.commit.interval.ms=100"
                   })
    public void consumeOrder(Order order) {
        System.out.println("Processing order: " + order.getId());
        // Риск: если сбой ДО обработки, сообщение потеряется
        // Если сбой ПОСЛЕ обработки, это OK
    }
}

// Конфигурация
@Configuration
public class KafkaAtMostOnceConfig {
    @Bean
    public ConsumerFactory<String, Order> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "at-most-once");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);  // Auto-commit
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

Когда использовать:

  • Некритичные метрики
  • Логирование
  • Analytics, где потеря одного event не критична

Риск: Если consumer падает между auto-commit и обработкой, сообщение теряется.

2. At-least-once (1 или больше раз)

At-least-once гарантирует, что сообщение будет обработано минимум один раз. Может быть обработано несколько раз.

@Component
public class AtLeastOnceConsumer {
    @Autowired
    private OrderService orderService;
    
    @KafkaListener(topics = "orders",
                   groupId = "order-group",
                   properties = {
                       "enable.auto.commit=false"  // Отключаем auto-commit
                   })
    public void consumeOrder(Order order, Acknowledgment ack) {
        try {
            System.out.println("Processing order: " + order.getId());
            orderService.saveOrder(order);
            
            // Подтвердим обработку ПОСЛЕ успешного завершения
            ack.acknowledge();
        } catch (Exception e) {
            // Если ошибка - не подтверждаем
            // Сообщение будет переобработано
            System.err.println("Error processing order: " + e.getMessage());
        }
    }
}

// Конфигурация
@Configuration
public class KafkaAtLeastOnceConfig {
    @Bean
    public ConsumerFactory<String, Order> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "at-least-once");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // Manual commit
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props);
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Order> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Order> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);  // Manual ACK
        return factory;
    }
}

Как это работает:

  1. Consumer получает сообщение
  2. Обрабатывает сообщение
  3. Подтверждает offset (ack.acknowledge())
  4. Если падение до ack — сообщение переобработается

Когда использовать:

  • Финансовые операции
  • Заказы
  • Любые данные, где потеря недопустима

Требование: Consumer должен быть idempotent (безопасно обработать дважды).

// Идемпотентный consumer
@Service
public class IdempotentOrderService {
    @Autowired
    private OrderRepository orderRepository;
    
    public void processOrder(Order order) {
        // Проверим, обработан ли уже
        if (orderRepository.existsById(order.getId())) {
            System.out.println("Order already processed: " + order.getId());
            return;  // Пропустим повторную обработку
        }
        
        // Сохраним с PRIMARY KEY на order ID
        orderRepository.save(order);
    }
}

3. Exactly-once (ровно 1 раз)

Exactly-once гарантирует, что сообщение обработано ровно один раз, даже при падениях. Самая надежная, но сложная в реализации.

// Способ 1: Transactional processing
@Component
public class ExactlyOnceConsumer {
    @Autowired
    private OrderRepository orderRepository;
    
    @KafkaListener(topics = "orders",
                   groupId = "exactly-once-group",
                   properties = {
                       "enable.auto.commit=false",
                       "isolation.level=read_committed"  // Only committed messages
                   })
    @Transactional
    public void consumeOrder(Order order, Acknowledgment ack) {
        try {
            // Сохраняем в БД в трансакции
            orderRepository.save(order);
            
            // Если трансакция успешна - подтверждаем offset
            ack.acknowledge();
        } catch (Exception e) {
            // Трансакция откатится, offset не подтвердится
            // Сообщение будет переобработано
            throw new RuntimeException("Failed to process", e);
        }
    }
}

// Способ 2: Kafka Transactions (Producer + Consumer)
@Service
public class TransactionalOrderProcessor {
    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;
    
    @Autowired
    private OrderRepository orderRepository;
    
    public void processOrderTransaction(Order inputOrder) {
        // Все операции в одной транзакции
        kafkaTemplate.executeInTransaction(kt -> {
            // Обработаем input order
            Order processed = new Order(inputOrder);
            processed.setStatus("PROCESSED");
            orderRepository.save(processed);
            
            // И отправим в output topic
            kt.send("processed-orders", processed.getId().toString(), processed);
            return null;
        });
    }
}

// Конфигурация
@Configuration
public class KafkaExactlyOnceConfig {
    @Bean
    public ConsumerFactory<String, Order> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "exactly-once");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");  // Only committed
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props);
    }
    
    @Bean
    public ProducerFactory<String, Order> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);  // Idempotent producer
        props.put(ProducerConfig.ACKS_CONFIG, "all");  // Wait for all replicas
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor");
        return new DefaultKafkaProducerFactory<>(props);
    }
}

Как Kafka достигает exactly-once:

  1. Idempotent Producer — один и тот же message никогда не напишется дважды
  2. Transactional writes — offset и данные пишутся атомарно
  3. Isolation level read_committed — читаем только committed сообщения
  4. Manual ack — подтверждение после успешной обработки

Когда использовать:

  • Финансовые транзакции
  • Критичные бизнес-операции
  • Системы, где дублирование недопустимо

Затраты:

  • Немного медленнее
  • Требует синхронизации между partition и consumer

Comparison таблица

ГарантияПотеряДублированиеUse CaseСложность
At-most-once✓ Возможна✗ NoMetrics, logsLow
At-least-once✗ No✓ ВозможноOrders, paymentsMedium
Exactly-once✗ No✗ NoCritical transactionsHigh

Практические рекомендации

// Defensive approach: At-least-once + Idempotency
@Component
public class RobustConsumer {
    @Autowired
    private OrderRepository orderRepository;
    
    @KafkaListener(topics = "orders", groupId = "robust")
    @Transactional
    public void processOrder(Order order, Acknowledgment ack) {
        try {
            // 1. Проверим, не обработан ли уже
            if (isAlreadyProcessed(order.getId())) {
                System.out.println("Duplicate order, skipping: " + order.getId());
                ack.acknowledge();
                return;
            }
            
            // 2. Обработаем с трансакцией
            orderRepository.save(order);
            sendNotification(order);
            
            // 3. Подтвердим только после всего
            ack.acknowledge();
        } catch (Exception e) {
            System.err.println("Error processing order", e);
            // Не подтверждаем - будет retry
        }
    }
    
    private boolean isAlreadyProcessed(Long orderId) {
        return orderRepository.existsById(orderId);
    }
}

Key Concepts

  • Offset — позиция в partition, которую consumer прочитал
  • Commit — сохранение offset в Kafka (либо автоматическое, либо ручное)
  • Rebalancing — переназначение partitions при добавлении/удалении consumers
  • Idempotency — свойство операции быть безопасной при повторении