← Назад к вопросам

Как правильно распределить консьюмеров группы между 3 партициями одного топика

2.0 Middle🔥 161 комментариев
#REST API и микросервисы#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# Распределение консьюмеров Kafka между партициями

В Kafka распределение консьюмеров группы между партициями топика — это одно из ключевых понятий для эффективной обработки сообщений.

1. Основные принципы

Правило: 1 консьюмер = несколько партиций (максимум)

Топик: user-events (3 партиции)
├─ Partition 0
├─ Partition 1  
└─ Partition 2

Группа: user-service-group (N консьюмеров)
├─ Consumer 0 → Partition 0
├─ Consumer 1 → Partition 1
├─ Consumer 2 → Partition 2
└─ Consumer N → (неиспользуется, нет партиций)

Ключевые моменты:

  1. Каждая партиция может быть назначена только ОДНОМУ консьюмеру одной группы
  2. Если консьюмеров больше, чем партиций — лишние будут неиспользованы
  3. Если консьюмеров меньше, чем партиций — один консьюмер обработает несколько
  4. Распределение происходит автоматически через Rebalancing

2. Оптимальное распределение

Сценарий 1: 3 консьюмера на 3 партиции (ИДЕАЛЬНО)

@SpringBootApplication
@EnableKafka
public class KafkaApplication {
    
    @Bean
    public ConsumerFactory<String, UserEvent> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "user-service-group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Остальная конфигурация
        
        return new DefaultKafkaConsumerFactory<>(config);
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, UserEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, UserEvent> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // concurrency = количество консьюмеров в одном процессе
        factory.setConcurrency(3);  // 3 консьюмера для 3 партиций
        return factory;
    }
}

@Component
public class UserEventListener {
    private static final Logger logger = LoggerFactory.getLogger(UserEventListener.class);
    
    @KafkaListener(
        topics = "user-events",
        groupId = "user-service-group",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleUserEvent(UserEvent event, 
                               @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                               @Header(KafkaHeaders.OFFSET) long offset) {
        logger.info("Processing event from partition {} (offset {}): {}", partition, offset, event);
        // Обработка события
    }
}

// Результат: каждый из 3 консьюмеров обрабатывает одну партицию
// Consumer 0 → Partition 0
// Consumer 1 → Partition 1
// Consumer 2 → Partition 2

Сценарий 2: 2 консьюмера на 3 партиции (один консьюмер обрабатывает 2 партиции)

@Configuration
public class KafkaConfig {
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, UserEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, UserEvent> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(2);  // Только 2 консьюмера
        return factory;
    }
}

// Результат распределения:
// Consumer 0 → Partition 0 + Partition 1
// Consumer 1 → Partition 2
// (или другой вариант в зависимости от Rebalancer)

Сценарий 3: 5 консьюмеров на 3 партиции (3 активны, 2 простаивают)

factory.setConcurrency(5);  // 5 консьюмеров для 3 партиций

// Результат:
// Consumer 0 → Partition 0
// Consumer 1 → Partition 1
// Consumer 2 → Partition 2
// Consumer 3 → (без партиции, простаивает)
// Consumer 4 → (без партиции, простаивает)

// ❌ Неэффективно: впустую расходуются ресурсы

3. Механизм Rebalancing

Когда консьюмеры переназначаются на партиции.

@Configuration
public class KafkaRebalanceConfig {
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, UserEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, UserEvent> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        
        // Слушаем события rebalancing
        factory.getContainerProperties().setConsumerRebalanceListener(
            new ConsumerAwareRebalanceListener() {
                @Override
                public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, 
                                                           Collection<TopicPartition> partitions) {
                    logger.info("Rebalancing: revoking partitions: {}", partitions);
                    // Очищаем локальное состояние
                }
                
                @Override
                public void onPartitionsAssigned(Consumer<?, ?> consumer, 
                                               Collection<TopicPartition> partitions) {
                    logger.info("Rebalancing: assigned partitions: {}", partitions);
                    // Инициализируем локальное состояние
                }
            });
        
        return factory;
    }
}

4. Назначение партиций (Partition Assignment Strategy)

Range Strategy (по умолчанию)

Топик: user-events (3 партиции)
Группа: user-service-group (2 консьюмера)

RangeAssignor:
Consumer 0: [Partition 0, Partition 1]
Consumer 1: [Partition 2]

Round-Robin Strategy

Топик: user-events (3 партиции)
Группа: user-service-group (2 консьюмера)

RoundRobinAssignor:
Consumer 0: [Partition 0, Partition 2]
Consumer 1: [Partition 1]

Sticky Strategy (прилипчивая) — РЕКОМЕНДУЕТСЯ

@Bean
public ConsumerFactory<String, UserEvent> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "user-service-group");
    // Sticky assignor минимизирует переназначения при rebalancing
    config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
        "org.apache.kafka.clients.consumer.StickyAssignor");
    
    return new DefaultKafkaConsumerFactory<>(config);
}

5. Практический пример: правильная конфигурация

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    
    public static final String USER_EVENTS_TOPIC = "user-events";
    public static final String USER_EVENTS_GROUP = "user-service-group";
    
    @Bean
    public ConsumerFactory<String, UserEvent> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, USER_EVENTS_GROUP);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // Ручной commit
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);   // Heartbeat
        config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
            "org.apache.kafka.clients.consumer.StickyAssignor");
        
        return new DefaultKafkaConsumerFactory<>(config);
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, UserEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, UserEvent> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);  // 3 консьюмера для 3 партиций
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

@Component
public class UserEventConsumer {
    private static final Logger logger = LoggerFactory.getLogger(UserEventConsumer.class);
    private final UserService userService;
    
    public UserEventConsumer(UserService userService) {
        this.userService = userService;
    }
    
    @KafkaListener(
        topics = "user-events",
        groupId = "user-service-group",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleUserEvent(UserEvent event,
                               @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                               @Header(KafkaHeaders.OFFSET) long offset,
                               Acknowledgment acknowledgment) {
        try {
            logger.info("[Partition {}] Processing event: {}", partition, event);
            userService.processEvent(event);
            
            // Ручной commit при успехе
            acknowledgment.acknowledge();
        } catch (Exception e) {
            logger.error("[Partition {}] Error processing event", partition, e);
            // Сообщение останется в топике для повтора
        }
    }
}

6. Мониторинг распределения консьюмеров

@Component
public class ConsumerGroupMonitor {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerGroupMonitor.class);
    private final Admin admin;
    
    @Scheduled(fixedRate = 60000)  // Каждую минуту
    public void monitorConsumerGroup() throws Exception {
        DescribeConsumerGroupsResult result = admin.describeConsumerGroups(
            Collections.singletonList("user-service-group"));
        
        ConsumerGroupDescription group = result.all().get()
            .get("user-service-group");
        
        logger.info("Consumer Group: {}", group.groupId());
        logger.info("Members count: {}", group.members().size());
        logger.info("Authorized operations: {}", group.authorizedOperations());
        
        group.members().forEach(member -> {
            logger.info("  Member: {} - Partitions: {}",
                member.memberId(),
                member.assignment().topicPartitions());
        });
    }
}

7. Best Practices

// ✅ Количество консьюмеров <= количество партиций
factory.setConcurrency(Math.min(numConsumers, numPartitions));

// ✅ Используй StickyAssignor
config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    "org.apache.kafka.clients.consumer.StickyAssignor");

// ✅ Ручной commit для критичной обработки
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);

// ✅ Обрабатывай партицию и offset
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) long offset

// ❌ Не создавай больше консьюмеров чем партиций
// factory.setConcurrency(10);  // для 3 партиций — впустую

// ❌ Не игнорируй rebalancing события

8. Таблица рекомендаций

ПартицийКонсьюмеровСтатусОптимально
33ИдеальноДа
32РаботаетУсловно (один обрабатывает 2)
34РаботаетНет (1 неиспользуется)
31РаботаетНет (один обрабатывает все)

Выводы

  1. Оптимально: консьюмеров = партиций (3=3)
  2. Допустимо: консьюмеров < партиций (каждый обрабатывает несколько)
  3. Неэффективно: консьюмеров > партиций (лишние простаивают)
  4. Используй StickyAssignor для минимизации переназначений
  5. Мониторь процесс rebalancing для отладки
  6. Ручной commit для критичной обработки
  7. Обрабатывай каждую партицию отдельно если нужна гарантия порядка