← Назад к вопросам

Как топики связаны с подписчиками в Kafka

2.0 Middle🔥 251 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как топики связаны с подписчиками в Apache Kafka

Kafka — это распределённая система потоковой обработки данных, построенная на модели издатель-подписчик. Рассмотрим архитектуру этого взаимодействия.

1. Основная архитектура

Producers (Издатели)          Topics (Топики)          Consumers (Подписчики)
    ↓                             ↓                            ↓
  App A  ──┐                  Topic 1                       Consumer 1
  App B  ──┼──→ Kafka Broker ─→ Partition 0 ─┐            Consumer 2
  App C  ──┘                  Partition 1 ─→ Consumer 3
                               Partition 2
                               
                               Topic 2
                               Partition 0
                               Partition 1

2. Структура топика

Топик — это логический канал, разбитый на партиции:

// Создание топика
String topic = "user-events";
int partitions = 3;
short replicationFactor = 1;

AdminClient admin = AdminClient.create(properties);
NewTopic newTopic = new NewTopic(topic, partitions, replicationFactor);
admin.createTopics(Collections.singleton(newTopic));

Топик состоит из:

  • Партиции — независимые очереди сообщений
  • Репликация — копии партиций на разных брокерах для надежности
  • Лидер — основная партиция, принимающая данные
  • ISR (In-Sync Replicas) — синхронизированные копии

3. Партиции и масштабируемость

Партиции обеспечивают параллельную обработку:

Топик: "orders"

Partition 0: [msg1, msg2, msg3, ...]
Partition 1: [msg1, msg2, msg3, ...]
Partition 2: [msg1, msg2, msg3, ...]

↓ Масштабируемость

Consumer 1 читает из Partition 0
Consumer 2 читает из Partition 1
Consumer 3 читает из Partition 2

= Параллельная обработка 3x

4. Consumer Groups (Группы подписчиков)

Потребители организованы в группы для распределения нагрузки:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-processing-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  for (ConsumerRecord<String, String> record : records) {
    System.out.printf("topic = %s, partition = %s, offset = %d, key = %s, value = %s%n",
      record.topic(), record.partition(), record.offset(), record.key(), record.value());
  }
}

5. Распределение партиций среди потребителей

Сценарий 1: Потребители < Партиций

Топик "orders" имеет 3 партиции
Группа имеет 2 потребителя

Consumer 1 → читает Partition 0, 1
Consumer 2 → читает Partition 2

―――――――――――――――――――――――――――――

Сценарий 2: Потребители = Партиций

Топик "orders" имеет 3 партиции
Группа имеет 3 потребителя

Consumer 1 → читает Partition 0
Consumer 2 → читает Partition 1
Consumer 3 → читает Partition 2

(Идеальный случай — каждый читает одну партицию)

―――――――――――――――――――――――――――――

Сценарий 3: Потребители > Партиций

Топик "orders" имеет 3 партиции
Группа имеет 5 потребителей

Consumer 1 → читает Partition 0
Consumer 2 → читает Partition 1
Consumer 3 → читает Partition 2
Consumer 4 → idle (неактивен)
Consumer 5 → idle (неактивен)

6. Сумма ключей и партиции

Значение ключа определяет, в какую партицию попадёт сообщение:

Producer<String, String> producer = new KafkaProducer<>(props);

// Сообщения с одним ключом идут в одну партицию (гарантирует порядок)
String key = "user-123";
String value = "Order placed";

ProducerRecord<String, String> record = 
  new ProducerRecord<>("orders", key, value);

producer.send(record, (metadata, exception) -> {
  if (exception == null) {
    System.out.printf("Message sent to partition %d with offset %d%n",
      metadata.partition(), metadata.offset());
  }
});

// Логика распределения:
// partition = hash(key) % num_partitions
// Это гарантирует, что все сообщения одного пользователя идут в одну партицию

7. Offset и отслеживание позиции

// Каждое сообщение имеет offset (позицию) в партиции
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {
  System.out.printf("Partition: %d, Offset: %d, Value: %s%n",
    record.partition(), record.offset(), record.value());
  
  // Сохранение позиции (по умолчанию автоматизировано)
  consumer.commitSync();
}

// При перезагрузке потребитель продолжит с сохранённого offset

8. Гарантии доставки сообщений

// At-most-once (может потеряться)
// enable.auto.commit = true

// At-least-once (может дублироваться)
props.put("enable.auto.commit", false);
consumer.poll(Duration.ofMillis(100));
consumer.commitSync(); // Коммит после обработки

// Exactly-once (требует транзакции)
props.put("isolation.level", "read_committed");

9. Балансировка между потребителями (Rebalancing)

consumer.poll(); // Автоматически присоединяется к группе
// При добавлении/удалении потребителя происходит rebalancing

// Слушание событий rebalancing
consumer.subscribe(
  Collections.singletonList("orders"),
  new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
      System.out.println("Partitions revoked: " + partitions);
      consumer.commitSync(); // Завершить обработку
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
      System.out.println("Partitions assigned: " + partitions);
    }
  }
);

10. Производительность и масштабирование

Рекомендации:

  • Число партиций = Число потребителей (в идеале)
  • Фактор репликации = минимум 3 для production
  • Retention period = сколько хранить сообщения
  • Compression = включить (snappy, lz4, gzip)
  • Batch size = увеличивать для нагрузки
Properties props = new Properties();
props.put("batch.size", 16384); // 16KB
props.put("linger.ms", 10); // Ждать 10ms перед отправкой
props.put("compression.type", "snappy");

11. Полный пример: Producer + Consumer

// PRODUCER
public void produceMessages() {
  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");
  props.put("key.serializer", StringSerializer.class);
  props.put("value.serializer", StringSerializer.class);
  
  KafkaProducer<String, String> producer = new KafkaProducer<>(props);
  
  for (int i = 0; i < 100; i++) {
    ProducerRecord<String, String> record = 
      new ProducerRecord<>("orders", "user-" + i, "Order " + i);
    producer.send(record);
  }
  
  producer.close();
}

// CONSUMER
public void consumeMessages() {
  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");
  props.put("group.id", "order-group");
  props.put("key.deserializer", StringDeserializer.class);
  props.put("value.deserializer", StringDeserializer.class);
  
  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  consumer.subscribe(Collections.singletonList("orders"));
  
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
      System.out.println("Received: " + record.value());
    }
  }
}

Итоговая модель связи

Производитель → Топик (с N партициями) → Группа потребителей
                     ↓
              Каждая партиция → один потребитель (или несколько читают одну)
              Все партиции → все потребители вместе

Кafka обеспечивает масштабируемость благодаря партиционированию и параллельной обработке через потребителей.