← Назад к вопросам

Как спроектировать приложение Kafka для обработки сообщения строго один раз

2.4 Senior🔥 81 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Exactly-Once Semantics (EOS) в Kafka: проектирование надёжного приложения

Обработка сообщений строго один раз (exactly-once) — одна из самых сложных задач в распределённых системах. Kafka предоставляет встроенные механизмы, но требует правильной архитектуры приложения.

Уровни гарантирования доставки

┌─────────────────────────────────────────────────────┐
│ At-Most-Once (Максимум раз)                          │
│ - Сообщение может быть потеряно                      │
│ - Быстро, но ненадёжно                               │
│ - Используется: аналитика, логирование               │
├─────────────────────────────────────────────────────┤
│ At-Least-Once (Минимум раз)                          │
│ - Сообщение может быть обработано несколько раз      │
│ - Требует идемпотентности                            │
│ - Используется: операции с БД                        │
├─────────────────────────────────────────────────────┤
│ Exactly-Once (Строго один раз) ✓                     │
│ - Сообщение обработано ровно один раз                │
│ - Самое надёжное и сложное                           │
│ - Требует специальной архитектуры                    │
└─────────────────────────────────────────────────────┘

Архитектурный подход

Для достижения exactly-once нужны три компонента:

  1. Идемпотентный Producer — отправитель
  2. Трансакционная обработка — потребитель
  3. Дедупликация — удаление дубликатов

Решение 1: Producer-side Exactly-Once

Идемпотентный Producer

public class IdempotentProducer {
    private final KafkaTemplate<String, Order> kafkaTemplate;
    
    public void sendOrderIdempotent(Order order) {
        // Генерируем уникальный ID для сообщения
        String messageId = UUID.randomUUID().toString();
        
        ProducerRecord<String, Order> record = new ProducerRecord<>(
            "orders",
            order.getId().toString(),  // Key (партиционирование)
            order
        );
        
        // Добавляем ID в headers для дедупликации
        record.headers().add("message-id", messageId.getBytes());
        record.headers().add("timestamp", System.currentTimeMillis() + "".getBytes());
        
        kafkaTemplate.send(record);
    }
}

@Configuration
public class IdempotentProducerConfig {
    @Bean
    public ProducerFactory<String, Order> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        
        // КРИТИЧНЫЕ ПАРАМЕТРЫ ДЛЯ ИДЕМПОТЕНТНОСТИ:
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        
        return new DefaultProducerFactory<>(configProps);
    }
}

Решение 2: Consumer-side Exactly-Once с Transactions

Это основной механизм. Используем транзакции Kafka и БД вместе:

@Service
public class TransactionalOrderConsumer {
    @Autowired
    private OrderRepository orderRepository;
    
    @Autowired
    private ConsumerOffsetRepository consumerOffsetRepository;
    
    @KafkaListener(
        topics = "orders",
        groupId = "exactly-once-group",
        properties = {
            "isolation.level=read_committed",
            "enable.auto.commit=false"
        }
    )
    public void consumeWithTransactions(
            ConsumerRecord<String, Order> record,
            Acknowledgment acknowledgment) {
        
        // Используем @Transactional для синхронизации с БД
        processOrderInTransaction(record, acknowledgment);
    }
    
    @Transactional
    private void processOrderInTransaction(
            ConsumerRecord<String, Order> record,
            Acknowledgment acknowledgment) {
        
        Order order = record.value();
        String messageId = extractMessageId(record);
        long offset = record.offset();
        int partition = record.partition();
        
        try {
            // 1. Проверяем, не обработали ли мы это сообщение
            if (orderRepository.existsByMessageId(messageId)) {
                // Сообщение уже обработано - пропускаем
                acknowledgeAndCommit(record, acknowledgment, offset, partition);
                return;
            }
            
            // 2. Обрабатываем заказ
            Order processedOrder = processOrder(order);
            
            // 3. Сохраняем в БД с сохранением messageId
            processedOrder.setMessageId(messageId);
            orderRepository.save(processedOrder);
            
            // 4. Сохраняем offset в ОДНОЙ транзакции с данными
            consumerOffsetRepository.save(new ConsumerOffset(
                "orders",
                partition,
                offset,
                System.currentTimeMillis()
            ));
            
            // 5. Коммитим offset (синхронизировано с БД)
            acknowledgment.acknowledge();
            
        } catch (Exception e) {
            // Транзакция откатится автоматически
            log.error("Ошибка обработки заказа: " + messageId, e);
            // Не коммитим - сообщение будет переобработано
        }
    }
    
    private void acknowledgeAndCommit(
            ConsumerRecord<String, Order> record,
            Acknowledgment acknowledgment,
            long offset,
            int partition) {
        acknowledgment.acknowledge();
        // В контексте транзакции это безопасно
    }
}

Решение 3: Kafka Streams для Exactly-Once

Кафка Streams имеет встроенную поддержку exactly-once:

@Configuration
public class ExactlyOnceStreamsConfig {
    @Bean
    public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
        Map<String, Object> config = new HashMap<>();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        
        // КЛЮЧЕВОЙ ПАРАМЕТР ДЛЯ EXACTLY-ONCE:
        config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
                   StreamsConfig.EXACTLY_ONCE_V2);
        
        config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
        config.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, 86400000);
        
        StreamsBuilderFactoryBean factoryBean = new StreamsBuilderFactoryBean();
        factoryBean.setStreamsConfiguration(config);
        return factoryBean;
    }
    
    @Bean
    public KStream<String, Order> ordersStream(StreamsBuilder builder) {
        KStream<String, Order> orders = builder.stream(
            "orders",
            Consumed.with(
                Serdes.String(),
                new JsonSerde<>(Order.class)
            )
        );
        
        orders
            .peek((key, value) -> log.info("Processing: {} -> {}", key, value.getId()))
            .mapValues(this::validateAndProcessOrder)
            .to("processed-orders", 
                Produced.with(
                    Serdes.String(),
                    new JsonSerde<>(ProcessedOrder.class)
                )
            );
        
        return orders;
    }
}

Решение 4: Дедупликация на основе БД

Хранём обработанные сообщения для защиты от дубликатов:

@Entity
@Table(name = "processed_messages")
public class ProcessedMessage {
    @Id
    private String messageId;
    
    private String topicName;
    private int partitionNumber;
    private long offset;
    private LocalDateTime processedAt;
    
    @Index(unique = true)
    @Column(nullable = false)
    private String idempotencyKey;  // UUID + timestamp
}

@Repository
public interface ProcessedMessageRepository 
        extends JpaRepository<ProcessedMessage, String> {
    Optional<ProcessedMessage> findByIdempotencyKey(String key);
}

@Service
public class DeduplicationService {
    @Autowired
    private ProcessedMessageRepository processedMessageRepository;
    
    @Transactional
    public <T> T processWithDeduplication(
            String idempotencyKey,
            Function<String, T> processor) {
        
        // Проверяем, не обработано ли
        Optional<ProcessedMessage> existing = 
            processedMessageRepository.findByIdempotencyKey(idempotencyKey);
        
        if (existing.isPresent()) {
            log.warn("Сообщение {} уже обработано в {}", 
                     idempotencyKey, existing.get().getProcessedAt());
            return null;  // или вернуть закэшированный результат
        }
        
        // Обрабатываем
        T result = processor.apply(idempotencyKey);
        
        // Записываем как обработанное
        ProcessedMessage message = new ProcessedMessage();
        message.setIdempotencyKey(idempotencyKey);
        message.setProcessedAt(LocalDateTime.now());
        processedMessageRepository.save(message);
        
        return result;
    }
}

Решение 5: Полная архитектура с трансакциями

@Configuration
public class FullyTransactionalKafkaConfig {
    @Bean
    public ConsumerFactory<String, Order> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "exactly-once-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        
        // Читаем только committed сообщения
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        
        // Ручной offset management в транзакции
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        return new DefaultConsumerFactory<>(props);
    }
    
    @Bean
    public KafkaTransactionManager kafkaTransactionManager(
            ProducerFactory<?, ?> producerFactory) {
        return new KafkaTransactionManager(producerFactory);
    }
}

@Service
public class ExactlyOnceOrderService {
    @Autowired
    private OrderRepository orderRepository;
    
    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;
    
    @Transactional  // Синхронизирует Kafka и БД транзакции
    public void processOrderExactlyOnce(ConsumerRecord<String, Order> record) {
        Order order = record.value();
        String messageId = extractMessageId(record);
        
        try {
            // Если уже есть - не обработаем
            if (orderRepository.findByMessageId(messageId).isPresent()) {
                return;
            }
            
            // Обрабатываем
            Order processed = new Order();
            processed.setId(order.getId());
            processed.setMessageId(messageId);
            processed.setStatus("PROCESSED");
            processed.setProcessedAt(LocalDateTime.now());
            
            orderRepository.save(processed);
            
            // Отправляем в следующий топик ВО ВРЕМЯ ТРАНЗАКЦИИ
            kafkaTemplate.send("processed-orders", processed.getId(), processed);
            
            // Транзакция покроет обе операции
            
        } catch (Exception e) {
            // Откат обеих операций
            throw new RuntimeException("Ошибка обработки", e);
        }
    }
}

Чеклист для Exactly-Once

Producer:

  • Enable idempotence
  • Acks = "all"
  • Добавлять message-id в headers
  • Retries > 0

Consumer:

  • Enable auto commit = false
  • Isolation level = read_committed
  • Обрабатывать в транзакции
  • Проверять дубликаты перед обработкой
  • Коммитить offset после успеха

База данных:

  • Unique constraint на messageId
  • Одна транзакция для данных + offset
  • Retry логика при deadlock

Мониторинг:

  • Логировать все обработки
  • Alerting на durable сообщения
  • Метрики обработки и переобработки

Этот подход гарантирует, что каждое сообщение будет обработано ровно один раз, что критично для финансовых операций и заказов!