← Назад к вопросам

Как подключал Kafka к Java

2.0 Middle🔥 131 комментариев
#Другое

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# Интеграция Kafka в Java приложение

Rasskazhu свой практический опыт подключения Kafka к Java приложениям. Рассмотрю несколько подходов — от базового KafkaProducer/KafkaConsumer до Spring Kafka.

Способ 1: Низкоуровневый API (KafkaProducer/KafkaConsumer)

Добавляем зависимость

<!-- pom.xml -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.0</version>
</dependency>

Создаём Producer

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

public class OrderProducer {
    private final KafkaProducer<String, String> producer;

    public OrderProducer(String brokers) {
        Properties props = new Properties();
        
        // Критические настройки
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        
        // Надежность доставки
        props.put(ProducerConfig.ACKS_CONFIG, "all");  // all = записано во все replicas
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 30000);
        
        // Батчинг для производительности
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        
        // Идемпотентность (гарантия "exactly once")
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        this.producer = new KafkaProducer<>(props);
    }

    public void publishOrder(String orderId, String orderData) {
        ProducerRecord<String, String> record = new ProducerRecord<>(
            "orders-topic",      // topic
            orderId,              // key (для партиционирования)
            orderData             // value
        );

        // Асинхронная отправка с callback
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // Логируем ошибку, отправляем в DLQ
                    handleSendFailure(orderId, exception);
                } else {
                    System.out.println(String.format(
                        "Order %s sent to partition %d with offset %d",
                        orderId, metadata.partition(), metadata.offset()
                    ));
                }
            }
        });
    }

    public void publishOrderSync(String orderId, String orderData) throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>(
            "orders-topic", orderId, orderData
        );
        
        // Синхронная отправка (блокирует до подтверждения)
        RecordMetadata metadata = producer.send(record).get();
        System.out.println("Order published to offset " + metadata.offset());
    }

    public void close() {
        producer.close();
    }

    private void handleSendFailure(String orderId, Exception ex) {
        // Отправляем в Dead Letter Queue
        System.err.println("Failed to send order " + orderId + ": " + ex.getMessage());
        // В production: сохранить в БД, отправить alert
    }
}

Создаём Consumer

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OrderConsumer {
    private final KafkaConsumer<String, String> consumer;
    private volatile boolean running = true;

    public OrderConsumer(String brokers, String groupId) {
        Properties props = new Properties();
        
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        
        // Управление смещением (offset)
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  // с начала при отсутствии offset
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // ручная коммитизация
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        
        this.consumer = new KafkaConsumer<>(props);
    }

    public void subscribe() {
        consumer.subscribe(Collections.singletonList("orders-topic"));
    }

    public void consume(OrderProcessor processor) {
        try {
            while (running) {
                // Опрашиваем сообщения
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, String> record : records) {
                    try {
                        // Обработка сообщения
                        System.out.println(String.format(
                            "Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s",
                            record.topic(), record.partition(), record.offset(),
                            record.key(), record.value()
                        ));

                        processor.process(record.key(), record.value());

                        // Коммитим offset после успешной обработки
                        consumer.commitAsync((offsets, exception) -> {
                            if (exception != null) {
                                System.err.println("Failed to commit offset: " + exception);
                            }
                        });
                    } catch (Exception e) {
                        System.err.println("Error processing message: " + e.getMessage());
                        // Не коммитим — перепроцессируем при перезагрузке
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }

    public void stop() {
        running = false;
    }
}

public interface OrderProcessor {
    void process(String orderId, String orderData) throws Exception;
}

Способ 2: Spring Kafka (рекомендуется)

Добавляем зависимость

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.0</version>
</dependency>

Конфигурация

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.context.annotation.Bean;

@Configuration
@EnableKafka
public class KafkaConfig {
    
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        return new DefaultProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        
        return new DefaultConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);  // 3 потока для обработки
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

Producer в Spring

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class OrderKafkaProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public OrderKafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendOrder(String orderId, String orderJson) {
        Message<String> message = MessageBuilder
            .withPayload(orderJson)
            .setHeader(KafkaHeaders.TOPIC, "orders-topic")
            .setHeader(KafkaHeaders.MESSAGE_KEY, orderId)
            .setHeader("timestamp", System.currentTimeMillis())
            .build();

        kafkaTemplate.send(message)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("Order sent: " + 
                        result.getRecordMetadata().offset());
                } else {
                    System.err.println("Failed to send: " + ex.getMessage());
                }
            });
    }
}

Consumer в Spring

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.rebalance.ConsumerAwareRebalanceListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class OrderKafkaConsumer {
    
    @KafkaListener(
        topics = "orders-topic",
        groupId = "order-service",
        concurrency = "3"
    )
    public void consumeOrder(
            @Payload String orderJson,
            @Header(KafkaHeaders.RECEIVED_KEY) String orderId,
            @Header(KafkaHeaders.OFFSET) long offset,
            Acknowledgment acknowledgment) {
        
        try {
            // Обработка
            System.out.println("Processing order: " + orderId);
            processOrder(orderJson);
            
            // Коммитим вручную при успехе
            acknowledgment.acknowledge();
        } catch (Exception e) {
            System.err.println("Error: " + e.getMessage());
            // Не коммитим — повтор при перезагрузке
        }
    }

    private void processOrder(String orderJson) {
        // Логика обработки
    }
}

Способ 3: JSON сериализация

// При работе с сложными объектами
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaJsonConfig {
    
    @Bean
    public ProducerFactory<String, Order> jsonProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        
        return new DefaultProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Order> jsonKafkaTemplate() {
        return new KafkaTemplate<>(jsonProducerFactory());
    }
}

// Consumer с JSON
public class Order {
    private String orderId;
    private String customerId;
    private BigDecimal amount;
    // getters, setters
}

@Service
public class OrderJsonConsumer {
    
    @KafkaListener(
        topics = "orders-json",
        groupId = "order-service"
    )
    public void consumeOrderJson(Order order) {
        System.out.println("Received order: " + order.getOrderId());
    }
}

Практические советы из опыта

1. Обработка ошибок

@Service
public class RobustOrderConsumer {
    
    @KafkaListener(
        topics = "orders-topic",
        groupId = "order-service"
    )
    public void consume(Order order, Acknowledgment ack) {
        try {
            processOrder(order);
            ack.acknowledge();
        } catch (RetryableException e) {
            // Бросим исключение, Consumer будет переотправлен
            throw new RuntimeException(e);
        } catch (FatalException e) {
            // Логируем, отправляем в DLQ, подтверждаем
            logger.error("Fatal error", e);
            sendToDeadLetterQueue(order);
            ack.acknowledge();
        }
    }
}

2. Dead Letter Topic

@Configuration
public class DeadLetterQueueConfig {
    
    @Bean
    public NewTopic deadLetterTopic() {
        return TopicBuilder.name("orders-topic.DLT")
            .partitions(1)
            .replicas(3)
            .build();
    }
}

3. Мониторинг

# application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      max-poll-records: 100
    producer:
      acks: all
management:
  endpoints:
    web:
      exposure:
        include: health,kafka

Lessons Learned

  1. Всегда используй manual commit — AUTO_COMMIT может потерять сообщения при перезагрузке
  2. Идемпотентность критична — ENABLE_IDEMPOTENCE_CONFIG = true защищает от дублей
  3. Партиционируй по бизнес-ключу — используй key для гарантии порядка внутри партиции
  4. Dead Letter Queue — обязателен — для неудачных сообщений
  5. Монитор lag — отстаток Consumer от Producer показывает здоровье системы
  6. Версионируй сообщения — добавляй version field для совместимости при эволюции
  7. Сжимай данные — COMPRESSION_TYPE = "snappy" экономит bandwidth

Это мой практический опыт работы с Kafka в Java. Spring Kafka — лучший выбор для production систем, так как управляет многими деталями автоматически.