Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Consumer Groups в Apache Kafka
Consumer Group (группа потребителей) — это механизм в Apache Kafka для распределения сообщений из топика между несколькими потребителями (consumers). Все потребители в группе получают копию каждого сообщения, но каждое сообщение обрабатывается только одним потребителем из группы.
Основной принцип
Каждому потребителю в группе назначена одна или несколько партиций топика. Если в топике 4 партиции и в группе 4 потребителя — каждый получит по одной партиции. Если потребителей 2 — каждый получит по 2 партиции.
Структура
Topic: orders (4 партиции)
├─ Partition 0 → Consumer 1
├─ Partition 1 → Consumer 1
├─ Partition 2 → Consumer 2
└─ Partition 3 → Consumer 2
Group ID: "order-processing"
Создание Consumer Group
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Arrays;
import java.util.Properties;
import java.time.Duration;
public class ConsumerGroupExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Все экземпляры с одинаковым GROUP_ID образуют одну группу
consumer.subscribe(Arrays.asList("orders"));
while (true) {
var records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.println("Partition: " + record.partition() +
", Offset: " + record.offset() +
", Value: " + record.value());
});
}
}
}
Балансировка партиций (Rebalancing)
Когда меняется количество потребителей в группе, Kafka пересчитывает распределение партиций.
public class RebalancingExample {
public static void main(String[] args) {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Слушатель для отслеживания ребалансировки
consumer.subscribe(
Arrays.asList("orders"),
new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Вызывается ПЕРЕД ребалансировкой
System.out.println("Потребитель потерял партиции: " + partitions);
// Можно сохранить смещение (offset)
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Вызывается ПОСЛЕ ребалансировки
System.out.println("Потребитель получил партиции: " + partitions);
}
}
);
}
}
Сценарий ребалансировки
Сценарий: Топик с 4 партициями, 2 потребителя
Начальное состояние:
├─ Consumer-1: Partitions [0, 1]
└─ Consumer-2: Partitions [2, 3]
Добавляем 3-го потребителя → Ребалансировка
├─ Consumer-1: Partitions [0]
├─ Consumer-2: Partitions [1, 2]
└─ Consumer-3: Partitions [3]
Время ребалансировки: 1-2 минуты (зависит от конфига)
Несколько Consumer Groups
Несколько групп могут обрабатывать один и тот же топик независимо:
// Группа 1: обработка заказов для биллинга
Properties props1 = new Properties();
props1.put(ConsumerConfig.GROUP_ID_CONFIG, "billing-group");
KafkaConsumer<String, String> billingConsumer = new KafkaConsumer<>(props1);
// Группа 2: обработка заказов для аналитики
Properties props2 = new Properties();
props2.put(ConsumerConfig.GROUP_ID_CONFIG, "analytics-group");
KafkaConsumer<String, String> analyticsConsumer = new KafkaConsumer<>(props2);
// Обе группы получат ВСЕ сообщения из топика "orders"
// Каждая группа отслеживает свой offset независимо
Consumer Lag
Consumer Lag — это разница между последним смещением (offset) в партиции и текущим смещением потребителя.
public class ConsumerLagMonitor {
public static void main(String[] args) {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders"));
consumer.poll(Duration.ofMillis(100)); // Инициализировать
for (TopicPartition partition : consumer.assignment()) {
long currentOffset = consumer.position(partition);
long logEndOffset = consumer.endOffsets(
Arrays.asList(partition)
).get(partition);
long lag = logEndOffset - currentOffset;
System.out.println("Partition " + partition.partition() +
": lag = " + lag);
}
}
}
Commit Strategies
// 1. Auto-commit (автоматический коммит)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
// 2. Manual commit (ручной коммит)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
while (true) {
var records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
processRecord(record);
});
consumer.commitSync(); // Или commitAsync()
}
Стратегии распределения партиций
// RangeAssignor (по умолчанию)
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.RangeAssignor");
// RoundRobinAssignor
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.RoundRobinAssignor");
// StickyAssignor (минимизирует смену партиций при ребалансировке)
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.StickyAssignor");
Лучшие практики
- Одна группа на сценарий: разные группы для разных целей обработки
- Мониторить lag: отставание потребителя показывает здоровье системы
- Graceful shutdown: правильно завершать потребителей для ребалансировки
- Идемпотентная обработка: потребитель может получить один раз и переобработать
- Паузы при проблемах: использовать pause/resume для throttling