← Назад к вопросам

Какие знаешь подходы проектирования системы, которая гарантирует отправку сообщений в Kafka?

3.0 Senior🔥 121 комментариев
#SOLID и паттерны проектирования#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Подходы проектирования системы для гарантии отправки сообщений в Kafka

Гарантирование доставки сообщений в Kafka — это критическая проблема, особенно в финансовых системах и системах обработки платежей. Kafka предлагает несколько механизмов, но каждый имеет свои компромиссы.

Уровни гарантий в Kafka

1. At-most-once (максимум один раз): Сообщение может быть потеряно 2. At-least-once (минимум один раз): Сообщение может быть продублировано 3. Exactly-once (ровно один раз): Сообщение доставлено ровно один раз

Подход 1: At-least-once с идемпотентностью

Стратегия: Гарантируем, что сообщение отправлено, и обрабатываем дубликаты на consumer стороне.

@Component
public class OrderProducer {
    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;
    
    public void publishOrderEvent(OrderEvent event) {
        // 1. Сохраняем событие в БД в статусе PENDING
        OrderEventLog log = new OrderEventLog();
        log.setOrderId(event.getOrderId());
        log.setStatus("PENDING");
        log.setPayload(serializeEvent(event));
        eventLogRepository.save(log);
        
        // 2. Отправляем в Kafka
        ListenableFuture<SendResult<String, OrderEvent>> future = 
            kafkaTemplate.send("orders-topic", event.getOrderId(), event);
        
        // 3. Callback для успеха
        future.addCallback(
            result -> {
                // Обновляем статус на SENT
                OrderEventLog sent = eventLogRepository.findById(log.getId()).get();
                sent.setStatus("SENT");
                sent.setKafkaOffset(result.getRecordMetadata().offset());
                eventLogRepository.save(sent);
            },
            ex -> {
                // Ошибка! Оставляем PENDING, будем retry
                log.setErrorMessage(ex.getMessage());
                eventLogRepository.save(log);
            }
        );
    }
}

// Retry механизм для незаотправленных событий
@Component
public class EventRetryService {
    @Scheduled(fixedDelay = 5000)
    public void retryFailedEvents() {
        List<OrderEventLog> pendingEvents = 
            eventLogRepository.findByStatus("PENDING");
        
        for (OrderEventLog log : pendingEvents) {
            try {
                OrderEvent event = deserializeEvent(log.getPayload());
                kafkaTemplate.send("orders-topic", event.getOrderId(), event);
                log.setStatus("SENT");
                eventLogRepository.save(log);
            } catch (Exception e) {
                log.setRetryCount(log.getRetryCount() + 1);
                if (log.getRetryCount() > 5) {
                    log.setStatus("FAILED");
                }
                eventLogRepository.save(log);
            }
        }
    }
}

Конфигурация Kafka для at-least-once:

@Configuration
public class KafkaProducerConfig {
    @Bean
    public ProducerFactory<String, OrderEvent> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        
        // Гарантия at-least-once
        configProps.put(ProducerConfig.ACKS_CONFIG, "all"); // all replicas must acknowledge
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        
        return new DefaultKafkaProducerFactory<>(configProps);
    }
}

// Consumer с идемпотентностью
@Component
public class OrderEventConsumer {
    @Autowired
    private OrderService orderService;
    @Autowired
    private ProcessedEventRepository processedEventRepository;
    
    @KafkaListener(topics = "orders-topic", groupId = "order-service")
    public void consume(OrderEvent event, 
                       @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                       @Header(KafkaHeaders.OFFSET) long offset) {
        
        String eventId = event.getOrderId() + "-" + partition + "-" + offset;
        
        // 1. Проверяем, не обработали ли уже это событие
        if (processedEventRepository.existsById(eventId)) {
            return; // Дубликат, игнорируем
        }
        
        try {
            // 2. Обрабатываем событие
            orderService.processOrder(event);
            
            // 3. Записываем, что обработали
            ProcessedEvent processed = new ProcessedEvent();
            processed.setId(eventId);
            processed.setProcessedAt(Instant.now());
            processedEventRepository.save(processed);
        } catch (Exception e) {
            // Ошибка обработки
            throw new RuntimeException("Failed to process event", e);
        }
    }
}

Подход 2: Transactional outbox pattern

Стратегия: Отправляем сообщение и сохраняем данные в одной транзакции.

@Component
public class OrderService {
    @Autowired
    private OrderRepository orderRepository;
    @Autowired
    private OutboxRepository outboxRepository;
    
    @Transactional
    public void createOrder(CreateOrderRequest request) {
        // 1. Создаём заказ
        Order order = new Order();
        order.setCustomerId(request.getCustomerId());
        order.setTotal(request.getTotal());
        orderRepository.save(order);
        
        // 2. В ОДНОЙ транзакции пишем в outbox
        OutboxEvent event = new OutboxEvent();
        event.setAggregateId(order.getId());
        event.setEventType("OrderCreated");
        event.setPayload(serializeOrder(order));
        event.setProcessed(false); // Ещё не отправлено
        outboxRepository.save(event);
        
        // Обе операции успешны или обе откатятся
    }
}

// Отдельный процесс читает outbox и отправляет в Kafka
@Component
public class OutboxPoller {
    @Autowired
    private OutboxRepository outboxRepository;
    @Autowired
    private KafkaTemplate<String, OutboxEvent> kafkaTemplate;
    
    @Scheduled(fixedDelay = 1000)
    public void pollAndPublish() {
        List<OutboxEvent> unprocessedEvents = 
            outboxRepository.findByProcessedFalse();
        
        for (OutboxEvent event : unprocessedEvents) {
            try {
                kafkaTemplate.send("orders-topic", event.getAggregateId().toString(), event);
                event.setProcessed(true);
                event.setPublishedAt(Instant.now());
                outboxRepository.save(event);
            } catch (Exception e) {
                // Retry на следующей итерации
                log.error("Failed to publish event", e);
            }
        }
    }
}

// Сущность для outbox
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    private Long aggregateId;
    private String eventType;
    private String payload;
    private boolean processed;
    private Instant publishedAt;
    private Instant createdAt = Instant.now();
}

Преимущества Transactional Outbox:

  • Гарантирует, что событие либо сохранено в БД, либо нет
  • Нет race condition между БД и Kafka
  • Если Kafka down, процесс перезапустится и отправит

Подход 3: Saga pattern с компенсирующими транзакциями

Для систем с несколькими Kafka topics:

@Component
public class OrderCreationSaga {
    @Autowired
    private OrderRepository orderRepository;
    @Autowired
    private KafkaTemplate<String, Event> kafkaTemplate;
    
    @Transactional
    public void executeOrderCreation(CreateOrderRequest request) {
        try {
            // Шаг 1: создаём заказ
            Order order = createOrder(request);
            publishEvent("order-created", new OrderCreatedEvent(order.getId()));
            
            // Шаг 2: резервируем товар
            publishEvent("inventory", new ReserveItemEvent(order.getId()));
            
            // Шаг 3: инициируем платёж
            publishEvent("payment", new ProcessPaymentEvent(order.getId()));
            
        } catch (Exception e) {
            // Компенсирующая транзакция
            publishEvent("order-compensation", new CancelOrderEvent(order.getId()));
            throw e;
        }
    }
    
    private void publishEvent(String topic, Event event) {
        try {
            kafkaTemplate.send(topic, event.getId().toString(), event);
        } catch (Exception e) {
            throw new EventPublishingException("Failed to publish event to " + topic, e);
        }
    }
}

Подход 4: Exactly-once с транзакциями Kafka (Kafka Transactions)

Для критичных систем, где дубликаты недопустимы:

@Configuration
public class KafkaTransactionConfig {
    @Bean
    public ProducerFactory<String, OrderEvent> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-producer-1");
        return new DefaultKafkaProducerFactory<>(configProps);
    }
}

@Component
public class TransactionalOrderProducer {
    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;
    
    @Transactional
    public void publishOrderWithTransaction(OrderEvent event) {
        // Вся операция выполняется в Kafka транзакции
        kafkaTemplate.send("orders-topic", event.getOrderId(), event);
        // Если произойдёт исключение, транзакция откатится
    }
}

// Consumer с exactly-once
@Configuration
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, OrderEvent> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // Читаем только committed
        return new DefaultKafkaConsumerFactory<>(configProps);
    }
}

Конфигурация для exactly-once:

  • Producer: ACKS=ALL, IDEMPOTENCE=true, TRANSACTIONAL_ID
  • Consumer: ISOLATION_LEVEL=read_committed
  • Overhead: значительный (медленнее)

Сравнение подходов

ПодходГарантияСложностьPerformanceЛучше для
At-least-once + IdempotentAt-least-onceСредняяВысокаяБольшинство случаев
Outbox PatternAt-least-onceСредняяВысокаяEvent sourcing
Saga PatternAt-least-onceВысокаяСредняяРаспределённые транзакции
Kafka TransactionsExactly-onceВысокаяНизкаяКритичные системы

Best Practices

// 1. Всегда используй ACKS=all
configProps.put(ProducerConfig.ACKS_CONFIG, "all");

// 2. Используй ENABLE_IDEMPOTENCE для дебага дубликатов
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

// 3. Обработай ошибки отправки
future.addCallback(
    success -> log.info("Sent: " + success.getRecordMetadata().offset()),
    failure -> log.error("Failed to send", failure)
);

// 4. Имплементируй Idempotent Consumer
// Используй combination of (topic, partition, offset) как ключ
String idempotentKey = topic + "-" + partition + "-" + offset;

// 5. Логируй все события в БД
EventLog log = new EventLog();
log.setTopicName(topic);
log.setOffset(offset);
log.setProcessedAt(Instant.now());
eventLogRepository.save(log);

Рекомендуемая архитектура для production

Используй Transactional Outbox Pattern:

1. Сохраняй бизнес-данные и событие в одной транзакции (в outbox таблице)
2. Отдельный процесс (poll) читает outbox и отправляет в Kafka
3. Отметь событие как отправленное только после успешной отправки
4. Retry механизм для неудачных отправок
5. Dead letter queue для невосстановимых ошибок

Гарантирует:

  • At-least-once доставку (не потеряет сообщение)
  • Простоту отладки (всё в одной БД)
  • Хорошую производительность
  • Масштабируемость

Заключение

Для гарантирования отправки сообщений в Kafka:

  1. At-least-once + идемпотентность — стандартный подход
  2. Transactional Outbox Pattern — лучше для consistency
  3. Saga Pattern — для распределённых транзакций
  4. Kafka Transactions — только если нужна exactly-once гарантия

Выбор зависит от требований к консистентности и производительности вашей системы.

Какие знаешь подходы проектирования системы, которая гарантирует отправку сообщений в Kafka? | PrepBro