Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Что такое consumer group?
Consumer group — это механизм в Apache Kafka для распределённой обработки сообщений. Группа потребителей представляет собой набор потребителей (consumers), которые вместе обрабатывают сообщения из одного или нескольких топиков.
Ключевая идея: каждое сообщение в партиции топика обрабатывается ровно одним потребителем в группе. Это обеспечивает масштабируемость и отказоустойчивость.
Как работает consumer group
Основные характеристики:
- Уникальный идентификатор — каждая группа имеет уникальный ID (group.id)
- Партицирование — Kafka автоматически распределяет партиции между потребителями в группе
- Балансировка — если потребитель падает или добавляется новый, Kafka перебалансирует партиции
- Offset управление — группа отслеживает, какие сообщения уже обработаны (offset)
Архитектура
Когда несколько потребителей входят в одну группу:
Топик [Partition 0, Partition 1, Partition 2, Partition 3]
↓
Consumer Group "my-group"
├─ Consumer 1 → Partition 0, 1
├─ Consumer 2 → Partition 2, 3
└─ Consumer 3 → (перебалансируется)
Каждая партиция может быть назначена только одному потребителю в группе.
Пример в Java с KafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class MyKafkaConsumer {
public static void main(String[] args) {
// Конфигурация потребителя
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); // ID группы
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// Создание потребителя
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Подписка на топик
consumer.subscribe(Collections.singletonList("my-topic"));
// Обработка сообщений
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Partition: " + record.partition() +
", Offset: " + record.offset() +
", Value: " + record.value());
}
}
} finally {
consumer.close();
}
}
}
Несколько групп потребителей
Одна из мощных возможностей Kafka — несколько независимых групп могут потреблять одни и те же сообщения:
// Группа 1: обработка аналитики
props.put(ConsumerConfig.GROUP_ID_CONFIG, "analytics-group");
KafkaConsumer<String, String> analyticsConsumer = new KafkaConsumer<>(props);
analyticsConsumer.subscribe(Collections.singletonList("user-events"));
// Группа 2: отправка в хранилище
props.put(ConsumerConfig.GROUP_ID_CONFIG, "storage-group");
KafkaConsumer<String, String> storageConsumer = new KafkaConsumer<>(props);
storageConsumer.subscribe(Collections.singletonList("user-events"));
// Обе группы независимо обрабатывают одни и те же сообщения
Управление offset
Offset — это позиция потребителя в партиции. Consumer group отслеживает, какие сообщения уже обработаны:
// Автоматическое управление offset (не рекомендуется в production)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// Ручное управление offset (рекомендуется)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// Обработка сообщения
processRecord(record);
}
// Коммит после успешной обработки
consumer.commitSync();
}
} finally {
consumer.close();
}
Rebalancing
Когда потребитель присоединяется или покидает группу, происходит rebalancing — перераспределение партиций:
До rebalancing:
Partition 0, 1, 2, 3 → Consumer 1
После добавления Consumer 2:
Partition 0, 1 → Consumer 1
Partition 2, 3 → Consumer 2
Время rebalancing приводит к паузе обработки, поэтому важно это учитывать.
Преимущества consumer groups
- Масштабируемость — добавь потребителей, и обработка станет быстрее
- Отказоустойчивость — если один потребитель падает, другие продолжают работу
- Параллелизм — сообщения из разных партиций обрабатываются параллельно
- Гибкость — независимые группы могут обрабатывать одни данные по-разному
Практические советы
- Выбор количества потребителей — не больше, чем партиций в топике
- Мониторинг lag — отставание потребителя от последнего сообщения
- Graceful shutdown — корректно заверши потребителя перед перезапуском
- Dead Letter Queue — обрабатывай ошибки без потери сообщений