← Назад к вопросам

Как увеличить производительность обработки Topic в Kafka работой с очередью двух Consumer

2.8 Senior🔥 231 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Масштабирование обработки Kafka Topic с несколькими Consumer

Использование нескольких потребителей в одной Consumer Group — стандартный способ увеличить throughput обработки сообщений.

1. Базовая архитектура

Topic: orders (3 партиции)
├── Partition 0
├── Partition 1
└── Partition 2

Consumer Group: order-processors
├── Consumer 1 (обрабатывает Partition 0)
├── Consumer 2 (обрабатывает Partition 1)
└── Consumer 3 (обрабатывает Partition 2)

Каждый consumer получает разные партиции, обработка идёт параллельно.

2. Конфигурация Consumer (Spring Kafka)

@Configuration
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, OrderEvent> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processors");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
            JsonDeserializer.class);
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        
        // Оптимизация производительности
        configProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 100); // 100KB
        configProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // Больше сообщений за раз
        configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        
        return new DefaultKafkaConsumerFactory<>(configProps);
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> 
            kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        
        // Важно: количество потоков = числу партиций
        factory.setConcurrency(3); // 3 потока для 3 партиций
        
        // Manual commit для большего контроля
        factory.getContainerProperties().setAckMode(
            ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        
        return factory;
    }
}

3. Consumer сервис (на одной машине, 3 потока)

@Service
public class OrderProcessingService {
    @Autowired
    private OrderRepository orderRepository;
    
    @Autowired
    private Acknowledgment acknowledgment;
    
    // Один listener обрабатывает messages из разных партиций
    // благодаря concurrency=3
    @KafkaListener(
        topics = "orders",
        groupId = "order-processors",
        concurrency = "3" // 3 потока
    )
    public void processOrder(
            @Payload OrderEvent event,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            Acknowledgment ack) {
        
        System.out.println("Processing order from partition: " + partition);
        
        try {
            // Обработка заказа
            Order order = new Order();
            order.setId(event.getOrderId());
            order.setStatus("PROCESSING");
            orderRepository.save(order);
            
            // После успешной обработки — commit
            ack.acknowledge();
        } catch (Exception e) {
            System.err.println("Error processing order: " + e.getMessage());
            // Не коммитим — message вернётся в очередь
        }
    }
}

4. Несколько Consumer экземпляров на разных машинах

Это более масштабируемый подход (horizontal scaling):

// Машина 1: Consumer 1
@Service
public class OrderConsumer1 {
    @KafkaListener(
        topics = "orders",
        groupId = "order-processors",
        clientIdPrefix = "consumer-1"
    )
    public void consume(OrderEvent event) {
        System.out.println("Consumer 1: Processing " + event.getOrderId());
        processOrder(event);
    }
    
    private void processOrder(OrderEvent event) {
        // Бизнес-логика
    }
}

// Машина 2: Consumer 2
@Service
public class OrderConsumer2 {
    @KafkaListener(
        topics = "orders",
        groupId = "order-processors",
        clientIdPrefix = "consumer-2"
    )
    public void consume(OrderEvent event) {
        System.out.println("Consumer 2: Processing " + event.getOrderId());
        processOrder(event);
    }
    
    private void processOrder(OrderEvent event) {
        // Бизнес-логика
    }
}

// Машина 3: Consumer 3
// ... аналогично

5. Оптимизация производительности

@Configuration
public class KafkaOptimizationConfig {
    @Bean
    public ConsumerFactory<String, OrderEvent> optimizedConsumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processors");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
            JsonDeserializer.class);
        
        // ===== ПРОИЗВОДИТЕЛЬНОСТЬ =====
        // 1. Fetch больше данных за раз
        configProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 1024); // 1MB
        configProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1000);
        
        // 2. Больше сообщений в одном poll
        configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        
        // 3. Размер буфера
        configProps.put("max.partition.fetch.bytes", 10 * 1024 * 1024); // 10MB
        
        // 4. Кэширование metadata
        configProps.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 300000);
        
        // 5. Отключаем автокоммит для лучшего контроля
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        
        // 6. Isolation level для transactional safety
        configProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        
        return new DefaultKafkaConsumerFactory<>(configProps);
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> 
            kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(optimizedConsumerFactory());
        
        // Количество потоков = количество партиций
        factory.setConcurrency(3);
        
        // Batch processing для оптимизации
        factory.setBatchListener(true);
        
        // Manual commit
        factory.getContainerProperties().setAckMode(
            ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        
        return factory;
    }
}

6. Batch processing для еще больше throughput

@Service
public class BatchOrderProcessingService {
    @Autowired
    private OrderRepository orderRepository;
    
    // Обрабатываем целый батч за раз
    @KafkaListener(
        topics = "orders",
        groupId = "order-processors",
        concurrency = "3"
    )
    public void processBatch(
            @Payload List<OrderEvent> events,
            Acknowledgment ack) {
        
        System.out.println("Processing batch of " + events.size() + " orders");
        
        // Батч-сохранение в БД (1 insert вместо N)
        List<Order> orders = events.stream()
            .map(event -> createOrder(event))
            .collect(Collectors.toList());
        
        try {
            orderRepository.saveAll(orders);
            ack.acknowledge();
        } catch (Exception e) {
            System.err.println("Batch processing error: " + e);
        }
    }
    
    private Order createOrder(OrderEvent event) {
        Order order = new Order();
        order.setId(event.getOrderId());
        order.setStatus("PROCESSED");
        return order;
    }
}

7. Асинхронная обработка для большей throughput

@Service
public class AsyncOrderProcessingService {
    @Autowired
    private OrderService orderService;
    
    @Autowired
    private TaskExecutor taskExecutor;
    
    @KafkaListener(
        topics = "orders",
        groupId = "order-processors",
        concurrency = "3"
    )
    public void processOrderAsync(
            @Payload OrderEvent event,
            Acknowledgment ack) {
        
        // Коммитим сразу после получения
        ack.acknowledge();
        
        // Обработка в фоновом потоке
        taskExecutor.execute(() -> {
            try {
                orderService.process(event);
            } catch (Exception e) {
                // Retry logic или DLQ
                handleProcessingError(event, e);
            }
        });
    }
    
    private void handleProcessingError(OrderEvent event, Exception e) {
        System.err.println("Failed to process: " + event);
        // Отправляем в Dead Letter Queue
    }
}

8. Мониторинг производительности

@Service
public class KafkaMetricsService {
    @Autowired
    private MeterRegistry meterRegistry;
    
    private AtomicInteger processedCount = new AtomicInteger(0);
    
    @KafkaListener(topics = "orders", groupId = "order-processors")
    public void processWithMetrics(@Payload OrderEvent event, Acknowledgment ack) {
        long startTime = System.currentTimeMillis();
        
        try {
            processOrder(event);
            processedCount.incrementAndGet();
            ack.acknowledge();
        } finally {
            long duration = System.currentTimeMillis() - startTime;
            meterRegistry.timer("kafka.order.processing.time")
                .record(duration, TimeUnit.MILLISECONDS);
            meterRegistry.gauge("kafka.orders.processed", processedCount::get);
        }
    }
    
    private void processOrder(OrderEvent event) {
        // Обработка
    }
}

Рекомендуемые настройки:

ПараметрЗначениеПричина
Concurrency= числу партицийМаксимальный parallelism
MAX_POLL_RECORDS500-1000Батч обработки
FETCH_MIN_BYTES1MBСеть эффективнее
FETCH_MAX_WAIT_MS1000msБаланс latency/throughput
NUM_PARTITIONS>= num consumersМасштабируемость

Пример увеличения производительности:

До оптимизации:

  • 1 Consumer
  • Throughput: 1000 msg/sec

После оптимизации:

  • 3 Consumers (по числу партиций)
  • Batch processing
  • Оптимизированная сетевая конфигурация
  • Throughput: 8000-10000 msg/sec (~10x улучшение)

Основной ключ масштабирования Kafka: количество потребителей ≤ количество партиций.

Как увеличить производительность обработки Topic в Kafka работой с очередью двух Consumer | PrepBro