← Назад к вопросам

Как реализовать распределенные транзакции используя протокол очередей?

3.0 Senior🔥 101 комментариев
#Многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Распределенные транзакции через протокол очередей (Message Queue)

Распределенные системы требуют синхронизации состояния между несколькими сервисами. Классический подход 2-Phase Commit часто приводит к deadlock'ам. Более надежное решение — асинхронные распределенные транзакции через очереди сообщений с гарантией доставки и идемпотентностью.

Проблема: Распределенные транзакции

Пример: создание заказа

User Service → Payment Service → Order Service → Notification Service

Если Payment Service упадет после списания денег,
Order Service не узнает и заказ не создастся.
Деньги потеряны!

2-Phase Commit (проблемный подход)

// Фаза 1: Prepare (блокировка ресурсов)
paymentService.prepare(amount); // блокируем деньги
orderService.prepare(orderId);  // блокируем товары

// Фаза 2: Commit (фиксация)
if (paymentService.commit() && orderService.commit()) {
    // успех
} else {
    paymentService.rollback(); // откатить
    orderService.rollback();
}

Проблемы:

  • Deadlock'и
  • Долгие блокировки
  • Невозможно откатить, если сервис упал
  • Масштабируется плохо

Решение: Event Sourcing + Message Queue

Архитектура: Saga Pattern

Order Service (создает событие)
    ↓
Message Queue (гарантирует доставку)
    ↓
Payment Service (слушает, обрабатывает)
    ↓
Message Queue
    ↓
Inventory Service
    ↓
Notification Service

Реализация 1: Orchestration Saga

Шаг 1: Определить события и команды

// События (Facts - то, что уже произошло)
public record OrderCreatedEvent(
    String orderId,
    String userId,
    BigDecimal amount,
    LocalDateTime timestamp) {}

public record PaymentCompletedEvent(
    String orderId,
    String transactionId,
    BigDecimal amount,
    LocalDateTime timestamp) {}

public record PaymentFailedEvent(
    String orderId,
    String reason,
    LocalDateTime timestamp) {}

// Команды (Requests - что нужно сделать)
public record ProcessPaymentCommand(
    String orderId,
    String userId,
    BigDecimal amount) {}

public record ReserveInventoryCommand(
    String orderId,
    List<OrderItem> items) {}

public record SendNotificationCommand(
    String orderId,
    String userId,
    String message) {}

Шаг 2: Order Service (Orchestrator)

@Service
public class OrderService {
    @Autowired
    private ApplicationEventPublisher eventPublisher;
    
    @Autowired
    private MessageQueue messageQueue; // RabbitMQ, Kafka, SQS
    
    @Autowired
    private OrderRepository orderRepository;

    @Transactional
    public void createOrder(CreateOrderRequest request) {
        // Шаг 1: Сохранить заказ в статусе PENDING
        Order order = new Order(
            UUID.randomUUID().toString(),
            request.getUserId(),
            request.getItems(),
            request.getTotalAmount()
        );
        order.setStatus(OrderStatus.PENDING); // еще не подтвержден
        orderRepository.save(order);
        
        // Шаг 2: Опубликовать событие
        OrderCreatedEvent event = new OrderCreatedEvent(
            order.getId(),
            order.getUserId(),
            order.getTotalAmount(),
            LocalDateTime.now()
        );
        eventPublisher.publishEvent(event);
        
        // Шаг 3: Отправить команду Payment Service
        messageQueue.send("payment-commands", new ProcessPaymentCommand(
            order.getId(),
            order.getUserId(),
            order.getTotalAmount()
        ));
    }

    // Слушатель: когда платеж завершен
    @EventListener
    public void onPaymentCompleted(PaymentCompletedEvent event) {
        Order order = orderRepository.findById(event.orderId())
            .orElseThrow();
        
        order.setStatus(OrderStatus.PAYMENT_CONFIRMED);
        order.setTransactionId(event.transactionId());
        orderRepository.save(order);
        
        // Отправить команду Inventory Service
        messageQueue.send("inventory-commands", new ReserveInventoryCommand(
            order.getId(),
            order.getItems()
        ));
    }

    // Слушатель: платеж не прошел
    @EventListener
    public void onPaymentFailed(PaymentFailedEvent event) {
        Order order = orderRepository.findById(event.orderId())
            .orElseThrow();
        
        order.setStatus(OrderStatus.CANCELLED);
        order.setCancelReason(event.reason());
        orderRepository.save(order);
        
        // Отправить уведомление пользователю
        messageQueue.send("notification-commands", new SendNotificationCommand(
            order.getId(),
            order.getUserId(),
            "Платеж не прошел: " + event.reason()
        ));
    }
}

Шаг 3: Payment Service (обработчик команды)

@Service
public class PaymentService {
    @Autowired
    private MessageQueue messageQueue;
    
    @Autowired
    private PaymentGateway paymentGateway;
    
    @Autowired
    private PaymentRepository paymentRepository;

    // Слушатель команды
    @RabbitListener(queues = "payment-commands")
    public void processPayment(ProcessPaymentCommand command) {
        try {
            // Шаг 1: Попытаться списать деньги
            PaymentResult result = paymentGateway.charge(
                command.userId(),
                command.amount()
            );
            
            // Шаг 2: Сохранить результат
            Payment payment = new Payment(
                command.orderId(),
                command.userId(),
                command.amount(),
                result.getTransactionId(),
                PaymentStatus.COMPLETED
            );
            paymentRepository.save(payment);
            
            // Шаг 3: Опубликовать событие успеха
            messageQueue.send("payment-events", new PaymentCompletedEvent(
                command.orderId(),
                result.getTransactionId(),
                command.amount(),
                LocalDateTime.now()
            ));
        } catch (PaymentException e) {
            // Платеж не прошел
            messageQueue.send("payment-events", new PaymentFailedEvent(
                command.orderId(),
                e.getMessage(),
                LocalDateTime.now()
            ));
        }
    }
}

Шаг 4: Inventory Service

@Service
public class InventoryService {
    @Autowired
    private MessageQueue messageQueue;
    
    @Autowired
    private InventoryRepository inventoryRepository;

    @RabbitListener(queues = "inventory-commands")
    public void reserveInventory(ReserveInventoryCommand command) {
        try {
            // Зарезервировать товары (with SELECT FOR UPDATE для лока)
            command.items().forEach(item -> {
                Inventory inventory = inventoryRepository
                    .findByProductIdForUpdate(item.getProductId())
                    .orElseThrow(() -> new InventoryNotFoundException());
                
                if (inventory.getQuantity() < item.getQuantity()) {
                    throw new InsufficientInventoryException();
                }
                
                inventory.reserve(item.getQuantity());
                inventoryRepository.save(inventory);
            });
            
            // Опубликовать событие
            messageQueue.send("inventory-events", new InventoryReservedEvent(
                command.orderId(),
                command.items(),
                LocalDateTime.now()
            ));
        } catch (Exception e) {
            // Отправить команду компенсации (rollback) Payment Service
            messageQueue.send("payment-compensation", new RefundPaymentCommand(
                command.orderId()
            ));
        }
    }
}

Гарантии: At-Least-Once Delivery

Идемпотентность обработки

@Service
public class PaymentService {
    @Autowired
    private ProcessedMessageRepository processedMessages;

    @RabbitListener(queues = "payment-commands")
    public void processPayment(ProcessPaymentCommand command, Message message) {
        String messageId = message.getMessageProperties().getHeader("X-Message-Id");
        
        // Проверить: обработали ли мы это сообщение?
        if (processedMessages.exists(messageId)) {
            // Уже обработано, просто вернуться
            return;
        }
        
        // Обработка
        PaymentResult result = paymentGateway.charge(
            command.userId(),
            command.amount()
        );
        
        // Сохранить, что обработали
        processedMessages.save(new ProcessedMessage(
            messageId,
            "payment_command",
            LocalDateTime.now()
        ));
    }
}

Использование Database для идемпотентности

@Service
public class OrderService {
    @Transactional
    public void handlePaymentCompleted(PaymentCompletedEvent event) {
        // Используем уникальный constraint на (orderId, eventType, eventId)
        ProcessedEvent processed = new ProcessedEvent(
            event.orderId(),
            "PAYMENT_COMPLETED",
            event.transactionId()
        );
        
        try {
            processedEventRepository.save(processed);
        } catch (DuplicateKeyException e) {
            // Уже обработано, пропустить
            return;
        }
        
        // Обработка события
        Order order = orderRepository.findById(event.orderId()).orElseThrow();
        order.setStatus(OrderStatus.PAYMENT_CONFIRMED);
        orderRepository.save(order);
    }
}

Реализация 2: Choreography Saga

Вместо центрального Orchestrator, каждый сервис слушает события и издает свои.

// Order Service
@EventListener
public void onOrderCreated(OrderCreatedEvent event) {
    messageQueue.publish("order-created", event);
}

// Payment Service
@RabbitListener(queues = "order-created")
public void onOrderCreated(OrderCreatedEvent event) {
    // Обработка
    messageQueue.publish("payment-completed", new PaymentCompletedEvent(...));
}

// Inventory Service
@RabbitListener(queues = "payment-completed")
public void onPaymentCompleted(PaymentCompletedEvent event) {
    // Обработка
    messageQueue.publish("inventory-reserved", new InventoryReservedEvent(...));
}

Dead Letter Queue для ошибок

@Configuration
public class RabbitConfig {
    @Bean
    public Queue paymentQueue() {
        return QueueBuilder.durable("payment-commands")
            .deadLetterExchange("dlx")
            .deadLetterRoutingKey("payment-commands.dlq")
            .build();
    }
    
    @Bean
    public Queue dlq() {
        return QueueBuilder.durable("payment-commands.dlq").build();
    }
}

// Обработчик DLQ
@RabbitListener(queues = "payment-commands.dlq")
public void handleFailedPayment(ProcessPaymentCommand command) {
    logger.error("Payment failed after retries: {}", command.orderId());
    // Отправить алерт администратору
    alertService.sendAlert("Payment processing failed");
}

Мониторинг распределенных транзакций

@Service
public class TransactionMonitor {
    @Autowired
    private MeterRegistry meterRegistry;
    
    public void trackSagaCompletion(String orderId, long durationMs) {
        Timer timer = Timer.builder("saga.completion.time")
            .publishPercentiles(0.5, 0.99)
            .register(meterRegistry);
        timer.record(durationMs, TimeUnit.MILLISECONDS);
    }
    
    public void trackSagaFailure(String orderId, String reason) {
        Counter counter = Counter.builder("saga.failures")
            .tag("reason", reason)
            .register(meterRegistry);
        counter.increment();
    }
}

Таблица сравнения подходов

ПодходПреимуществаНедостаткиКогда использовать
2PCТранзакционная консистентностьDeadlock'и, медленноПростые системы
OrchestrationКонтролируемый flowЦентральная точка отказаСложные саги
ChoreographyМасштабируемостьСложная отладкаМного микросервисов
Event SourcingПолная историяСложнее понятьCritical systems

Вывод

Для распределенных транзакций используйте:

  1. Message Queue для гарантии доставки
  2. Saga Pattern (Orchestration или Choreography)
  3. Идемпотентные обработчики через unique constraints
  4. Dead Letter Queue для ошибок
  5. Event Sourcing для аудита всех действий

Это обеспечивает eventual consistency — конечную консистентность, которая лучше работает в распределенных системах.