← Назад к вопросам
Как спроектировать приложение Kafka для обработки сообщения строго один раз
2.4 Senior🔥 81 комментариев
#Брокеры сообщений
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Exactly-Once Semantics (EOS) в Kafka: проектирование надёжного приложения
Обработка сообщений строго один раз (exactly-once) — одна из самых сложных задач в распределённых системах. Kafka предоставляет встроенные механизмы, но требует правильной архитектуры приложения.
Уровни гарантирования доставки
┌─────────────────────────────────────────────────────┐
│ At-Most-Once (Максимум раз) │
│ - Сообщение может быть потеряно │
│ - Быстро, но ненадёжно │
│ - Используется: аналитика, логирование │
├─────────────────────────────────────────────────────┤
│ At-Least-Once (Минимум раз) │
│ - Сообщение может быть обработано несколько раз │
│ - Требует идемпотентности │
│ - Используется: операции с БД │
├─────────────────────────────────────────────────────┤
│ Exactly-Once (Строго один раз) ✓ │
│ - Сообщение обработано ровно один раз │
│ - Самое надёжное и сложное │
│ - Требует специальной архитектуры │
└─────────────────────────────────────────────────────┘
Архитектурный подход
Для достижения exactly-once нужны три компонента:
- Идемпотентный Producer — отправитель
- Трансакционная обработка — потребитель
- Дедупликация — удаление дубликатов
Решение 1: Producer-side Exactly-Once
Идемпотентный Producer
public class IdempotentProducer {
private final KafkaTemplate<String, Order> kafkaTemplate;
public void sendOrderIdempotent(Order order) {
// Генерируем уникальный ID для сообщения
String messageId = UUID.randomUUID().toString();
ProducerRecord<String, Order> record = new ProducerRecord<>(
"orders",
order.getId().toString(), // Key (партиционирование)
order
);
// Добавляем ID в headers для дедупликации
record.headers().add("message-id", messageId.getBytes());
record.headers().add("timestamp", System.currentTimeMillis() + "".getBytes());
kafkaTemplate.send(record);
}
}
@Configuration
public class IdempotentProducerConfig {
@Bean
public ProducerFactory<String, Order> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// КРИТИЧНЫЕ ПАРАМЕТРЫ ДЛЯ ИДЕМПОТЕНТНОСТИ:
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
configProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
return new DefaultProducerFactory<>(configProps);
}
}
Решение 2: Consumer-side Exactly-Once с Transactions
Это основной механизм. Используем транзакции Kafka и БД вместе:
@Service
public class TransactionalOrderConsumer {
@Autowired
private OrderRepository orderRepository;
@Autowired
private ConsumerOffsetRepository consumerOffsetRepository;
@KafkaListener(
topics = "orders",
groupId = "exactly-once-group",
properties = {
"isolation.level=read_committed",
"enable.auto.commit=false"
}
)
public void consumeWithTransactions(
ConsumerRecord<String, Order> record,
Acknowledgment acknowledgment) {
// Используем @Transactional для синхронизации с БД
processOrderInTransaction(record, acknowledgment);
}
@Transactional
private void processOrderInTransaction(
ConsumerRecord<String, Order> record,
Acknowledgment acknowledgment) {
Order order = record.value();
String messageId = extractMessageId(record);
long offset = record.offset();
int partition = record.partition();
try {
// 1. Проверяем, не обработали ли мы это сообщение
if (orderRepository.existsByMessageId(messageId)) {
// Сообщение уже обработано - пропускаем
acknowledgeAndCommit(record, acknowledgment, offset, partition);
return;
}
// 2. Обрабатываем заказ
Order processedOrder = processOrder(order);
// 3. Сохраняем в БД с сохранением messageId
processedOrder.setMessageId(messageId);
orderRepository.save(processedOrder);
// 4. Сохраняем offset в ОДНОЙ транзакции с данными
consumerOffsetRepository.save(new ConsumerOffset(
"orders",
partition,
offset,
System.currentTimeMillis()
));
// 5. Коммитим offset (синхронизировано с БД)
acknowledgment.acknowledge();
} catch (Exception e) {
// Транзакция откатится автоматически
log.error("Ошибка обработки заказа: " + messageId, e);
// Не коммитим - сообщение будет переобработано
}
}
private void acknowledgeAndCommit(
ConsumerRecord<String, Order> record,
Acknowledgment acknowledgment,
long offset,
int partition) {
acknowledgment.acknowledge();
// В контексте транзакции это безопасно
}
}
Решение 3: Kafka Streams для Exactly-Once
Кафка Streams имеет встроенную поддержку exactly-once:
@Configuration
public class ExactlyOnceStreamsConfig {
@Bean
public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
Map<String, Object> config = new HashMap<>();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// КЛЮЧЕВОЙ ПАРАМЕТР ДЛЯ EXACTLY-ONCE:
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
config.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, 86400000);
StreamsBuilderFactoryBean factoryBean = new StreamsBuilderFactoryBean();
factoryBean.setStreamsConfiguration(config);
return factoryBean;
}
@Bean
public KStream<String, Order> ordersStream(StreamsBuilder builder) {
KStream<String, Order> orders = builder.stream(
"orders",
Consumed.with(
Serdes.String(),
new JsonSerde<>(Order.class)
)
);
orders
.peek((key, value) -> log.info("Processing: {} -> {}", key, value.getId()))
.mapValues(this::validateAndProcessOrder)
.to("processed-orders",
Produced.with(
Serdes.String(),
new JsonSerde<>(ProcessedOrder.class)
)
);
return orders;
}
}
Решение 4: Дедупликация на основе БД
Хранём обработанные сообщения для защиты от дубликатов:
@Entity
@Table(name = "processed_messages")
public class ProcessedMessage {
@Id
private String messageId;
private String topicName;
private int partitionNumber;
private long offset;
private LocalDateTime processedAt;
@Index(unique = true)
@Column(nullable = false)
private String idempotencyKey; // UUID + timestamp
}
@Repository
public interface ProcessedMessageRepository
extends JpaRepository<ProcessedMessage, String> {
Optional<ProcessedMessage> findByIdempotencyKey(String key);
}
@Service
public class DeduplicationService {
@Autowired
private ProcessedMessageRepository processedMessageRepository;
@Transactional
public <T> T processWithDeduplication(
String idempotencyKey,
Function<String, T> processor) {
// Проверяем, не обработано ли
Optional<ProcessedMessage> existing =
processedMessageRepository.findByIdempotencyKey(idempotencyKey);
if (existing.isPresent()) {
log.warn("Сообщение {} уже обработано в {}",
idempotencyKey, existing.get().getProcessedAt());
return null; // или вернуть закэшированный результат
}
// Обрабатываем
T result = processor.apply(idempotencyKey);
// Записываем как обработанное
ProcessedMessage message = new ProcessedMessage();
message.setIdempotencyKey(idempotencyKey);
message.setProcessedAt(LocalDateTime.now());
processedMessageRepository.save(message);
return result;
}
}
Решение 5: Полная архитектура с трансакциями
@Configuration
public class FullyTransactionalKafkaConfig {
@Bean
public ConsumerFactory<String, Order> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "exactly-once-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Читаем только committed сообщения
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
// Ручной offset management в транзакции
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultConsumerFactory<>(props);
}
@Bean
public KafkaTransactionManager kafkaTransactionManager(
ProducerFactory<?, ?> producerFactory) {
return new KafkaTransactionManager(producerFactory);
}
}
@Service
public class ExactlyOnceOrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private KafkaTemplate<String, Order> kafkaTemplate;
@Transactional // Синхронизирует Kafka и БД транзакции
public void processOrderExactlyOnce(ConsumerRecord<String, Order> record) {
Order order = record.value();
String messageId = extractMessageId(record);
try {
// Если уже есть - не обработаем
if (orderRepository.findByMessageId(messageId).isPresent()) {
return;
}
// Обрабатываем
Order processed = new Order();
processed.setId(order.getId());
processed.setMessageId(messageId);
processed.setStatus("PROCESSED");
processed.setProcessedAt(LocalDateTime.now());
orderRepository.save(processed);
// Отправляем в следующий топик ВО ВРЕМЯ ТРАНЗАКЦИИ
kafkaTemplate.send("processed-orders", processed.getId(), processed);
// Транзакция покроет обе операции
} catch (Exception e) {
// Откат обеих операций
throw new RuntimeException("Ошибка обработки", e);
}
}
}
Чеклист для Exactly-Once
✅ Producer:
- Enable idempotence
- Acks = "all"
- Добавлять message-id в headers
- Retries > 0
✅ Consumer:
- Enable auto commit = false
- Isolation level = read_committed
- Обрабатывать в транзакции
- Проверять дубликаты перед обработкой
- Коммитить offset после успеха
✅ База данных:
- Unique constraint на messageId
- Одна транзакция для данных + offset
- Retry логика при deadlock
✅ Мониторинг:
- Логировать все обработки
- Alerting на durable сообщения
- Метрики обработки и переобработки
Этот подход гарантирует, что каждое сообщение будет обработано ровно один раз, что критично для финансовых операций и заказов!