← Назад к вопросам
Как топики связаны с подписчиками в Kafka
2.0 Middle🔥 251 комментариев
#Брокеры сообщений
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Как топики связаны с подписчиками в Apache Kafka
Kafka — это распределённая система потоковой обработки данных, построенная на модели издатель-подписчик. Рассмотрим архитектуру этого взаимодействия.
1. Основная архитектура
Producers (Издатели) Topics (Топики) Consumers (Подписчики)
↓ ↓ ↓
App A ──┐ Topic 1 Consumer 1
App B ──┼──→ Kafka Broker ─→ Partition 0 ─┐ Consumer 2
App C ──┘ Partition 1 ─→ Consumer 3
Partition 2
Topic 2
Partition 0
Partition 1
2. Структура топика
Топик — это логический канал, разбитый на партиции:
// Создание топика
String topic = "user-events";
int partitions = 3;
short replicationFactor = 1;
AdminClient admin = AdminClient.create(properties);
NewTopic newTopic = new NewTopic(topic, partitions, replicationFactor);
admin.createTopics(Collections.singleton(newTopic));
Топик состоит из:
- Партиции — независимые очереди сообщений
- Репликация — копии партиций на разных брокерах для надежности
- Лидер — основная партиция, принимающая данные
- ISR (In-Sync Replicas) — синхронизированные копии
3. Партиции и масштабируемость
Партиции обеспечивают параллельную обработку:
Топик: "orders"
Partition 0: [msg1, msg2, msg3, ...]
Partition 1: [msg1, msg2, msg3, ...]
Partition 2: [msg1, msg2, msg3, ...]
↓ Масштабируемость
Consumer 1 читает из Partition 0
Consumer 2 читает из Partition 1
Consumer 3 читает из Partition 2
= Параллельная обработка 3x
4. Consumer Groups (Группы подписчиков)
Потребители организованы в группы для распределения нагрузки:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-processing-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d, key = %s, value = %s%n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
5. Распределение партиций среди потребителей
Сценарий 1: Потребители < Партиций
Топик "orders" имеет 3 партиции
Группа имеет 2 потребителя
Consumer 1 → читает Partition 0, 1
Consumer 2 → читает Partition 2
―――――――――――――――――――――――――――――
Сценарий 2: Потребители = Партиций
Топик "orders" имеет 3 партиции
Группа имеет 3 потребителя
Consumer 1 → читает Partition 0
Consumer 2 → читает Partition 1
Consumer 3 → читает Partition 2
(Идеальный случай — каждый читает одну партицию)
―――――――――――――――――――――――――――――
Сценарий 3: Потребители > Партиций
Топик "orders" имеет 3 партиции
Группа имеет 5 потребителей
Consumer 1 → читает Partition 0
Consumer 2 → читает Partition 1
Consumer 3 → читает Partition 2
Consumer 4 → idle (неактивен)
Consumer 5 → idle (неактивен)
6. Сумма ключей и партиции
Значение ключа определяет, в какую партицию попадёт сообщение:
Producer<String, String> producer = new KafkaProducer<>(props);
// Сообщения с одним ключом идут в одну партицию (гарантирует порядок)
String key = "user-123";
String value = "Order placed";
ProducerRecord<String, String> record =
new ProducerRecord<>("orders", key, value);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("Message sent to partition %d with offset %d%n",
metadata.partition(), metadata.offset());
}
});
// Логика распределения:
// partition = hash(key) % num_partitions
// Это гарантирует, что все сообщения одного пользователя идут в одну партицию
7. Offset и отслеживание позиции
// Каждое сообщение имеет offset (позицию) в партиции
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Partition: %d, Offset: %d, Value: %s%n",
record.partition(), record.offset(), record.value());
// Сохранение позиции (по умолчанию автоматизировано)
consumer.commitSync();
}
// При перезагрузке потребитель продолжит с сохранённого offset
8. Гарантии доставки сообщений
// At-most-once (может потеряться)
// enable.auto.commit = true
// At-least-once (может дублироваться)
props.put("enable.auto.commit", false);
consumer.poll(Duration.ofMillis(100));
consumer.commitSync(); // Коммит после обработки
// Exactly-once (требует транзакции)
props.put("isolation.level", "read_committed");
9. Балансировка между потребителями (Rebalancing)
consumer.poll(); // Автоматически присоединяется к группе
// При добавлении/удалении потребителя происходит rebalancing
// Слушание событий rebalancing
consumer.subscribe(
Collections.singletonList("orders"),
new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Partitions revoked: " + partitions);
consumer.commitSync(); // Завершить обработку
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Partitions assigned: " + partitions);
}
}
);
10. Производительность и масштабирование
Рекомендации:
- Число партиций = Число потребителей (в идеале)
- Фактор репликации = минимум 3 для production
- Retention period = сколько хранить сообщения
- Compression = включить (snappy, lz4, gzip)
- Batch size = увеличивать для нагрузки
Properties props = new Properties();
props.put("batch.size", 16384); // 16KB
props.put("linger.ms", 10); // Ждать 10ms перед отправкой
props.put("compression.type", "snappy");
11. Полный пример: Producer + Consumer
// PRODUCER
public void produceMessages() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class);
props.put("value.serializer", StringSerializer.class);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> record =
new ProducerRecord<>("orders", "user-" + i, "Order " + i);
producer.send(record);
}
producer.close();
}
// CONSUMER
public void consumeMessages() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-group");
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", StringDeserializer.class);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received: " + record.value());
}
}
}
Итоговая модель связи
Производитель → Топик (с N партициями) → Группа потребителей
↓
Каждая партиция → один потребитель (или несколько читают одну)
Все партиции → все потребители вместе
Кafka обеспечивает масштабируемость благодаря партиционированию и параллельной обработке через потребителей.