← Назад к вопросам

Что такое группы в Kafka?

1.7 Middle🔥 151 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Consumer Groups в Apache Kafka

Consumer Group (группа потребителей) — это механизм в Apache Kafka для распределения сообщений из топика между несколькими потребителями (consumers). Все потребители в группе получают копию каждого сообщения, но каждое сообщение обрабатывается только одним потребителем из группы.

Основной принцип

Каждому потребителю в группе назначена одна или несколько партиций топика. Если в топике 4 партиции и в группе 4 потребителя — каждый получит по одной партиции. Если потребителей 2 — каждый получит по 2 партиции.

Структура

Topic: orders (4 партиции)
├─ Partition 0 → Consumer 1
├─ Partition 1 → Consumer 1
├─ Partition 2 → Consumer 2
└─ Partition 3 → Consumer 2

Group ID: "order-processing"

Создание Consumer Group

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Arrays;
import java.util.Properties;
import java.time.Duration;

public class ConsumerGroupExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        
        // Все экземпляры с одинаковым GROUP_ID образуют одну группу
        consumer.subscribe(Arrays.asList("orders"));
        
        while (true) {
            var records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record -> {
                System.out.println("Partition: " + record.partition() + 
                                 ", Offset: " + record.offset() + 
                                 ", Value: " + record.value());
            });
        }
    }
}

Балансировка партиций (Rebalancing)

Когда меняется количество потребителей в группе, Kafka пересчитывает распределение партиций.

public class RebalancingExample {
    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        
        // Слушатель для отслеживания ребалансировки
        consumer.subscribe(
            Arrays.asList("orders"),
            new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // Вызывается ПЕРЕД ребалансировкой
                    System.out.println("Потребитель потерял партиции: " + partitions);
                    // Можно сохранить смещение (offset)
                }
                
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // Вызывается ПОСЛЕ ребалансировки
                    System.out.println("Потребитель получил партиции: " + partitions);
                }
            }
        );
    }
}

Сценарий ребалансировки

Сценарий: Топик с 4 партициями, 2 потребителя

Начальное состояние:
├─ Consumer-1: Partitions [0, 1]
└─ Consumer-2: Partitions [2, 3]

Добавляем 3-го потребителя → Ребалансировка
├─ Consumer-1: Partitions [0]
├─ Consumer-2: Partitions [1, 2]
└─ Consumer-3: Partitions [3]

Время ребалансировки: 1-2 минуты (зависит от конфига)

Несколько Consumer Groups

Несколько групп могут обрабатывать один и тот же топик независимо:

// Группа 1: обработка заказов для биллинга
Properties props1 = new Properties();
props1.put(ConsumerConfig.GROUP_ID_CONFIG, "billing-group");
KafkaConsumer<String, String> billingConsumer = new KafkaConsumer<>(props1);

// Группа 2: обработка заказов для аналитики
Properties props2 = new Properties();
props2.put(ConsumerConfig.GROUP_ID_CONFIG, "analytics-group");
KafkaConsumer<String, String> analyticsConsumer = new KafkaConsumer<>(props2);

// Обе группы получат ВСЕ сообщения из топика "orders"
// Каждая группа отслеживает свой offset независимо

Consumer Lag

Consumer Lag — это разница между последним смещением (offset) в партиции и текущим смещением потребителя.

public class ConsumerLagMonitor {
    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("orders"));
        
        consumer.poll(Duration.ofMillis(100)); // Инициализировать
        
        for (TopicPartition partition : consumer.assignment()) {
            long currentOffset = consumer.position(partition);
            long logEndOffset = consumer.endOffsets(
                Arrays.asList(partition)
            ).get(partition);
            
            long lag = logEndOffset - currentOffset;
            System.out.println("Partition " + partition.partition() + 
                             ": lag = " + lag);
        }
    }
}

Commit Strategies

// 1. Auto-commit (автоматический коммит)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);

// 2. Manual commit (ручной коммит)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

while (true) {
    var records = consumer.poll(Duration.ofMillis(100));
    records.forEach(record -> {
        processRecord(record);
    });
    consumer.commitSync(); // Или commitAsync()
}

Стратегии распределения партиций

// RangeAssignor (по умолчанию)
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
    "org.apache.kafka.clients.consumer.RangeAssignor");

// RoundRobinAssignor
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
    "org.apache.kafka.clients.consumer.RoundRobinAssignor");

// StickyAssignor (минимизирует смену партиций при ребалансировке)
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
    "org.apache.kafka.clients.consumer.StickyAssignor");

Лучшие практики

  • Одна группа на сценарий: разные группы для разных целей обработки
  • Мониторить lag: отставание потребителя показывает здоровье системы
  • Graceful shutdown: правильно завершать потребителей для ребалансировки
  • Идемпотентная обработка: потребитель может получить один раз и переобработать
  • Паузы при проблемах: использовать pause/resume для throttling
Что такое группы в Kafka? | PrepBro