← Назад к вопросам

Можно ли в Kafka прочитать одно и то же сообщение два раза?

2.0 Middle🔥 151 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Можно ли в Kafka прочитать одно и то же сообщение два раза?

Краткий ответ

Да, можно. Kafka гарантирует доставку сообщений, но не гарантирует ровно один раз (exactly-once). Это означает, что при определённых обстоятельствах сообщение может быть обработано несколько раз.

Гарантии доставки в Kafka

Kafka поддерживает три режима доставки:

1. At-most-once (максимум один раз)

properties.put("acks", "0");
properties.put("enable.idempotence", false);
  • Сообщение может быть потеряно
  • Сообщение не будет прочитано дважды
  • Используется редко в production

2. At-least-once (минимум один раз)

properties.put("acks", "all");
properties.put("retries", Integer.MAX_VALUE);
properties.put("enable.idempotence", false);
  • Сообщение будет доставлено, но может дублироваться
  • Это режим по умолчанию в Kafka
  • Требует idempotent обработки на стороне consumer

3. Exactly-once (ровно один раз)

properties.put("enable.idempotence", true);
properties.put("transactional.id", "my-app-" + UUID.randomUUID());
properties.put("isolation.level", "read_committed");
  • Гарантирует, что сообщение будет обработано ровно один раз
  • Требует больше ресурсов и медленнее

Сценарии, когда сообщение прочитается дважды

Сценарий 1: Сбой consumer после обработки, но перед commit

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(3);
    // Без явного commit - возможны дубли
    return factory;
}

@KafkaListener(topics = "my-topic")
public void listen(String message) {
    processMessage(message);  // Сбой здесь...
    // ...offset не был закоммичен, сообщение прочитается снова
}

Сценарий 2: Rebalance в consumer group

// Consumer группа теряет обработчик
// Kafka переоценивает partition'ы между оставшимися consumer'ами
// Сообщения, которые были в обработке, могут быть переданы другому consumer'у

public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // Если здесь не сделать commit, сообщения вернутся в очередь
    consumer.commitSync();
}

Сценарий 3: Duplicate в producer

properties.put("enable.idempotence", false);
properties.put("acks", "1");

// Producer отправляет сообщение
// Kafka записала в 1 replica, но не в остальные
// Producer не получает ack и переотправляет
// Сообщение попадает в topic дважды

Правильная обработка дубликатов

Способ 1: Idempotent processing

@Service
public class OrderService {
    
    @KafkaListener(topics = "order-events")
    public void processOrder(OrderEvent event) {
        // Проверяем, был ли этот заказ уже обработан
        if (orderRepository.existsById(event.getOrderId())) {
            log.info("Заказ {} уже обработан, пропускаем", event.getOrderId());
            return;
        }
        
        Order order = new Order(event);
        orderRepository.save(order);
        log.info("Заказ {} обработан", event.getOrderId());
    }
}

Способ 2: Использование unique constraint

@Entity
@Table(name = "orders", uniqueConstraints = {
    @UniqueConstraint(columnNames = "kafka_message_id")
})
public class Order {
    @Id
    private Long id;
    
    @Column(nullable = false, unique = true)
    private String kafkaMessageId;
    
    private String orderContent;
}

Способ 3: Exactly-once с transactional consumer

@Service
public class TransactionalOrderService {
    
    @KafkaListener(topics = "order-events",
                   groupId = "order-group")
    @Transactional
    public void processOrder(OrderEvent event) {
        // Сообщение и offset коммитятся в одной транзакции
        Order order = new Order(event);
        orderRepository.save(order);
        
        // Offset будет закоммичен только если транзакция успешна
    }
}

Практический пример: Надёжная обработка платежей

@Service
public class PaymentKafkaListener {
    private static final String TOPIC = "payment-events";
    
    @KafkaListener(topics = TOPIC, groupId = "payment-processor")
    public void processPayment(PaymentEvent event) {
        try {
            // Проверяем идемпотентность
            Payment existing = paymentRepository.findByIdempotencyKey(
                event.getIdempotencyKey()
            );
            if (existing != null) {
                log.warn("Payment {} already processed", event.getId());
                return;
            }
            
            // Обрабатываем платёж
            Payment payment = processPaymentTransaction(event);
            paymentRepository.save(payment);
            
            log.info("Payment {} processed successfully", event.getId());
        } catch (Exception e) {
            log.error("Failed to process payment {}: {}", event.getId(), e.getMessage());
            // Consumer вернёт сообщение в очередь для retry
            throw new RuntimeException("Payment processing failed", e);
        }
    }
}

Конфигурация для high reliability

@Configuration
public class KafkaConsumerConfig {
    
    @Bean
    public ConsumerFactory<String, PaymentEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

Лучшие практики

  • Всегда используй at-least-once режим (default)
  • Обработка должна быть idempotent
  • Используй unique constraints на базе данных
  • Логируй обработку для отладки дубликатов
  • Тестируй сценарии сбоев
  • Мониторь дубликаты в production

Вывод

Дубликаты в Kafka - это нормальное явление. Правильная архитектура должна предусмотреть idempotent обработку сообщений независимо от количества попыток.

Можно ли в Kafka прочитать одно и то же сообщение два раза? | PrepBro