Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Распределение данных в Apache Kafka
Кафка распределяет данные (сообщения) используя концепции Topics, Partitions, и Consumer Groups. Это позволяет достичь высокой масштабируемости и параллелизма.
Основная архитектура
Topic — логический канал для сообщений. Каждый Topic разделен на несколько Partitions:
Topic "orders"
┌─────────────────────────────────────────────────────┐
│ │
│ Partition 0 Partition 1 Partition 2 │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────┐│
│ │ msg1 (key:1) │ │ msg3 (key:2) │ │ msg5 ... ││
│ │ msg2 (key:1) │ │ msg4 (key:2) │ │ ... ││
│ │ ... │ │ ... │ │ ││
│ └──────────────┘ └──────────────┘ └──────────┘│
│ Offset: 0,1,2 Offset: 0,1 Offset: 0 │
└─────────────────────────────────────────────────────┘
Как сообщения распределяются в Partitions
Каждое сообщение отправляется в конкретный partition основываясь на ключе (key):
Процесс распределения:
1. Если ключ указан:
key → hash(key) mod num_partitions → определяет partition
2. Если ключ NULL:
round-robin distribution или sticky assignment
(зависит от конфигурации)
Пример с Java:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Одинаковые ключи идут в один partition
producer.send(new ProducerRecord<>("orders", "customer-123", "order-1"));
producer.send(new ProducerRecord<>("orders", "customer-123", "order-2"));
// Оба сообщения в одном partition → гарантирован порядок
producer.send(new ProducerRecord<>("orders", "customer-456", "order-3"));
// Может быть в другом partition
producer.close();
Хэширование ключа
// Алгоритм по умолчанию (Murmur2)
int partition = Math.abs(key.hashCode()) % numPartitions;
// Пример:
key = "customer-123"
hash("customer-123") = 1234567 (пример)
1234567 % 3 partitions = 1 → Partition 1
key = "customer-456"
hash("customer-456") = 9876543
9876543 % 3 partitions = 0 → Partition 0
key = "customer-789"
hash("customer-789") = 5555555
5555555 % 3 partitions = 2 → Partition 2
Consumer Groups — параллельная обработка
Consumer Group — группа потребителей которые вместе обрабатывают Topic:
Topic "orders" с 3 partitions
Consumer Group 1 (заказы):
┌────────────────────────────────────────────────────┐
│ Consumer-1 Consumer-2 Consumer-3 │
│ ↓ ↓ ↓ │
│ Part-0 Part-1 Part-2 │
└────────────────────────────────────────────────────┘
Каждый consumer читает из ОДНОГО partition
Порядок сообщений ГАРАНТИРОВАН внутри partition
Правило распределения:
- Если consumers < partitions → некоторые consumers читают из нескольких partitions
- Если consumers = partitions → один consumer на один partition (оптимально)
- Если consumers > partitions → некоторые consumers будут неактивны
Пример: Consumer Group в Java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "orders-processing"); // Группа
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest"); // С начала если нет offset
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Consumer.subscribe(Arrays.asList("orders")); // Подписываемся на Topic
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf(
"Partition: %d, Offset: %d, Key: %s, Value: %s%n",
record.partition(),
record.offset(),
record.key(),
record.value()
);
// Обработка сообщения
processOrder(record.value());
}
consumer.commitSync(); // Сохраняем offset
}
Offset — отслеживание позиции
Offset — индекс позиции сообщения в partition:
Partition 0:
┌─────────────┬─────────────┬─────────────┬─────────────┐
│ Offset 0 │ Offset 1 │ Offset 2 │ Offset 3 │
│ message1 │ message2 │ message3 │ message4 │
└─────────────┴─────────────┴─────────────┴─────────────┘
↑
Consumer read from offset 1
(прочитает сообщения 1, 2, 3, ...)
Commit offset — сохранение прогресса:
// Auto commit (опасно — может потеряться сообщение)
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
// Manual commit (безопасно)
props.put("enable.auto.commit", "false");
// ...
consumer.commitSync(); // Синхронный commit
consumer.commitAsync(); // Асинхронный commit
Гарантии доставки
// Producer acks configuration
props.put("acks", "all"); // Ждем пока все replicas запишут
// acks=0: не ждем ответа (быстро, риск потери)
// acks=1: ждем ответа от leader (среднее)
// acks=all: ждем всех replicas (медленно, надежно)
props.put("retries", 3); // Повторные попытки
props.put("max.in.flight.requests.per.connection", 1); // Порядок сообщений
Rebalancing — переераспределение при масштабировании
Начальное состояние:
Consumers: [C1, C2, C3]
Partitions: [P0, P1, P2]
Пораспределение: C1→P0, C2→P1, C3→P2
Добавляем Consumer C4:
↓ Rebalance начинается
Новое распределение: C1→P0, C2→P1, C3→P2, C4→ (никуда)
Удаляем Consumer C3:
↓ Rebalance начинается
Новое распределение: C1→P0,P2, C2→P1, C4→ (никуда)
Во время Rebalance:
- Обработка сообщений ОСТАНАВЛИВАЕТСЯ
- Consumer Group перебалансируется
- Это занимает время и влияет на throughput
Лучшие практики
- Количество partitions должно быть ≥ количеству consumers
- Используй key для обеспечения порядка связанных сообщений
- Обрабатывай idempotently (одно сообщение несколько раз) из-за rebalancing
- Коммитай offset после успешной обработки
- Настраивай batch.size и linger.ms для увеличения throughput
- Мониторь consumer lag — отставание от producer
// Проверка lag
consumer.assignment().forEach(partition -> {
long committed = consumer.committed(partition).offset();
long position = consumer.position(partition);
System.out.println("Lag: " + (position - committed));
});
Итог
Кафка масштабируется используя:
- Partitions для параллелизма внутри topic
- Consumer Groups для параллельной обработки
- Keys для гарантии порядка
- Offsets для отслеживания прогресса
Это позволяет обрабатывать миллионы сообщений в секунду с гарантиями доставки.