← Назад к вопросам

Что такое consumer group?

1.0 Junior🔥 151 комментариев
#Soft Skills и карьера

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Что такое consumer group?

Consumer group — это механизм в Apache Kafka для распределённой обработки сообщений. Группа потребителей представляет собой набор потребителей (consumers), которые вместе обрабатывают сообщения из одного или нескольких топиков.

Ключевая идея: каждое сообщение в партиции топика обрабатывается ровно одним потребителем в группе. Это обеспечивает масштабируемость и отказоустойчивость.

Как работает consumer group

Основные характеристики:

  1. Уникальный идентификатор — каждая группа имеет уникальный ID (group.id)
  2. Партицирование — Kafka автоматически распределяет партиции между потребителями в группе
  3. Балансировка — если потребитель падает или добавляется новый, Kafka перебалансирует партиции
  4. Offset управление — группа отслеживает, какие сообщения уже обработаны (offset)

Архитектура

Когда несколько потребителей входят в одну группу:

Топик [Partition 0, Partition 1, Partition 2, Partition 3]
       ↓
Consumer Group "my-group"
  ├─ Consumer 1 → Partition 0, 1
  ├─ Consumer 2 → Partition 2, 3
  └─ Consumer 3 → (перебалансируется)

Каждая партиция может быть назначена только одному потребителю в группе.

Пример в Java с KafkaConsumer

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MyKafkaConsumer {
    public static void main(String[] args) {
        // Конфигурация потребителя
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); // ID группы
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        
        // Создание потребителя
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        
        // Подписка на топик
        consumer.subscribe(Collections.singletonList("my-topic"));
        
        // Обработка сообщений
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Partition: " + record.partition() + 
                        ", Offset: " + record.offset() + 
                        ", Value: " + record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

Несколько групп потребителей

Одна из мощных возможностей Kafka — несколько независимых групп могут потреблять одни и те же сообщения:

// Группа 1: обработка аналитики
props.put(ConsumerConfig.GROUP_ID_CONFIG, "analytics-group");
KafkaConsumer<String, String> analyticsConsumer = new KafkaConsumer<>(props);
analyticsConsumer.subscribe(Collections.singletonList("user-events"));

// Группа 2: отправка в хранилище
props.put(ConsumerConfig.GROUP_ID_CONFIG, "storage-group");
KafkaConsumer<String, String> storageConsumer = new KafkaConsumer<>(props);
storageConsumer.subscribe(Collections.singletonList("user-events"));

// Обе группы независимо обрабатывают одни и те же сообщения

Управление offset

Offset — это позиция потребителя в партиции. Consumer group отслеживает, какие сообщения уже обработаны:

// Автоматическое управление offset (не рекомендуется в production)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

// Ручное управление offset (рекомендуется)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        
        for (ConsumerRecord<String, String> record : records) {
            // Обработка сообщения
            processRecord(record);
        }
        
        // Коммит после успешной обработки
        consumer.commitSync();
    }
} finally {
    consumer.close();
}

Rebalancing

Когда потребитель присоединяется или покидает группу, происходит rebalancing — перераспределение партиций:

До rebalancing:
Partition 0, 1, 2, 3 → Consumer 1

После добавления Consumer 2:
Partition 0, 1 → Consumer 1
Partition 2, 3 → Consumer 2

Время rebalancing приводит к паузе обработки, поэтому важно это учитывать.

Преимущества consumer groups

  • Масштабируемость — добавь потребителей, и обработка станет быстрее
  • Отказоустойчивость — если один потребитель падает, другие продолжают работу
  • Параллелизм — сообщения из разных партиций обрабатываются параллельно
  • Гибкость — независимые группы могут обрабатывать одни данные по-разному

Практические советы

  1. Выбор количества потребителей — не больше, чем партиций в топике
  2. Мониторинг lag — отставание потребителя от последнего сообщения
  3. Graceful shutdown — корректно заверши потребителя перед перезапуском
  4. Dead Letter Queue — обрабатывай ошибки без потери сообщений