← Назад к вопросам
Как увеличить производительность обработки Topic в Kafka работой с очередью двух Consumer
2.8 Senior🔥 231 комментариев
#Брокеры сообщений
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Масштабирование обработки Kafka Topic с несколькими Consumer
Использование нескольких потребителей в одной Consumer Group — стандартный способ увеличить throughput обработки сообщений.
1. Базовая архитектура
Topic: orders (3 партиции)
├── Partition 0
├── Partition 1
└── Partition 2
Consumer Group: order-processors
├── Consumer 1 (обрабатывает Partition 0)
├── Consumer 2 (обрабатывает Partition 1)
└── Consumer 3 (обрабатывает Partition 2)
Каждый consumer получает разные партиции, обработка идёт параллельно.
2. Конфигурация Consumer (Spring Kafka)
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, OrderEvent> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processors");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);
configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Оптимизация производительности
configProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 100); // 100KB
configProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // Больше сообщений за раз
configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// Важно: количество потоков = числу партиций
factory.setConcurrency(3); // 3 потока для 3 партиций
// Manual commit для большего контроля
factory.getContainerProperties().setAckMode(
ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}
3. Consumer сервис (на одной машине, 3 потока)
@Service
public class OrderProcessingService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private Acknowledgment acknowledgment;
// Один listener обрабатывает messages из разных партиций
// благодаря concurrency=3
@KafkaListener(
topics = "orders",
groupId = "order-processors",
concurrency = "3" // 3 потока
)
public void processOrder(
@Payload OrderEvent event,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
Acknowledgment ack) {
System.out.println("Processing order from partition: " + partition);
try {
// Обработка заказа
Order order = new Order();
order.setId(event.getOrderId());
order.setStatus("PROCESSING");
orderRepository.save(order);
// После успешной обработки — commit
ack.acknowledge();
} catch (Exception e) {
System.err.println("Error processing order: " + e.getMessage());
// Не коммитим — message вернётся в очередь
}
}
}
4. Несколько Consumer экземпляров на разных машинах
Это более масштабируемый подход (horizontal scaling):
// Машина 1: Consumer 1
@Service
public class OrderConsumer1 {
@KafkaListener(
topics = "orders",
groupId = "order-processors",
clientIdPrefix = "consumer-1"
)
public void consume(OrderEvent event) {
System.out.println("Consumer 1: Processing " + event.getOrderId());
processOrder(event);
}
private void processOrder(OrderEvent event) {
// Бизнес-логика
}
}
// Машина 2: Consumer 2
@Service
public class OrderConsumer2 {
@KafkaListener(
topics = "orders",
groupId = "order-processors",
clientIdPrefix = "consumer-2"
)
public void consume(OrderEvent event) {
System.out.println("Consumer 2: Processing " + event.getOrderId());
processOrder(event);
}
private void processOrder(OrderEvent event) {
// Бизнес-логика
}
}
// Машина 3: Consumer 3
// ... аналогично
5. Оптимизация производительности
@Configuration
public class KafkaOptimizationConfig {
@Bean
public ConsumerFactory<String, OrderEvent> optimizedConsumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processors");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);
// ===== ПРОИЗВОДИТЕЛЬНОСТЬ =====
// 1. Fetch больше данных за раз
configProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 1024); // 1MB
configProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1000);
// 2. Больше сообщений в одном poll
configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
// 3. Размер буфера
configProps.put("max.partition.fetch.bytes", 10 * 1024 * 1024); // 10MB
// 4. Кэширование metadata
configProps.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 300000);
// 5. Отключаем автокоммит для лучшего контроля
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 6. Isolation level для transactional safety
configProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(optimizedConsumerFactory());
// Количество потоков = количество партиций
factory.setConcurrency(3);
// Batch processing для оптимизации
factory.setBatchListener(true);
// Manual commit
factory.getContainerProperties().setAckMode(
ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}
6. Batch processing для еще больше throughput
@Service
public class BatchOrderProcessingService {
@Autowired
private OrderRepository orderRepository;
// Обрабатываем целый батч за раз
@KafkaListener(
topics = "orders",
groupId = "order-processors",
concurrency = "3"
)
public void processBatch(
@Payload List<OrderEvent> events,
Acknowledgment ack) {
System.out.println("Processing batch of " + events.size() + " orders");
// Батч-сохранение в БД (1 insert вместо N)
List<Order> orders = events.stream()
.map(event -> createOrder(event))
.collect(Collectors.toList());
try {
orderRepository.saveAll(orders);
ack.acknowledge();
} catch (Exception e) {
System.err.println("Batch processing error: " + e);
}
}
private Order createOrder(OrderEvent event) {
Order order = new Order();
order.setId(event.getOrderId());
order.setStatus("PROCESSED");
return order;
}
}
7. Асинхронная обработка для большей throughput
@Service
public class AsyncOrderProcessingService {
@Autowired
private OrderService orderService;
@Autowired
private TaskExecutor taskExecutor;
@KafkaListener(
topics = "orders",
groupId = "order-processors",
concurrency = "3"
)
public void processOrderAsync(
@Payload OrderEvent event,
Acknowledgment ack) {
// Коммитим сразу после получения
ack.acknowledge();
// Обработка в фоновом потоке
taskExecutor.execute(() -> {
try {
orderService.process(event);
} catch (Exception e) {
// Retry logic или DLQ
handleProcessingError(event, e);
}
});
}
private void handleProcessingError(OrderEvent event, Exception e) {
System.err.println("Failed to process: " + event);
// Отправляем в Dead Letter Queue
}
}
8. Мониторинг производительности
@Service
public class KafkaMetricsService {
@Autowired
private MeterRegistry meterRegistry;
private AtomicInteger processedCount = new AtomicInteger(0);
@KafkaListener(topics = "orders", groupId = "order-processors")
public void processWithMetrics(@Payload OrderEvent event, Acknowledgment ack) {
long startTime = System.currentTimeMillis();
try {
processOrder(event);
processedCount.incrementAndGet();
ack.acknowledge();
} finally {
long duration = System.currentTimeMillis() - startTime;
meterRegistry.timer("kafka.order.processing.time")
.record(duration, TimeUnit.MILLISECONDS);
meterRegistry.gauge("kafka.orders.processed", processedCount::get);
}
}
private void processOrder(OrderEvent event) {
// Обработка
}
}
Рекомендуемые настройки:
| Параметр | Значение | Причина |
|---|---|---|
| Concurrency | = числу партиций | Максимальный parallelism |
| MAX_POLL_RECORDS | 500-1000 | Батч обработки |
| FETCH_MIN_BYTES | 1MB | Сеть эффективнее |
| FETCH_MAX_WAIT_MS | 1000ms | Баланс latency/throughput |
| NUM_PARTITIONS | >= num consumers | Масштабируемость |
Пример увеличения производительности:
До оптимизации:
- 1 Consumer
- Throughput: 1000 msg/sec
После оптимизации:
- 3 Consumers (по числу партиций)
- Batch processing
- Оптимизированная сетевая конфигурация
- Throughput: 8000-10000 msg/sec (~10x улучшение)
Основной ключ масштабирования Kafka: количество потребителей ≤ количество партиций.