Как гарантировано отправить сообщения между сервисами с помощью Apache Kafka
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
# Гарантированная доставка сообщений между сервисами с Apache Kafka
Apache Kafka — это распределённая система очередей, которая обеспечивает высокую надёжность доставки сообщений. Вот основные механизмы для гарантированной доставки:
1. Acks (Подтверждения)
Ключ к надёжности — правильная конфигурация параметра acks на стороне producer:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all"); // или -1 (ждём всех replicas)
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", 1);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Уровни acks:
acks=0— не ждём ничего (быстро, но не надёжно)acks=1— ждём leader (риск потери при crash leader)acks=all(-1) — ждём все replicas (гарантия доставки, но медленнее)
2. Конфигурация Producer
props.put("retries", Integer.MAX_VALUE); // бесконечные retry
props.put("max.in.flight.requests.per.connection", 1); // порядок сообщений
props.put("enable.idempotence", true); // дедупликация (Kafka >= 0.11)
props.put("delivery.timeout.ms", 120000); // timeout на доставку
enable.idempotence автоматически устанавливает retries=Integer.MAX_VALUE и гарантирует, что сообщение отправляется только один раз.
3. Конфигурация Consumer
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", false); // manual commit
props.put("isolation.level", "read_committed"); // только committed
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processMessage(record.value());
}
consumer.commitSync(); // commit после обработки
}
4. Replication Factor
На уровне топика убедитесь, что установлен min.insync.replicas >= 2:
kafka-topics --create --topic my-topic --replication-factor 3 --partitions 3 --bootstrap-server localhost:9092
При acks=all + min.insync.replicas=2, сообщение считается доставленным, когда записано на leader и минимум одну replica.
5. Идемпотентность
Для true at-least-once с предотвращением дубликатов используйте транзакции Kafka:
props.put("transactional.id", "my-producer-" + UUID.randomUUID());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("topic", "key", "value"));
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
6. Паттерн Outbox
Для гарантии доставки между микросервисами, когда нужно атомарно сохранить данные в БД и отправить событие:
// 1. Сохраняем в БД + Outbox таблицу в одной транзакции
@Transactional
public void processOrder(Order order) {
orderRepository.save(order);
outboxRepository.save(new OutboxEvent(
"order.created",
order.getId(),
LocalDateTime.now()
));
}
// 2. Отдельный сервис читает Outbox и отправляет в Kafka
@Scheduled(fixedRate = 1000)
public void publishOutboxEvents() {
List<OutboxEvent> events = outboxRepository.findUnpublished();
for (OutboxEvent event : events) {
producer.send(new ProducerRecord<>("events", event.getAggregateId(), event.getPayload()));
outboxRepository.markAsPublished(event.getId());
}
}
Итоговая стратегия
Для гарантированной доставки используйте комбинацию:
- Producer:
acks=all,enable.idempotence=true,retries=∞ - Consumer:
enable.auto.commit=false, ручной commit после обработки - Topic:
replication-factor=3,min.insync.replicas=2 - Для критичных операций: Outbox паттерн + транзакции
Это обеспечивает гарантию at-least-once delivery и позволяет обрабатывать дубликаты на уровне приложения.