← Назад к вопросам

Хорошо ли знаешь Kafka

1.0 Junior🔥 191 комментариев
#Docker, Kubernetes и DevOps#JVM и управление памятью#ORM и Hibernate

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Хорошо знаю Apache Kafka, работал в системах с высоконагруженным потоком событий

Основные концепции Kafka

Apache Kafka - это распределённая система потоковой обработки данных. Она выходит далеко за рамки обычного message broker, предоставляя надёжную доставку событий в масштабе миллионов сообщений в секунду.

Ключевые компоненты:

  • Topics - логический канал для отправки событий
  • Partitions - физическое разделение для масштабируемости
  • Consumer Groups - группы подписчиков для параллельной обработки
  • Offsets - позиции чтения в partition
  • Replicas - копии данных для надёжности

Архитектура и мой опыт

// Producer - отправка событий
public class OrderEventProducer {
    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
    private final ObjectMapper objectMapper;
    
    public OrderEventProducer(KafkaTemplate<String, OrderEvent> kafkaTemplate,
                             ObjectMapper objectMapper) {
        this.kafkaTemplate = kafkaTemplate;
        this.objectMapper = objectMapper;
    }
    
    public void publishOrderCreated(Order order) {
        OrderEvent event = OrderEvent.builder()
                .eventType("ORDER_CREATED")
                .orderId(order.getId())
                .timestamp(System.currentTimeMillis())
                .payload(order)
                .build();
        
        // Отправка с ключом для партиционирования
        kafkaTemplate.send("orders-topic", order.getId().toString(), event)
                .addCallback(
                    result -> log.info("Event sent: {}", result.getRecordMetadata().partition()),
                    ex -> log.error("Failed to send event", ex)
                );
    }
}

// Consumer - обработка событий
@Service
public class OrderEventConsumer {
    
    @KafkaListener(topics = "orders-topic", groupId = "order-processing-group")
    public void handleOrderEvent(@Payload OrderEvent event,
                                @Headers Map<String, Object> headers,
                                Acknowledgment ack) {
        try {
            log.info("Processing order event: {}", event.getOrderId());
            processOrder(event);
            // Manual commit после успешной обработки
            ack.acknowledge();
        } catch (Exception e) {
            log.error("Error processing order event", e);
            // Сообщение останется в очереди для retry
        }
    }
}

Глубокое понимание Kafka

Partitioning Strategy:

// Использование ключа для контроля распределения сообщений
public class PartitioningExample {
    
    public void sendWithPartitionKey(String userId, OrderEvent event) {
        // Все события одного пользователя будут в одной partition
        // Гарантирует порядок обработки для одного пользователя
        kafkaTemplate.send("orders-topic", userId, event);
    }
    
    public void sendWithoutKey(OrderEvent event) {
        // Round-robin распределение между партициями
        kafkaTemplate.send("orders-topic", event);
    }
    
    // Custom partitioner для сложной логики
    public static class CustomPartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                           Object value, byte[] valueBytes,
                           Cluster cluster) {
            if (key == null) {
                return 0;
            }
            String keyStr = (String) key;
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            
            // Распределение по регионам
            if (keyStr.startsWith("US")) {
                return 0;
            } else if (keyStr.startsWith("EU")) {
                return 1;
            } else {
                return 2 % numPartitions;
            }
        }
    }
}

Consumer Groups и Parallel Processing

// Масштабирование через consumer groups
@Configuration
public class KafkaConsumerConfig {
    
    @Bean
    public ConsumerFactory<String, OrderEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, OrderEvent.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  // Начать с начала
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);  // Батч обработки
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);  // Timeout сессии
        
        return new DefaultKafkaConsumerFactory<>(props);
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setCommonErrorHandler(new DefaultErrorHandler());
        factory.setConcurrency(5);  // 5 параллельных потоков обработки
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

Обработка ошибок и Retry Logic

@Service
public class RobustOrderProcessor {
    
    private static final int MAX_RETRIES = 3;
    private static final long BACKOFF_DELAY = 1000;  // 1 секунда
    
    @KafkaListener(topics = "orders-topic", groupId = "order-group")
    public void processWithRetry(@Payload OrderEvent event,
                                Acknowledgment ack) {
        int retries = 0;
        while (retries < MAX_RETRIES) {
            try {
                processOrderWithValidation(event);
                ack.acknowledge();
                return;
            } catch (TemporaryException e) {
                retries++;
                if (retries < MAX_RETRIES) {
                    try {
                        Thread.sleep(BACKOFF_DELAY * retries);  // Экспоненциальная задержка
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                } else {
                    log.error("Max retries exceeded for event: {}", event.getOrderId(), e);
                    // Отправить в DLQ
                    sendToDeadLetterQueue(event, e);
                }
            } catch (PermanentException e) {
                log.error("Permanent error, not retrying", e);
                sendToDeadLetterQueue(event, e);
                ack.acknowledge();
                return;
            }
        }
    }
    
    private void sendToDeadLetterQueue(OrderEvent event, Exception e) {
        kafkaTemplate.send("orders-dlq", event.getOrderId().toString(), 
                          DeadLetterRecord.from(event, e));
    }
}

Offset Management

// Управление смещениями для корректной обработки
@Service
public class OffsetManagement {
    
    @KafkaListener(topics = "orders-topic", groupId = "exact-once-group")
    public void processWithExactlyOnce(@Payload OrderEvent event,
                                       Consumer<?, ?> consumer,
                                       Acknowledgment ack) {
        try {
            // Помечаем как обработанное до фактической обработки
            // (идемпотентная обработка)
            if (isAlreadyProcessed(event.getEventId())) {
                log.info("Event already processed: {}", event.getEventId());
                ack.acknowledge();
                return;
            }
            
            processOrder(event);
            markAsProcessed(event.getEventId());
            ack.acknowledge();
        } catch (Exception e) {
            // Не подтверждаем offset, сообщение будет переобработано
            log.error("Error processing event", e);
            throw new RuntimeException(e);
        }
    }
    
    // Идемпотентность через кэширование обработанных ID
    private boolean isAlreadyProcessed(String eventId) {
        return processedEvents.containsKey(eventId);
    }
    
    private void markAsProcessed(String eventId) {
        processedEvents.put(eventId, System.currentTimeMillis());
    }
}

Мониторинг и Performance Tuning

// Метрики для мониторинга
@Configuration
public class KafkaMetricsConfig {
    
    @Bean
    public MeterRegistry meterRegistry() {
        return new SimpleMeterRegistry();
    }
    
    @Service
    public class KafkaMetricsCollector {
        
        @Autowired
        private MeterRegistry meterRegistry;
        
        public void recordProcessingLatency(long latencyMs) {
            Timer.builder("kafka.processing.latency")
                    .publishPercentiles(0.5, 0.95, 0.99)
                    .record(latencyMs, TimeUnit.MILLISECONDS);
        }
        
        public void recordProcessingError(String topic) {
            Counter.builder("kafka.processing.errors")
                    .tag("topic", topic)
                    .register(meterRegistry)
                    .increment();
        }
    }
}

Лучшие практики

  1. Партиционирование - используй ключи для гарантирования порядка
  2. Идемпотентность - предполагай возможность обработки дважды
  3. Мониторинг lag - отслеживай отставание consumer от producer
  4. Graceful shutdown - правильно закрывай consumer при остановке
  5. Schema versioning - версионируй формат сообщений
  6. Circuit breaker - предотвращай cascade failures

Знание Production Issues

  • Rebalancing - как минимизировать STW (stop-the-world) при добавлении consumers
  • Exactly-once semantics - как гарантировать обработку каждого сообщения ровно один раз
  • Large messages - правильная настройка max.message.bytes
  • Retention policy - балансирование между хранилищем и восстанавливаемостью
  • Network failures - timeout и retry настройки

Я использовал Kafka в системах, обрабатывающих миллионы событий в день, и имею глубокое понимание её архитектуры, нюансов и best practices.