← Назад к вопросам

Как гарантируется порядок сообщений в Kafka

3.0 Senior🔥 251 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# Как гарантируется порядок сообщений в Kafka

Краткий ответ

Кафка гарантирует порядок сообщений внутри одной партиции, но НЕ гарантирует порядок между партициями.

Архитектура Kafka

Topic: orders
├── Partition 0 → [msg1, msg2, msg3, msg4]
├── Partition 1 → [msg5, msg6, msg7]
└── Partition 2 → [msg8, msg9]

Порядок гарантирован внутри каждой партиции
НО НЕ между партициями

Как гарантируется порядок в партиции

1. Одноточечный вход для каждого потока

Кафка назначает сообщение в партицию на основе partition key:

ProducerRecord<String, String> record = 
    new ProducerRecord<>("orders", "user-123", "order:data");
    //                   topic    partition-key    value

// Все сообщения с одним partition key идут в одну партицию
// → порядок гарантирован!
user-123 → partition 2 ← все сообщения этого пользователя
user-456 → partition 0 ← все сообщения этого пользователя
user-789 → partition 1 ← все сообщения этого пользователя

Каждая партиция обрабатывает сообщения в порядке поступления

2. Внутри партиции: порядок гарантирован

Partition 0:

Offset 0: user-456 order-1 (time: 10:00)
Offset 1: user-456 order-2 (time: 10:01) ← гарантировано после order-1
Offset 2: user-456 order-3 (time: 10:02) ← гарантировано после order-2
Offset 3: user-456 order-4 (time: 10:03) ← гарантировано после order-3

Как это работает: Producer

public class OrderProducer {
    
    private final KafkaTemplate<String, Order> kafkaTemplate;
    
    @Autowired
    public OrderProducer(KafkaTemplate<String, Order> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    
    public void publishOrder(String userId, Order order) {
        // partition key - это userId
        // Все заказы одного пользователя идут в одну партицию
        kafkaTemplate.send(
            new ProducerRecord<>(
                "orders",           // topic
                userId,             // partition key (ВАЖНО!)
                order               // value
            )
        );
        
        // Результат:
        // Все заказы user-123 в одной партиции
        // → порядок гарантирован
    }
}

// Использование
OrderProducer producer = new OrderProducer(kafkaTemplate);
producer.publishOrder("user-123", new Order(1000));
producer.publishOrder("user-123", new Order(2000));
producer.publishOrder("user-123", new Order(3000));

// Все три заказа попадут в одну партицию
// И будут обработаны в порядке: 1000 → 2000 → 3000

Как это работает: Consumer

@Configuration
public class KafkaConsumerConfig {
    // Consumer Group - гарантирует обработку в порядке
}

@Service
public class OrderConsumer {
    
    @KafkaListener(topics = "orders", groupId = "order-service-group")
    public void consumeOrder(Order order, 
                             @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                             @Header(KafkaHeaders.OFFSET) long offset) {
        
        System.out.println(String.format(
            "Processing order from partition=%d, offset=%d",
            partition, offset
        ));
        
        // Сообщения из одной партиции обрабатываются
        // одним потоком Consumer Group
        // → порядок гарантирован
        
        processOrder(order);
    }
}

// Результат:
// Partition 0 → обрабатывает 1 consumer из группы (в порядке)
// Partition 1 → обрабатывает 1 consumer из группы (в порядке)
// Partition 2 → обрабатывает 1 consumer из группы (в порядке)

Правило: Consumer Group и Partitions

Тема: orders (3 партиции)
Consumer Group: order-service-group

Сценарий 1: 1 Consumer
┌─────────────────────────────────┐
│ Consumer 1                       │
│ ├─ Partition 0 ← обрабатывает   │
│ ├─ Partition 1 ← обрабатывает   │
│ └─ Partition 2 ← обрабатывает   │
└─────────────────────────────────┘

Порядок внутри каждой партиции гарантирован
НО может быть чередование между партициями

Сценарий 2: 3 Consumers
┌──────────────┐  ┌──────────────┐  ┌──────────────┐
│ Consumer 1   │  │ Consumer 2   │  │ Consumer 3   │
│ Partition 0  │  │ Partition 1  │  │ Partition 2  │
└──────────────┘  └──────────────┘  └──────────────┘

Каждый consumer обрабатывает одну партицию
Порядок гарантирован для каждого consumer

Сценарий 3: 5 Consumers
┌──────────────┐  ┌──────────────┐  ┌──────────────┐
│ Consumer 1   │  │ Consumer 2   │  │ Consumer 3   │
│ Partition 0  │  │ Partition 1  │  │ Partition 2  │
└──────────────┘  └──────────────┘  └──────────────┘

┌──────────────┐  ┌──────────────┐
│ Consumer 4   │  │ Consumer 5   │
│    idle      │  │    idle      │
└──────────────┘  └──────────────┘

Consumer 4 и 5 ничего не делают (нет партиций)

Гарантии порядка по типам

At-Most-Once (может быть потеря)

// producer acks=0 (не ждёт confirmation)
properties.put("acks", "0");

// Быстро, но может быть потеря сообщений
// Порядок может нарушиться при retry

At-Least-Once (может быть дублирование)

properties.put("acks", "1"); // или "all"
properties.put("retries", Integer.MAX_VALUE);

// Медленнее, но гарантирует доставку
// Может быть дублирование
// Но порядок сохраняется благодаря retries

Exactly-Once (идеально)

properties.put("acks", "all");
properties.put("retries", Integer.MAX_VALUE);
properties.put("enable.idempotence", true); // Ключевой параметр!
properties.put("max.in.flight.requests.per.connection", 5);

// Гарантирует ровно одну доставку каждого сообщения
// Порядок сохраняется

Практический пример: Transfer Money

public class MoneyTransferService {
    
    private final KafkaTemplate<String, TransferEvent> kafkaTemplate;
    
    @Autowired
    public MoneyTransferService(
            KafkaTemplate<String, TransferEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    
    public void transferMoney(String userId, BigDecimal amount, String reason) {
        TransferEvent event = new TransferEvent(
            userId, amount, reason, System.currentTimeMillis()
        );
        
        // КЛЮЧЕВОЙ МОМЕНТ: partition key = userId
        // Все переводы одного пользователя в одной партиции
        kafkaTemplate.send(
            new ProducerRecord<>(
                "transfer-events",
                userId,             // partition key
                event
            )
        );
    }
}

@Service
public class TransferEventConsumer {
    
    private final AccountService accountService;
    
    @Autowired
    public TransferEventConsumer(AccountService accountService) {
        this.accountService = accountService;
    }
    
    @KafkaListener(
        topics = "transfer-events",
        groupId = "transfer-processor",
        concurrency = "3" // 3 потока, но каждый обрабатывает свою партицию
    )
    public void processTransfer(TransferEvent event) {
        // Благодаря partition key = userId:
        // - Все переводы user-123 обрабатываются в порядке
        // - Можно безопасно обновлять баланс
        // - Нет race conditions для одного пользователя
        
        accountService.processTransfer(event);
    }
}

// Сценарий без partition key (ОПАСНО!):
kafkaTemplate.send(
    new ProducerRecord<>("transfer-events", event)
    // partition key не указан!
);

// Результат:
// event1 → partition 0
// event2 → partition 1  ← другой consumer!
// event3 → partition 2  ← ещё другой consumer!

// Разные потоки могут обновлять баланс одновременно
// → race condition!

Важные моменты

1. Порядок гарантирован ТОЛЬКО внутри партиции

НЕ ГАРАНТИРУЕТ порядок между партициями:

Partition 0: user-123 order at 10:00
Partition 1: user-456 order at 09:59 ← может быть обработана ПЕРВОЙ
Partition 2: user-789 order at 10:01

Порядок обработки может быть:
1. user-456 (из partition 1)
2. user-123 (из partition 0)
3. user-789 (из partition 2)

2. Выбор partition key критичен

// ✓ ПРАВИЛЬНО - по пользователю
kafkaTemplate.send(
    new ProducerRecord<>("orders", userId, order)
);
// Все заказы одного пользователя в порядке

// ❌ НЕПРАВИЛЬНО - без partition key
kafkaTemplate.send(
    new ProducerRecord<>("orders", order)
);
// Порядок не гарантирован

// ❌ НЕПРАВИЛЬНО - случайный partition key
kafkaTemplate.send(
    new ProducerRecord<>("orders", UUID.randomUUID().toString(), order)
);
// Каждое сообщение в разных партициях
// Порядок потеряется

3. Consumer Group обеспечивает масштабируемость

Eсли добавить нового consumer:

ДО: Consumer 1 обрабатывает все 3 партиции
ПОСЛЕ: Consumer 1 обрабатывает partition 0, 1
       Consumer 2 обрабатывает partition 2

Порядок в каждой партиции сохраняется
Но теперь обработка параллельна

Конфигурация для порядка

@Configuration
public class KafkaProducerConfig {
    
    @Bean
    public ProducerFactory<String, Order> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        
        // Гарантия порядка
        configProps.put("acks", "all"); // Ждём confirmation от всех replicas
        configProps.put("retries", Integer.MAX_VALUE); // Retry при ошибке
        configProps.put("enable.idempotence", true); // Exactly-once
        configProps.put("max.in.flight.requests.per.connection", 5);
        
        // Производительность
        configProps.put("batch.size", 16384);
        configProps.put("linger.ms", 10);
        configProps.put("compression.type", "snappy");
        
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    
    @Bean
    public KafkaTemplate<String, Order> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

@Configuration
public class KafkaConsumerConfig {
    
    @Bean
    public ConsumerFactory<String, Order> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        
        configProps.put("bootstrap.servers", "localhost:9092");
        configProps.put("group.id", "order-service");
        configProps.put("key.deserializer", 
            StringDeserializer.class);
        configProps.put("value.deserializer", 
            JsonDeserializer.class);
        
        // Обработка в порядке
        configProps.put("enable.auto.commit", false); // Manual commit
        configProps.put("auto.offset.reset", "earliest"); // С начала при потере
        
        return new DefaultKafkaConsumerFactory<>(configProps);
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Order>
            kafkaListenerContainerFactory() {
        
        ConcurrentKafkaListenerContainerFactory<String, Order> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setCommonErrorHandler(new DefaultErrorHandler());
        factory.getContainerProperties().setAckMode(
            ContainerProperties.AckMode.MANUAL_IMMEDIATE
        );
        
        return factory;
    }
}

Заключение

Кафка гарантирует порядок сообщений через:

  1. Partition Key - все сообщения с одним ключом в одну партицию
  2. Sequential Processing - каждая партиция обрабатывается последовательно
  3. Consumer Group - масштабируемость с сохранением порядка
  4. Offsets - координация и восстановление позиции

Правило: Выбирай partition key так, чтобы все сообщения, которые должны быть упорядочены, попали в одну партицию.