Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Как устроен Kafka Topic
Topic в Apache Kafka — это фундаментальная абстракция для хранения и распределения данных. Это не простая очередь, а сложная распределённая структура, разработанная для масштабируемости и отказоустойчивости.
Основная архитектура
Topic состоит из одного или нескольких разделов (partitions). Каждый раздел — это упорядоченная последовательность сообщений, хранящихся на диске:
public class TopicStructure {
// Topic: "user-events"
// ├── Partition 0: [msg1, msg2, msg3, ...]
// ├── Partition 1: [msg4, msg5, msg6, ...]
// └── Partition 2: [msg7, msg8, msg9, ...]
}
1. Partitions — ключ масштабируемости
Каждый partition хранится на отдельном сервере (broker) и может обслуживаться независимо:
// Конфигурация topic
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(props)) {
// Создание topic с 3 partitions и репликацией 2
NewTopic topic = new NewTopic(
"user-events",
3, // numPartitions
(short) 2 // replicationFactor
);
admin.createTopics(Collections.singleton(topic));
}
Преимущества partitions:
- Параллелизм: несколько producers/consumers работают одновременно
- Масштабируемость: topic может быть больше одного сервера
- Отказоустойчивость: репликация данных между brokers
2. Offset — позиция в partition
Каждое сообщение в partition имеет уникальный идентификатор — offset:
class Partition {
private long startOffset = 0;
private long endOffset = 0; // начальный offset
private List<Message> messages = new ArrayList<>();
// Partition 0:
// Offset 0: "user_registered, id=123"
// Offset 1: "user_logged_in, id=123"
// Offset 2: "user_purchased, id=123"
// Offset 3: "user_deleted_account, id=123"
}
Consumer читает сообщения последовательно по offset:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Topic: " + record.topic());
System.out.println("Partition: " + record.partition());
System.out.println("Offset: " + record.offset());
System.out.println("Value: " + record.value());
}
3. Replication — отказоустойчивость
Каждый partition реплицируется на несколько brokers. Один из них — Leader, остальные — Followers:
// Topic: "orders" с 3 partitions, replicationFactor=3
//
// Partition 0:
// Leader: Broker 1
// Replicas: [Broker 1, Broker 2, Broker 3]
// ISR (In-Sync Replicas): [Broker 1, Broker 2, Broker 3]
//
// Partition 1:
// Leader: Broker 2
// Replicas: [Broker 2, Broker 3, Broker 1]
// ISR: [Broker 2, Broker 3, Broker 1]
//
// Partition 2:
// Leader: Broker 3
// Replicas: [Broker 3, Broker 1, Broker 2]
// ISR: [Broker 3, Broker 1]
ConfigMap replicationConfig = new ConfigMap();
replicationConfig.put("min.insync.replicas", "2");
// Гарантирует, что сообщение записано минимум на 2 replicas
Leader:
- Обслуживает все读 и записи для partition
- Синхронизирует данные с followers
Followers:
- Реплицируют данные с leader
- Могут стать leader при отказе
4. Log Segment — организация данных на диске
Каждый partition разделён на segments — файлы на диске:
// Partition хранится как:
// 00000000000000000000.log (сегмент 0-999)
// 00000000000001000000.log (сегмент 1000-1999)
// 00000000000002000000.log (сегмент 2000-2999)
// 00000000000000000000.index (индекс для быстрого поиска)
// 00000000000000000000.timeindex (индекс по времени)
class LogSegment {
private File logFile; // данные
private File indexFile; // offset -> position
private File timeIndexFile; // timestamp -> offset
private int baseOffset; // первый offset в сегменте
// Пример индекса:
// offset: position
// 0: 0
// 10: 4096
// 20: 8192
}
5. Key-based Routing — выбор partition
Когда producer отправляет сообщение, он выбирает partition на основе key:
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Сообщение с key="user_123"
ProducerRecord<String, String> record = new ProducerRecord<>(
"user-events", // topic
"user_123", // key (определяет partition)
"{\"action\":\"login\"}" // value
);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println("Partition: " + metadata.partition());
System.out.println("Offset: " + metadata.offset());
}
});
// Все сообщения с одним key идут в один partition!
// Это гарантирует порядок обработки для одного пользователя
6. Consumer Group — распределённая обработка
Множество consumers могут читать из topic, распределяя работу:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-processing-group");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));
// Topic: "orders" (3 partitions)
// Consumer Group: "order-processing-group"
//
// Consumer 1 -> Partition 0
// Consumer 2 -> Partition 1
// Consumer 3 -> Partition 2
//
// Если одного consumer отключить:
// Consumer 1 -> Partitions 0, 1
// Consumer 3 -> Partition 2
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processOrder(record.value());
}
}
7. Retention — как долго хранятся данные
Topic можно настроить на удержание сообщений:
Properties topicConfig = new Properties();
topicConfig.put("retention.ms", "7 * 24 * 60 * 60 * 1000"); // 7 дней
topicConfig.put("retention.bytes", "1073741824"); // 1 GB
topicConfig.put("cleanup.policy", "delete"); // или "compact"
NewTopic topic = new NewTopic("events", 3, (short) 2);
topic.configs(topicConfig);
cleanup.policy:
- delete: удалять старые сообщения (по времени или размеру)
- compact: хранить последнее сообщение для каждого key
8. Практический пример: полный цикл
// Topic с 3 partitions
// Producer отправляет 100 сообщений с ключами user_1..user_100
// Consumer группа читает и обрабатывает
public class KafkaTopicExample {
public static void main(String[] args) throws Exception {
// Producer
KafkaProducer<String, String> producer = createProducer();
for (int i = 1; i <= 100; i++) {
producer.send(new ProducerRecord<>(
"user-events",
"user_" + (i % 10), // ключ: user_0..user_9
"event_" + i
));
}
producer.close();
// Consumer
KafkaConsumer<String, String> consumer = createConsumer();
consumer.subscribe(Collections.singletonList("user-events"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(
Duration.ofMillis(100)
);
for (ConsumerRecord<String, String> record : records) {
System.out.printf(
"[%d] key=%s value=%s%n",
record.partition(),
record.key(),
record.value()
);
}
}
}
}
Резюме структуры Kafka Topic
Topic = множество partitions (распределённо на brokers)
Partition = упорядоченная последовательность сообщений с offsets
Replication = каждый partition реплицируется (Leader + Followers)
Segments = partition разбит на файлы для эффективного хранения
Consumer Group = несколько consumers читают topic параллельно
Эта архитектура обеспечивает масштабируемость, надёжность и высокую пропускную способность.