← Назад к вопросам

Как сделать так, чтобы Consumer получил сообщения в порядке, в котором их отправляют в Kafka

2.0 Middle🔥 101 комментариев
#Другое

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Гарантирование порядка сообщений в Kafka: полный гайд

Кафка — это распределённая система, и гарантирование порядка доставки — критически важно для многих приложений. Рассмотрю все аспекты: от архитектуры Kafka до конфигурации Java клиента.

Основной принцип: одна партиция = один порядок

Ключевой момент: Kafka гарантирует порядок сообщений только в рамках одной партиции. Если тема разделена на несколько партиций, разные потребители могут обрабатывать их параллельно.

Тема: orders [3 партиции]
┌─────────────────────────┐
│ Partition 0 │ Partition 1 │ Partition 2 │
│ msg1, msg3  │ msg2, msg5  │ msg4, msg6  │
│ (заказ А)   │ (заказ Б)   │ (заказ В)   │
└─────────────────────────┘

Порядок гарантирован в рамках каждой партиции!

Решение 1: Использование Key для маршрутизации

Отправитель (Producer)

Так как вы хотите гарантировать порядок — используйте один ключ для связанных сообщений:

public class OrderProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;
    
    public void sendOrder(Order order) {
        // Ключ: ID пользователя или заказа
        // Все сообщения с одинаковым ключом идут в одну партицию
        kafkaTemplate.send(
            "orders",
            order.getUserId(),  // <- KEY (определяет партицию)
            order.toString()    // <- VALUE
        );
    }
}

// Или напрямую с KafkaProducer
public class LowLevelProducer {
    public void sendOrderMessage(Order order) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");  // Дождаться подтверждения всех реплик
        props.put("retries", 3);
        
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>(
                "orders",
                order.getUserId(),  // KEY
                order.toJson()      // VALUE
            );
            
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("Ошибка отправки: " + exception.getMessage());
                } else {
                    System.out.println("Отправлено в partition: " + metadata.partition());
                }
            });
        }
    }
}

Как работает маршрутизация по ключу

Кафка использует хеш ключа для определения партиции:

int partition = Math.abs(key.hashCode()) % numPartitions;
// Один и тот же ключ ВСЕГДА идёт в одну и ту же партицию!

Решение 2: Конфигурация Consumer для корректной обработки

Spring Kafka

@Configuration
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group");
        
        // КРИТИЧНЫЕ ПАРАМЕТРЫ ДЛЯ ПОРЯДКА:
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);  // Обрабатывай по одному
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // Ручной коммит
        
        return new DefaultConsumerFactory<>(props);
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setCommonErrorHandler(new DefaultErrorHandler());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.setConcurrency(1);  // ОДН ПОТОК НА ПАРТИЦИЮ!
        return factory;
    }
}

@Service
public class OrderConsumer {
    @KafkaListener(topics = "orders", groupId = "order-group")
    public void consumeOrder(String message, Acknowledgment acknowledgment) {
        try {
            Order order = parseOrder(message);
            processOrderInOrder(order);
            // Коммитим только после успешной обработки
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // При ошибке сообщение будет переобработано
            System.err.println("Ошибка обработки: " + e.getMessage());
        }
    }
}

Native KafkaConsumer

public class OrderConsumerManual {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "order-consumer-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        
        // ВАЖНЫЕ НАСТРОЙКИ:
        props.put("isolation.level", "read_committed");  // Читать только committed сообщения
        props.put("enable.auto.commit", false);  // Ручной коммит
        props.put("max.poll.records", 1);  // По одному сообщению
        
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("orders"));
            
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        // Обработка сообщения
                        System.out.println("Ключ: " + record.key() + 
                                         ", Партиция: " + record.partition() + 
                                         ", Offset: " + record.offset());
                        processOrder(record.value());
                        
                        // Коммитим offset после успешной обработки
                        consumer.commitSync();
                    } catch (Exception e) {
                        System.err.println("Ошибка: " + e.getMessage());
                        // Не коммитим - будет переобработано
                    }
                }
            }
        }
    }
}

Решение 3: Один Consumer на партицию

Если нужна максимальная гарантия порядка — используй одного consumer на партицию:

@Service
public class SingleThreadOrderConsumer {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    
    @KafkaListener(
        topics = "orders",
        groupId = "single-consumer-group",
        concurrency = "1"  // Один поток на ВСЕ партиции
    )
    public void consumeInOrder(List<ConsumerRecord<String, Order>> records) {
        // Все сообщения обрабатываются последовательно
        for (ConsumerRecord<String, Order> record : records) {
            try {
                processOrder(record.value());
            } catch (Exception e) {
                handleError(e);
            }
        }
    }
}

Решение 4: Обработка с гарантией доставки

@Service
public class RobustOrderConsumer {
    @KafkaListener(topics = "orders", groupId = "robust-group")
    public void processWithRetry(Order order, Acknowledgment ack) {
        int retries = 3;
        int attempt = 0;
        
        while (attempt < retries) {
            try {
                // Обработка в БД с транзакцией
                saveOrderToDatabase(order);
                ack.acknowledge();  // Коммит только при успехе
                return;
            } catch (Exception e) {
                attempt++;
                if (attempt >= retries) {
                    // Отправить в DLQ (Dead Letter Queue)
                    sendToDeadLetterQueue(order, e);
                    ack.acknowledge();
                    return;
                }
                
                // Экспоненциальная задержка перед повтором
                try {
                    Thread.sleep((long) Math.pow(2, attempt) * 1000);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

Правила для гарантирования порядка

ПараметрЗначениеПочему
KeyuserId/orderIdОдин ключ = одна партиция
Acks"all"Дождаться всех реплик
Enable auto commitfalseКоммитим после успеха
Concurrency1Один поток на группу
Retries> 0Переотправка при ошибке
Isolation levelread_committedНе читаем незакомманченные
Max poll records1Обрабатываем по одному

Архитектура для максимальной надёжности

@Configuration
public class ReliableKafkaConfig {
    @Bean
    public ProducerFactory<String, Order> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");  // Все реплики
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
        configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);  // ВАЖНО!
        return new DefaultProducerFactory<>(configProps);
    }
}

Эта конфигурация гарантирует, что сообщения всегда доходят и обрабатываются в правильном порядке!