← Назад к вопросам

Как распределяются данные в Kafka

1.3 Junior🔥 181 комментариев
#Коллекции

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Распределение данных в Apache Kafka

Кафка распределяет данные (сообщения) используя концепции Topics, Partitions, и Consumer Groups. Это позволяет достичь высокой масштабируемости и параллелизма.

Основная архитектура

Topic — логический канал для сообщений. Каждый Topic разделен на несколько Partitions:

Topic "orders"
┌─────────────────────────────────────────────────────┐
│                                                     │
│  Partition 0        Partition 1        Partition 2 │
│  ┌──────────────┐   ┌──────────────┐   ┌──────────┐│
│  │ msg1 (key:1) │   │ msg3 (key:2) │   │ msg5 ... ││
│  │ msg2 (key:1) │   │ msg4 (key:2) │   │ ...      ││
│  │ ...          │   │ ...          │   │          ││
│  └──────────────┘   └──────────────┘   └──────────┘│
│   Offset: 0,1,2      Offset: 0,1        Offset: 0  │
└─────────────────────────────────────────────────────┘

Как сообщения распределяются в Partitions

Каждое сообщение отправляется в конкретный partition основываясь на ключе (key):

Процесс распределения:

1. Если ключ указан:
   key → hash(key) mod num_partitions → определяет partition
   
2. Если ключ NULL:
   round-robin distribution или sticky assignment
   (зависит от конфигурации)

Пример с Java:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Одинаковые ключи идут в один partition
producer.send(new ProducerRecord<>("orders", "customer-123", "order-1"));
producer.send(new ProducerRecord<>("orders", "customer-123", "order-2"));
// Оба сообщения в одном partition → гарантирован порядок

producer.send(new ProducerRecord<>("orders", "customer-456", "order-3"));
// Может быть в другом partition

producer.close();

Хэширование ключа

// Алгоритм по умолчанию (Murmur2)
int partition = Math.abs(key.hashCode()) % numPartitions;

// Пример:
key = "customer-123"
hash("customer-123") = 1234567  (пример)
1234567 % 3 partitions = 1  → Partition 1

key = "customer-456"
hash("customer-456") = 9876543
9876543 % 3 partitions = 0  → Partition 0

key = "customer-789"
hash("customer-789") = 5555555
5555555 % 3 partitions = 2  → Partition 2

Consumer Groups — параллельная обработка

Consumer Group — группа потребителей которые вместе обрабатывают Topic:

Topic "orders" с 3 partitions

Consumer Group 1 (заказы):
┌────────────────────────────────────────────────────┐
│ Consumer-1       Consumer-2       Consumer-3       │
│   ↓                ↓                ↓              │
│ Part-0           Part-1           Part-2          │
└────────────────────────────────────────────────────┘

Каждый consumer читает из ОДНОГО partition
Порядок сообщений ГАРАНТИРОВАН внутри partition

Правило распределения:

  • Если consumers < partitions → некоторые consumers читают из нескольких partitions
  • Если consumers = partitions → один consumer на один partition (оптимально)
  • Если consumers > partitions → некоторые consumers будут неактивны

Пример: Consumer Group в Java

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "orders-processing"); // Группа
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest"); // С начала если нет offset

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Consumer.subscribe(Arrays.asList("orders")); // Подписываемся на Topic

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf(
            "Partition: %d, Offset: %d, Key: %s, Value: %s%n",
            record.partition(),
            record.offset(),
            record.key(),
            record.value()
        );
        
        // Обработка сообщения
        processOrder(record.value());
    }
    
    consumer.commitSync(); // Сохраняем offset
}

Offset — отслеживание позиции

Offset — индекс позиции сообщения в partition:

Partition 0:
┌─────────────┬─────────────┬─────────────┬─────────────┐
│  Offset 0   │  Offset 1   │  Offset 2   │  Offset 3   │
│  message1   │  message2   │  message3   │  message4   │
└─────────────┴─────────────┴─────────────┴─────────────┘
              ↑
         Consumer read from offset 1
         (прочитает сообщения 1, 2, 3, ...)

Commit offset — сохранение прогресса:

// Auto commit (опасно — может потеряться сообщение)
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");

// Manual commit (безопасно)
props.put("enable.auto.commit", "false");
// ...
consumer.commitSync(); // Синхронный commit
consumer.commitAsync(); // Асинхронный commit

Гарантии доставки

// Producer acks configuration
props.put("acks", "all"); // Ждем пока все replicas запишут
// acks=0: не ждем ответа (быстро, риск потери)
// acks=1: ждем ответа от leader (среднее)
// acks=all: ждем всех replicas (медленно, надежно)

props.put("retries", 3); // Повторные попытки
props.put("max.in.flight.requests.per.connection", 1); // Порядок сообщений

Rebalancing — переераспределение при масштабировании

Начальное состояние:
Consumers: [C1, C2, C3]
Partitions: [P0, P1, P2]
Пораспределение: C1→P0, C2→P1, C3→P2

Добавляем Consumer C4:
↓ Rebalance начинается
Новое распределение: C1→P0, C2→P1, C3→P2, C4→ (никуда)

Удаляем Consumer C3:
↓ Rebalance начинается
Новое распределение: C1→P0,P2, C2→P1, C4→ (никуда)

Во время Rebalance:

  • Обработка сообщений ОСТАНАВЛИВАЕТСЯ
  • Consumer Group перебалансируется
  • Это занимает время и влияет на throughput

Лучшие практики

  1. Количество partitions должно быть ≥ количеству consumers
  2. Используй key для обеспечения порядка связанных сообщений
  3. Обрабатывай idempotently (одно сообщение несколько раз) из-за rebalancing
  4. Коммитай offset после успешной обработки
  5. Настраивай batch.size и linger.ms для увеличения throughput
  6. Мониторь consumer lag — отставание от producer
// Проверка lag
consumer.assignment().forEach(partition -> {
    long committed = consumer.committed(partition).offset();
    long position = consumer.position(partition);
    System.out.println("Lag: " + (position - committed));
});

Итог

Кафка масштабируется используя:

  • Partitions для параллелизма внутри topic
  • Consumer Groups для параллельной обработки
  • Keys для гарантии порядка
  • Offsets для отслеживания прогресса

Это позволяет обрабатывать миллионы сообщений в секунду с гарантиями доставки.