← Назад к вопросам

Как управлять приоритетами с помощью Kafka

2.3 Middle🔥 131 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как управлять приоритетами с помощью Kafka

Kafka сам по себе не имеет встроенного механизма приоритетов для сообщений в очереди, но существуют несколько стратегий для эффективного управления приоритетами в системах на базе Kafka.

Подход 1: Несколько тематических разделов (Topics) по приоритетам

Это наиболее распространённый и рекомендуемый подход:

@Configuration
public class KafkaProducerConfig {
    public static final String TOPIC_HIGH_PRIORITY = "events-high-priority";
    public static final String TOPIC_NORMAL_PRIORITY = "events-normal-priority";
    public static final String TOPIC_LOW_PRIORITY = "events-low-priority";
    
    @Bean
    public ProducerFactory<String, Event> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultProducerFactory<>(configProps);
    }
    
    @Bean
    public KafkaTemplate<String, Event> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

@Service
public class EventProducer {
    @Autowired
    private KafkaTemplate<String, Event> kafkaTemplate;
    
    public void sendEvent(Event event) {
        String topic = determineTopic(event.getPriority());
        kafkaTemplate.send(topic, event.getId(), event);
    }
    
    private String determineTopic(Priority priority) {
        return switch(priority) {
            case HIGH -> KafkaProducerConfig.TOPIC_HIGH_PRIORITY;
            case NORMAL -> KafkaProducerConfig.TOPIC_NORMAL_PRIORITY;
            case LOW -> KafkaProducerConfig.TOPIC_LOW_PRIORITY;
        };
    }
}

Consumer с обработкой приоритетов:

@Configuration
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, Event> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "priority-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultConsumerFactory<>(props);
    }
}

@Service
public class PriorityEventConsumer {
    private static final Logger logger = LoggerFactory.getLogger(PriorityEventConsumer.class);
    
    // Слушатели с разными приоритетами
    @KafkaListener(topics = KafkaProducerConfig.TOPIC_HIGH_PRIORITY,
                   groupId = "high-priority-group",
                   concurrency = "5")  // Больше параллельных обработчиков
    public void consumeHighPriority(Event event) {
        logger.info("Processing HIGH priority event: {}", event.getId());
        processEvent(event, Priority.HIGH);
    }
    
    @KafkaListener(topics = KafkaProducerConfig.TOPIC_NORMAL_PRIORITY,
                   groupId = "normal-priority-group",
                   concurrency = "3")
    public void consumeNormalPriority(Event event) {
        logger.info("Processing NORMAL priority event: {}", event.getId());
        processEvent(event, Priority.NORMAL);
    }
    
    @KafkaListener(topics = KafkaProducerConfig.TOPIC_LOW_PRIORITY,
                   groupId = "low-priority-group",
                   concurrency = "1")
    public void consumeLowPriority(Event event) {
        logger.info("Processing LOW priority event: {}", event.getId());
        processEvent(event, Priority.LOW);
    }
    
    private void processEvent(Event event, Priority priority) {
        // Обработка события
    }
}

Преимущества:

  • Простота реализации
  • Возможность разных параллелизмов для разных приоритетов
  • Хорошая масштабируемость
  • Изоляция потоков обработки

Недостатки:

  • Нужно создавать несколько topics
  • Усложнение архитектуры

Подход 2: Приоритет в заголовке сообщения (одного topic)

Для ситуаций, когда нужен один topic, но с разными приоритетами:

public class PriorityEvent {
    private String id;
    private String payload;
    private Priority priority;
    private LocalDateTime timestamp;
}

public enum Priority {
    HIGH(1),
    NORMAL(2),
    LOW(3);
    
    private final int level;
    Priority(int level) { this.level = level; }
    public int getLevel() { return level; }
}

@Service
public class PriorityAwareConsumer {
    private final PriorityQueue<PriorityEvent> eventQueue = 
        new PriorityQueue<>(Comparator.comparingInt(e -> e.getPriority().getLevel()));
    private final ExecutorService executorService = Executors.newFixedThreadPool(10);
    
    @KafkaListener(topics = "events-priority", groupId = "priority-group")
    public void consumeEvent(PriorityEvent event) {
        eventQueue.offer(event);
        processQueue();
    }
    
    private void processQueue() {
        while (!eventQueue.isEmpty()) {
            PriorityEvent event = eventQueue.poll();
            executorService.submit(() -> {
                try {
                    handleEvent(event);
                } catch (Exception e) {
                    logger.error("Error processing event", e);
                }
            });
        }
    }
    
    private void handleEvent(PriorityEvent event) {
        // Обработка события
    }
}

Подход 3: Использование партиций и весов обработки

Распределение по партициям в зависимости от приоритета:

@Service
public class PartitionBasedProducer {
    @Autowired
    private KafkaTemplate<String, PriorityEvent> kafkaTemplate;
    
    public void sendEvent(PriorityEvent event) {
        // Определяем партицию на основе приоритета
        // Partition 0 - HIGH priority
        // Partition 1 - NORMAL priority
        // Partition 2 - LOW priority
        
        int partition = event.getPriority().getLevel() - 1;
        kafkaTemplate.send(new ProducerRecord<>(
            "events-partitioned",
            partition,
            event.getId(),
            event
        ));
    }
}

@Service
public class PartitionAwareConsumer {
    // Читаем разные партиции с разными параллелизмами
    @KafkaListener(
        topics = "events-partitioned",
        groupId = "partition-group",
        topicPartitions = {
            @TopicPartition(topic = "events-partitioned", partitions = "0")
        },
        concurrency = "5"  // HIGH priority
    )
    public void consumeHighPriority(PriorityEvent event) {
        handleEvent(event);
    }
    
    @KafkaListener(
        topics = "events-partitioned",
        groupId = "partition-group",
        topicPartitions = {
            @TopicPartition(topic = "events-partitioned", partitions = "1")
        },
        concurrency = "3"  // NORMAL priority
    )
    public void consumeNormalPriority(PriorityEvent event) {
        handleEvent(event);
    }
    
    @KafkaListener(
        topics = "events-partitioned",
        groupId = "partition-group",
        topicPartitions = {
            @TopicPartition(topic = "events-partitioned", partitions = "2")
        },
        concurrency = "1"  // LOW priority
    )
    public void consumeLowPriority(PriorityEvent event) {
        handleEvent(event);
    }
    
    private void handleEvent(PriorityEvent event) {
        // Обработка события
    }
}

Подход 4: Взвешенная обработка с несколькими потребителями

Реализация приоритета через количество потребителей:

@Configuration
public class PriorityConsumerConfig {
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, PriorityEvent>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, PriorityEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setCommonErrorHandler(new DefaultErrorHandler());
        return factory;
    }
}

@Service
public class WeightedPriorityConsumer {
    private final int WEIGHT_HIGH = 5;    // 5 потребителей
    private final int WEIGHT_NORMAL = 3;  // 3 потребителя
    private final int WEIGHT_LOW = 1;     // 1 потребитель
    
    @KafkaListener(topics = "events-priority",
                   groupId = "weighted-group",
                   concurrency = "9")  // WEIGHT_HIGH + WEIGHT_NORMAL + WEIGHT_LOW
    public void consumeWeightedPriority(PriorityEvent event) {
        // Kafka distribuирует нагрузку на основе количества потребителей
        handleEvent(event);
    }
    
    private void handleEvent(PriorityEvent event) {
        // Обработка события
    }
}

Лучшие практики управления приоритетами в Kafka

1. SLA и таймауты:

@Service
public class SLAEnforcer {
    private static final long HIGH_PRIORITY_TIMEOUT_MS = 1000;   // 1 сек
    private static final long NORMAL_PRIORITY_TIMEOUT_MS = 5000;  // 5 сек
    private static final long LOW_PRIORITY_TIMEOUT_MS = 60000;    // 1 мин
    
    public void processEvent(PriorityEvent event) {
        long timeout = switch(event.getPriority()) {
            case HIGH -> HIGH_PRIORITY_TIMEOUT_MS;
            case NORMAL -> NORMAL_PRIORITY_TIMEOUT_MS;
            case LOW -> LOW_PRIORITY_TIMEOUT_MS;
        };
        
        long startTime = System.currentTimeMillis();
        try {
            handleEvent(event);
        } finally {
            long elapsed = System.currentTimeMillis() - startTime;
            if (elapsed > timeout) {
                logger.warn("Event {} exceeded SLA timeout", event.getId());
            }
        }
    }
}

2. Мониторинг и метрики:

@Service
public class PriorityMetrics {
    private final MeterRegistry meterRegistry;
    
    public void recordEventProcessing(PriorityEvent event, long durationMs) {
        Counter.builder("events.processed")
            .tag("priority", event.getPriority().name())
            .register(meterRegistry)
            .increment();
        
        Timer.builder("events.processing.time")
            .tag("priority", event.getPriority().name())
            .register(meterRegistry)
            .record(Duration.ofMillis(durationMs));
    }
}

Итог: Kafka не имеет встроенных приоритетов, но можно эффективно управлять ими через:

  • Несколько topics по приоритетам (рекомендуется)
  • Партиции с разными concurrency
  • Встроенные priority queues в консьюмере
  • Взвешенное распределение потребителей

Выбор подхода зависит от требований к задержкам и масштабируемости системы.