← Назад к вопросам

Можно ли с Topic распределенным по трем позициям, прочитать сообщения в порядке записи?

2.0 Middle🔥 111 комментариев
#Docker, Kubernetes и DevOps#JVM и управление памятью#ORM и Hibernate

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Kafka: Порядок сообщений при репликации на несколько разделов

Нет, вы не сможете гарантировать глобальный порядок сообщений при использовании топика с несколькими разделами (партициями). Это одно из основных компромиссов в распределённых системах обмена сообщениями.

Проблема: три раздела (партиции)

Когда топик распределён на 3 партиции, Kafka распределяет сообщения между ними:

Производитель пишет сообщения:
Message 1 → Partition 0
Message 2 → Partition 1
Message 3 → Partition 2
Message 4 → Partition 0
Message 5 → Partition 1
Message 6 → Partition 2

Когда потребитель читает из всех 3 партиций одновременно, он может получить сообщения в любом порядке:

Потребитель может прочитать:
Partition 0: [Message 1, Message 4]
Partition 1: [Message 2, Message 5]
Partition 2: [Message 3, Message 6]

❌ Порядок может быть: 1, 2, 5, 3, 4, 6 (не исходный!)

Почему это происходит

Причина 1: Асинхронная репликация

Кафка использует лидер-фолловер модель. Лидер получает все записи, а фолловеры реплицируют асинхронно. Из-за этого разные партиции могут быть в разных состояниях.

// Kafka архитектура (упрощённо):
// Broker 1 (Leader P0): [M1, M4]
// Broker 2 (Leader P1): [M2, M5]
// Broker 3 (Leader P2): [M3, M6]
//
// Фолловеры могут не иметь все сообщения мгновенно!

Причина 2: Параллельное чтение из разных партиций

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));
        
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // ❌ Сообщения из разных партиций!
                // Порядок НЕ гарантирован
                System.out.println("Partition: " + record.partition() + 
                                   ", Offset: " + record.offset() + 
                                   ", Value: " + record.value());
            }
        }
    }
}

Решение 1: Единственный раздел (Partition)

Если вам нужен глобальный порядок, используйте 1 раздел:

public class OrderedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
        for (int i = 1; i <= 6; i++) {
            // Использование одного и того же ключа гарантирует отправку в один раздел
            ProducerRecord<String, String> record = 
                new ProducerRecord<>("ordered-topic", "key", "Message " + i);
            producer.send(record);
        }
        
        producer.close();
    }
}

// Топик должен иметь num.partitions = 1

Потребитель получит в исходном порядке:

Message 1
Message 2
Message 3
Message 4
Message 5
Message 6

Решение 2: Ключи сообщений (Partitioning by Key)

Для каждого логического потока используйте свой ключ:

public class KeyBasedProducer {
    public static void main(String[] args) {
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
        // Все сообщения пользователя 1 пойдут в один раздел
        producer.send(new ProducerRecord<>("users-topic", "user-1", "Action 1"));
        producer.send(new ProducerRecord<>("users-topic", "user-1", "Action 2"));
        producer.send(new ProducerRecord<>("users-topic", "user-1", "Action 3"));
        
        // Все сообщения пользователя 2 пойдут в ДРУГОЙ раздел
        producer.send(new ProducerRecord<>("users-topic", "user-2", "Action 1"));
        producer.send(new ProducerRecord<>("users-topic", "user-2", "Action 2"));
        
        producer.close();
    }
}

Архитектура:

Partition 0 (user-1): [Action 1, Action 2, Action 3]
Partition 1 (user-2): [Action 1, Action 2]
Partition 2 (user-3): [...]

Потребитель:

public class KeyAwareConsumer {
    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("users-topic"));
        
        Map<String, List<String>> userActions = new TreeMap<>();
        
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            
            for (ConsumerRecord<String, String> record : records) {
                String userId = record.key();
                String action = record.value();
                
                // Группируем по пользователю
                userActions.computeIfAbsent(userId, k -> new ArrayList<>())
                           .add(action);
            }
        }
    }
}

Решение 3: Обработка порядка на уровне приложения

Если нужна абсолютная упорядоченность, используйте состояние с последовательностью:

public class Message {
    private long sequenceNumber;  // Глобальный номер последовательности
    private String content;
    private long timestamp;
}

public class OrderedConsumer {
    private long expectedSequence = 0;
    private PriorityQueue<Message> outOfOrderBuffer = new PriorityQueue<>(
        (m1, m2) -> Long.compare(m1.getSequenceNumber(), m2.getSequenceNumber())
    );
    
    public void handleMessage(Message msg) {
        if (msg.getSequenceNumber() == expectedSequence) {
            processMessage(msg);
            expectedSequence++;
            
            // Обработай из буфера, если есть
            while (!outOfOrderBuffer.isEmpty() && 
                   outOfOrderBuffer.peek().getSequenceNumber() == expectedSequence) {
                processMessage(outOfOrderBuffer.poll());
                expectedSequence++;
            }
        } else if (msg.getSequenceNumber() > expectedSequence) {
            outOfOrderBuffer.offer(msg);  // Ждём пропущенного
        }
        // Игнорируем дубликаты (sequence < expectedSequence)
    }
    
    private void processMessage(Message msg) {
        System.out.println("Processing: " + msg.getSequenceNumber());
    }
}

Сравнение подходов

ПодходПроизводительностьСложностьГарантии
1 раздел❌ Низкая✅ Простой✅ Глобальный порядок
Ключи✅ Высокая✅ Простой✅ Порядок по ключу
Буфер на приложении❌ Низкая❌ Сложный✅ Глобальный порядок

Вывод

  • Нельзя гарантировать порядок с 3 разделами при параллельном чтении
  • Используй 1 раздел для глобального порядка (масштабируется плохо)
  • Используй ключи для порядка внутри группы данных (рекомендуется)
  • Добавь последовательность на уровне приложения, если нужна абсолютная упорядоченность

Это архитектурное компромисс: производительность vs порядок

Можно ли с Topic распределенным по трем позициям, прочитать сообщения в порядке записи? | PrepBro