← Назад к вопросам

Как устроен Kafka Topic?

2.3 Middle🔥 171 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как устроен Kafka Topic

Topic в Apache Kafka — это фундаментальная абстракция для хранения и распределения данных. Это не простая очередь, а сложная распределённая структура, разработанная для масштабируемости и отказоустойчивости.

Основная архитектура

Topic состоит из одного или нескольких разделов (partitions). Каждый раздел — это упорядоченная последовательность сообщений, хранящихся на диске:

public class TopicStructure {
    // Topic: "user-events"
    // ├── Partition 0: [msg1, msg2, msg3, ...]
    // ├── Partition 1: [msg4, msg5, msg6, ...]
    // └── Partition 2: [msg7, msg8, msg9, ...]
}

1. Partitions — ключ масштабируемости

Каждый partition хранится на отдельном сервере (broker) и может обслуживаться независимо:

// Конфигурация topic
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient admin = AdminClient.create(props)) {
    // Создание topic с 3 partitions и репликацией 2
    NewTopic topic = new NewTopic(
        "user-events",
        3,           // numPartitions
        (short) 2    // replicationFactor
    );
    
    admin.createTopics(Collections.singleton(topic));
}

Преимущества partitions:

  • Параллелизм: несколько producers/consumers работают одновременно
  • Масштабируемость: topic может быть больше одного сервера
  • Отказоустойчивость: репликация данных между brokers

2. Offset — позиция в partition

Каждое сообщение в partition имеет уникальный идентификатор — offset:

class Partition {
    private long startOffset = 0;
    private long endOffset = 0;      // начальный offset
    private List<Message> messages = new ArrayList<>();
    
    // Partition 0:
    // Offset 0: "user_registered, id=123"
    // Offset 1: "user_logged_in, id=123"
    // Offset 2: "user_purchased, id=123"
    // Offset 3: "user_deleted_account, id=123"
}

Consumer читает сообщения последовательно по offset:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {
    System.out.println("Topic: " + record.topic());
    System.out.println("Partition: " + record.partition());
    System.out.println("Offset: " + record.offset());
    System.out.println("Value: " + record.value());
}

3. Replication — отказоустойчивость

Каждый partition реплицируется на несколько brokers. Один из них — Leader, остальные — Followers:

// Topic: "orders" с 3 partitions, replicationFactor=3
//
// Partition 0:
//   Leader: Broker 1
//   Replicas: [Broker 1, Broker 2, Broker 3]
//   ISR (In-Sync Replicas): [Broker 1, Broker 2, Broker 3]
//
// Partition 1:
//   Leader: Broker 2
//   Replicas: [Broker 2, Broker 3, Broker 1]
//   ISR: [Broker 2, Broker 3, Broker 1]
//
// Partition 2:
//   Leader: Broker 3
//   Replicas: [Broker 3, Broker 1, Broker 2]
//   ISR: [Broker 3, Broker 1]

ConfigMap replicationConfig = new ConfigMap();
replicationConfig.put("min.insync.replicas", "2");
// Гарантирует, что сообщение записано минимум на 2 replicas

Leader:

  • Обслуживает все读 и записи для partition
  • Синхронизирует данные с followers

Followers:

  • Реплицируют данные с leader
  • Могут стать leader при отказе

4. Log Segment — организация данных на диске

Каждый partition разделён на segments — файлы на диске:

// Partition хранится как:
// 00000000000000000000.log       (сегмент 0-999)
// 00000000000001000000.log       (сегмент 1000-1999)
// 00000000000002000000.log       (сегмент 2000-2999)
// 00000000000000000000.index     (индекс для быстрого поиска)
// 00000000000000000000.timeindex (индекс по времени)

class LogSegment {
    private File logFile;        // данные
    private File indexFile;      // offset -> position
    private File timeIndexFile;  // timestamp -> offset
    private int baseOffset;      // первый offset в сегменте
    
    // Пример индекса:
    // offset: position
    // 0: 0
    // 10: 4096
    // 20: 8192
}

5. Key-based Routing — выбор partition

Когда producer отправляет сообщение, он выбирает partition на основе key:

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Сообщение с key="user_123"
ProducerRecord<String, String> record = new ProducerRecord<>(
    "user-events",           // topic
    "user_123",              // key (определяет partition)
    "{\"action\":\"login\"}"  // value
);

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        System.out.println("Partition: " + metadata.partition());
        System.out.println("Offset: " + metadata.offset());
    }
});

// Все сообщения с одним key идут в один partition!
// Это гарантирует порядок обработки для одного пользователя

6. Consumer Group — распределённая обработка

Множество consumers могут читать из topic, распределяя работу:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-processing-group");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));

// Topic: "orders" (3 partitions)
// Consumer Group: "order-processing-group"
//
// Consumer 1 -> Partition 0
// Consumer 2 -> Partition 1
// Consumer 3 -> Partition 2
//
// Если одного consumer отключить:
// Consumer 1 -> Partitions 0, 1
// Consumer 3 -> Partition 2

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processOrder(record.value());
    }
}

7. Retention — как долго хранятся данные

Topic можно настроить на удержание сообщений:

Properties topicConfig = new Properties();
topicConfig.put("retention.ms", "7 * 24 * 60 * 60 * 1000"); // 7 дней
topicConfig.put("retention.bytes", "1073741824");            // 1 GB
topicConfig.put("cleanup.policy", "delete");                // или "compact"

NewTopic topic = new NewTopic("events", 3, (short) 2);
topic.configs(topicConfig);

cleanup.policy:

  • delete: удалять старые сообщения (по времени или размеру)
  • compact: хранить последнее сообщение для каждого key

8. Практический пример: полный цикл

// Topic с 3 partitions
// Producer отправляет 100 сообщений с ключами user_1..user_100
// Consumer группа читает и обрабатывает

public class KafkaTopicExample {
    public static void main(String[] args) throws Exception {
        // Producer
        KafkaProducer<String, String> producer = createProducer();
        for (int i = 1; i <= 100; i++) {
            producer.send(new ProducerRecord<>(
                "user-events",
                "user_" + (i % 10),  // ключ: user_0..user_9
                "event_" + i
            ));
        }
        producer.close();
        
        // Consumer
        KafkaConsumer<String, String> consumer = createConsumer();
        consumer.subscribe(Collections.singletonList("user-events"));
        
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(
                Duration.ofMillis(100)
            );
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf(
                    "[%d] key=%s value=%s%n",
                    record.partition(),
                    record.key(),
                    record.value()
                );
            }
        }
    }
}

Резюме структуры Kafka Topic

Topic = множество partitions (распределённо на brokers)

Partition = упорядоченная последовательность сообщений с offsets

Replication = каждый partition реплицируется (Leader + Followers)

Segments = partition разбит на файлы для эффективного хранения

Consumer Group = несколько consumers читают topic параллельно

Эта архитектура обеспечивает масштабируемость, надёжность и высокую пропускную способность.