← Назад к вопросам
Как правильно распределить консьюмеров группы между 3 партициями одного топика
2.0 Middle🔥 161 комментариев
#REST API и микросервисы#Брокеры сообщений
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
# Распределение консьюмеров Kafka между партициями
В Kafka распределение консьюмеров группы между партициями топика — это одно из ключевых понятий для эффективной обработки сообщений.
1. Основные принципы
Правило: 1 консьюмер = несколько партиций (максимум)
Топик: user-events (3 партиции)
├─ Partition 0
├─ Partition 1
└─ Partition 2
Группа: user-service-group (N консьюмеров)
├─ Consumer 0 → Partition 0
├─ Consumer 1 → Partition 1
├─ Consumer 2 → Partition 2
└─ Consumer N → (неиспользуется, нет партиций)
Ключевые моменты:
- Каждая партиция может быть назначена только ОДНОМУ консьюмеру одной группы
- Если консьюмеров больше, чем партиций — лишние будут неиспользованы
- Если консьюмеров меньше, чем партиций — один консьюмер обработает несколько
- Распределение происходит автоматически через Rebalancing
2. Оптимальное распределение
Сценарий 1: 3 консьюмера на 3 партиции (ИДЕАЛЬНО)
@SpringBootApplication
@EnableKafka
public class KafkaApplication {
@Bean
public ConsumerFactory<String, UserEvent> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "user-service-group");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Остальная конфигурация
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, UserEvent> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, UserEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// concurrency = количество консьюмеров в одном процессе
factory.setConcurrency(3); // 3 консьюмера для 3 партиций
return factory;
}
}
@Component
public class UserEventListener {
private static final Logger logger = LoggerFactory.getLogger(UserEventListener.class);
@KafkaListener(
topics = "user-events",
groupId = "user-service-group",
containerFactory = "kafkaListenerContainerFactory"
)
public void handleUserEvent(UserEvent event,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) long offset) {
logger.info("Processing event from partition {} (offset {}): {}", partition, offset, event);
// Обработка события
}
}
// Результат: каждый из 3 консьюмеров обрабатывает одну партицию
// Consumer 0 → Partition 0
// Consumer 1 → Partition 1
// Consumer 2 → Partition 2
Сценарий 2: 2 консьюмера на 3 партиции (один консьюмер обрабатывает 2 партиции)
@Configuration
public class KafkaConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, UserEvent> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, UserEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(2); // Только 2 консьюмера
return factory;
}
}
// Результат распределения:
// Consumer 0 → Partition 0 + Partition 1
// Consumer 1 → Partition 2
// (или другой вариант в зависимости от Rebalancer)
Сценарий 3: 5 консьюмеров на 3 партиции (3 активны, 2 простаивают)
factory.setConcurrency(5); // 5 консьюмеров для 3 партиций
// Результат:
// Consumer 0 → Partition 0
// Consumer 1 → Partition 1
// Consumer 2 → Partition 2
// Consumer 3 → (без партиции, простаивает)
// Consumer 4 → (без партиции, простаивает)
// ❌ Неэффективно: впустую расходуются ресурсы
3. Механизм Rebalancing
Когда консьюмеры переназначаются на партиции.
@Configuration
public class KafkaRebalanceConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, UserEvent> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, UserEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
// Слушаем события rebalancing
factory.getContainerProperties().setConsumerRebalanceListener(
new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
Collection<TopicPartition> partitions) {
logger.info("Rebalancing: revoking partitions: {}", partitions);
// Очищаем локальное состояние
}
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer,
Collection<TopicPartition> partitions) {
logger.info("Rebalancing: assigned partitions: {}", partitions);
// Инициализируем локальное состояние
}
});
return factory;
}
}
4. Назначение партиций (Partition Assignment Strategy)
Range Strategy (по умолчанию)
Топик: user-events (3 партиции)
Группа: user-service-group (2 консьюмера)
RangeAssignor:
Consumer 0: [Partition 0, Partition 1]
Consumer 1: [Partition 2]
Round-Robin Strategy
Топик: user-events (3 партиции)
Группа: user-service-group (2 консьюмера)
RoundRobinAssignor:
Consumer 0: [Partition 0, Partition 2]
Consumer 1: [Partition 1]
Sticky Strategy (прилипчивая) — РЕКОМЕНДУЕТСЯ
@Bean
public ConsumerFactory<String, UserEvent> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "user-service-group");
// Sticky assignor минимизирует переназначения при rebalancing
config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.StickyAssignor");
return new DefaultKafkaConsumerFactory<>(config);
}
5. Практический пример: правильная конфигурация
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
public static final String USER_EVENTS_TOPIC = "user-events";
public static final String USER_EVENTS_GROUP = "user-service-group";
@Bean
public ConsumerFactory<String, UserEvent> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, USER_EVENTS_GROUP);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Ручной commit
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); // Heartbeat
config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.StickyAssignor");
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, UserEvent> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, UserEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3); // 3 консьюмера для 3 партиций
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
}
@Component
public class UserEventConsumer {
private static final Logger logger = LoggerFactory.getLogger(UserEventConsumer.class);
private final UserService userService;
public UserEventConsumer(UserService userService) {
this.userService = userService;
}
@KafkaListener(
topics = "user-events",
groupId = "user-service-group",
containerFactory = "kafkaListenerContainerFactory"
)
public void handleUserEvent(UserEvent event,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
Acknowledgment acknowledgment) {
try {
logger.info("[Partition {}] Processing event: {}", partition, event);
userService.processEvent(event);
// Ручной commit при успехе
acknowledgment.acknowledge();
} catch (Exception e) {
logger.error("[Partition {}] Error processing event", partition, e);
// Сообщение останется в топике для повтора
}
}
}
6. Мониторинг распределения консьюмеров
@Component
public class ConsumerGroupMonitor {
private static final Logger logger = LoggerFactory.getLogger(ConsumerGroupMonitor.class);
private final Admin admin;
@Scheduled(fixedRate = 60000) // Каждую минуту
public void monitorConsumerGroup() throws Exception {
DescribeConsumerGroupsResult result = admin.describeConsumerGroups(
Collections.singletonList("user-service-group"));
ConsumerGroupDescription group = result.all().get()
.get("user-service-group");
logger.info("Consumer Group: {}", group.groupId());
logger.info("Members count: {}", group.members().size());
logger.info("Authorized operations: {}", group.authorizedOperations());
group.members().forEach(member -> {
logger.info(" Member: {} - Partitions: {}",
member.memberId(),
member.assignment().topicPartitions());
});
}
}
7. Best Practices
// ✅ Количество консьюмеров <= количество партиций
factory.setConcurrency(Math.min(numConsumers, numPartitions));
// ✅ Используй StickyAssignor
config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.StickyAssignor");
// ✅ Ручной commit для критичной обработки
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
// ✅ Обрабатывай партицию и offset
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) long offset
// ❌ Не создавай больше консьюмеров чем партиций
// factory.setConcurrency(10); // для 3 партиций — впустую
// ❌ Не игнорируй rebalancing события
8. Таблица рекомендаций
| Партиций | Консьюмеров | Статус | Оптимально |
|---|---|---|---|
| 3 | 3 | Идеально | Да |
| 3 | 2 | Работает | Условно (один обрабатывает 2) |
| 3 | 4 | Работает | Нет (1 неиспользуется) |
| 3 | 1 | Работает | Нет (один обрабатывает все) |
Выводы
- Оптимально: консьюмеров = партиций (3=3)
- Допустимо: консьюмеров < партиций (каждый обрабатывает несколько)
- Неэффективно: консьюмеров > партиций (лишние простаивают)
- Используй StickyAssignor для минимизации переназначений
- Мониторь процесс rebalancing для отладки
- Ручной commit для критичной обработки
- Обрабатывай каждую партицию отдельно если нужна гарантия порядка