Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
# Интеграция Kafka в Java приложение
Rasskazhu свой практический опыт подключения Kafka к Java приложениям. Рассмотрю несколько подходов — от базового KafkaProducer/KafkaConsumer до Spring Kafka.
Способ 1: Низкоуровневый API (KafkaProducer/KafkaConsumer)
Добавляем зависимость
<!-- pom.xml -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.0</version>
</dependency>
Создаём Producer
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
public class OrderProducer {
private final KafkaProducer<String, String> producer;
public OrderProducer(String brokers) {
Properties props = new Properties();
// Критические настройки
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Надежность доставки
props.put(ProducerConfig.ACKS_CONFIG, "all"); // all = записано во все replicas
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 30000);
// Батчинг для производительности
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
// Идемпотентность (гарантия "exactly once")
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
this.producer = new KafkaProducer<>(props);
}
public void publishOrder(String orderId, String orderData) {
ProducerRecord<String, String> record = new ProducerRecord<>(
"orders-topic", // topic
orderId, // key (для партиционирования)
orderData // value
);
// Асинхронная отправка с callback
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
// Логируем ошибку, отправляем в DLQ
handleSendFailure(orderId, exception);
} else {
System.out.println(String.format(
"Order %s sent to partition %d with offset %d",
orderId, metadata.partition(), metadata.offset()
));
}
}
});
}
public void publishOrderSync(String orderId, String orderData) throws Exception {
ProducerRecord<String, String> record = new ProducerRecord<>(
"orders-topic", orderId, orderData
);
// Синхронная отправка (блокирует до подтверждения)
RecordMetadata metadata = producer.send(record).get();
System.out.println("Order published to offset " + metadata.offset());
}
public void close() {
producer.close();
}
private void handleSendFailure(String orderId, Exception ex) {
// Отправляем в Dead Letter Queue
System.err.println("Failed to send order " + orderId + ": " + ex.getMessage());
// В production: сохранить в БД, отправить alert
}
}
Создаём Consumer
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
public class OrderConsumer {
private final KafkaConsumer<String, String> consumer;
private volatile boolean running = true;
public OrderConsumer(String brokers, String groupId) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// Управление смещением (offset)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // с начала при отсутствии offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // ручная коммитизация
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
this.consumer = new KafkaConsumer<>(props);
}
public void subscribe() {
consumer.subscribe(Collections.singletonList("orders-topic"));
}
public void consume(OrderProcessor processor) {
try {
while (running) {
// Опрашиваем сообщения
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
try {
// Обработка сообщения
System.out.println(String.format(
"Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s",
record.topic(), record.partition(), record.offset(),
record.key(), record.value()
));
processor.process(record.key(), record.value());
// Коммитим offset после успешной обработки
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
System.err.println("Failed to commit offset: " + exception);
}
});
} catch (Exception e) {
System.err.println("Error processing message: " + e.getMessage());
// Не коммитим — перепроцессируем при перезагрузке
}
}
}
} finally {
consumer.close();
}
}
public void stop() {
running = false;
}
}
public interface OrderProcessor {
void process(String orderId, String orderData) throws Exception;
}
Способ 2: Spring Kafka (рекомендуется)
Добавляем зависимость
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.1.0</version>
</dependency>
Конфигурация
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.context.annotation.Bean;
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
return new DefaultProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3); // 3 потока для обработки
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
}
Producer в Spring
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class OrderKafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public OrderKafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendOrder(String orderId, String orderJson) {
Message<String> message = MessageBuilder
.withPayload(orderJson)
.setHeader(KafkaHeaders.TOPIC, "orders-topic")
.setHeader(KafkaHeaders.MESSAGE_KEY, orderId)
.setHeader("timestamp", System.currentTimeMillis())
.build();
kafkaTemplate.send(message)
.whenComplete((result, ex) -> {
if (ex == null) {
System.out.println("Order sent: " +
result.getRecordMetadata().offset());
} else {
System.err.println("Failed to send: " + ex.getMessage());
}
});
}
}
Consumer в Spring
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.rebalance.ConsumerAwareRebalanceListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
@Service
public class OrderKafkaConsumer {
@KafkaListener(
topics = "orders-topic",
groupId = "order-service",
concurrency = "3"
)
public void consumeOrder(
@Payload String orderJson,
@Header(KafkaHeaders.RECEIVED_KEY) String orderId,
@Header(KafkaHeaders.OFFSET) long offset,
Acknowledgment acknowledgment) {
try {
// Обработка
System.out.println("Processing order: " + orderId);
processOrder(orderJson);
// Коммитим вручную при успехе
acknowledgment.acknowledge();
} catch (Exception e) {
System.err.println("Error: " + e.getMessage());
// Не коммитим — повтор при перезагрузке
}
}
private void processOrder(String orderJson) {
// Логика обработки
}
}
Способ 3: JSON сериализация
// При работе с сложными объектами
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
@Configuration
public class KafkaJsonConfig {
@Bean
public ProducerFactory<String, Order> jsonProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
return new DefaultProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Order> jsonKafkaTemplate() {
return new KafkaTemplate<>(jsonProducerFactory());
}
}
// Consumer с JSON
public class Order {
private String orderId;
private String customerId;
private BigDecimal amount;
// getters, setters
}
@Service
public class OrderJsonConsumer {
@KafkaListener(
topics = "orders-json",
groupId = "order-service"
)
public void consumeOrderJson(Order order) {
System.out.println("Received order: " + order.getOrderId());
}
}
Практические советы из опыта
1. Обработка ошибок
@Service
public class RobustOrderConsumer {
@KafkaListener(
topics = "orders-topic",
groupId = "order-service"
)
public void consume(Order order, Acknowledgment ack) {
try {
processOrder(order);
ack.acknowledge();
} catch (RetryableException e) {
// Бросим исключение, Consumer будет переотправлен
throw new RuntimeException(e);
} catch (FatalException e) {
// Логируем, отправляем в DLQ, подтверждаем
logger.error("Fatal error", e);
sendToDeadLetterQueue(order);
ack.acknowledge();
}
}
}
2. Dead Letter Topic
@Configuration
public class DeadLetterQueueConfig {
@Bean
public NewTopic deadLetterTopic() {
return TopicBuilder.name("orders-topic.DLT")
.partitions(1)
.replicas(3)
.build();
}
}
3. Мониторинг
# application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: my-group
max-poll-records: 100
producer:
acks: all
management:
endpoints:
web:
exposure:
include: health,kafka
Lessons Learned
- Всегда используй manual commit — AUTO_COMMIT может потерять сообщения при перезагрузке
- Идемпотентность критична — ENABLE_IDEMPOTENCE_CONFIG = true защищает от дублей
- Партиционируй по бизнес-ключу — используй key для гарантии порядка внутри партиции
- Dead Letter Queue — обязателен — для неудачных сообщений
- Монитор lag — отстаток Consumer от Producer показывает здоровье системы
- Версионируй сообщения — добавляй version field для совместимости при эволюции
- Сжимай данные — COMPRESSION_TYPE = "snappy" экономит bandwidth
Это мой практический опыт работы с Kafka в Java. Spring Kafka — лучший выбор для production систем, так как управляет многими деталями автоматически.