← Назад к вопросам
Как реализовать распределенные транзакции используя протокол очередей?
3.0 Senior🔥 101 комментариев
#Многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Распределенные транзакции через протокол очередей (Message Queue)
Распределенные системы требуют синхронизации состояния между несколькими сервисами. Классический подход 2-Phase Commit часто приводит к deadlock'ам. Более надежное решение — асинхронные распределенные транзакции через очереди сообщений с гарантией доставки и идемпотентностью.
Проблема: Распределенные транзакции
Пример: создание заказа
User Service → Payment Service → Order Service → Notification Service
Если Payment Service упадет после списания денег,
Order Service не узнает и заказ не создастся.
Деньги потеряны!
2-Phase Commit (проблемный подход)
// Фаза 1: Prepare (блокировка ресурсов)
paymentService.prepare(amount); // блокируем деньги
orderService.prepare(orderId); // блокируем товары
// Фаза 2: Commit (фиксация)
if (paymentService.commit() && orderService.commit()) {
// успех
} else {
paymentService.rollback(); // откатить
orderService.rollback();
}
Проблемы:
- Deadlock'и
- Долгие блокировки
- Невозможно откатить, если сервис упал
- Масштабируется плохо
Решение: Event Sourcing + Message Queue
Архитектура: Saga Pattern
Order Service (создает событие)
↓
Message Queue (гарантирует доставку)
↓
Payment Service (слушает, обрабатывает)
↓
Message Queue
↓
Inventory Service
↓
Notification Service
Реализация 1: Orchestration Saga
Шаг 1: Определить события и команды
// События (Facts - то, что уже произошло)
public record OrderCreatedEvent(
String orderId,
String userId,
BigDecimal amount,
LocalDateTime timestamp) {}
public record PaymentCompletedEvent(
String orderId,
String transactionId,
BigDecimal amount,
LocalDateTime timestamp) {}
public record PaymentFailedEvent(
String orderId,
String reason,
LocalDateTime timestamp) {}
// Команды (Requests - что нужно сделать)
public record ProcessPaymentCommand(
String orderId,
String userId,
BigDecimal amount) {}
public record ReserveInventoryCommand(
String orderId,
List<OrderItem> items) {}
public record SendNotificationCommand(
String orderId,
String userId,
String message) {}
Шаг 2: Order Service (Orchestrator)
@Service
public class OrderService {
@Autowired
private ApplicationEventPublisher eventPublisher;
@Autowired
private MessageQueue messageQueue; // RabbitMQ, Kafka, SQS
@Autowired
private OrderRepository orderRepository;
@Transactional
public void createOrder(CreateOrderRequest request) {
// Шаг 1: Сохранить заказ в статусе PENDING
Order order = new Order(
UUID.randomUUID().toString(),
request.getUserId(),
request.getItems(),
request.getTotalAmount()
);
order.setStatus(OrderStatus.PENDING); // еще не подтвержден
orderRepository.save(order);
// Шаг 2: Опубликовать событие
OrderCreatedEvent event = new OrderCreatedEvent(
order.getId(),
order.getUserId(),
order.getTotalAmount(),
LocalDateTime.now()
);
eventPublisher.publishEvent(event);
// Шаг 3: Отправить команду Payment Service
messageQueue.send("payment-commands", new ProcessPaymentCommand(
order.getId(),
order.getUserId(),
order.getTotalAmount()
));
}
// Слушатель: когда платеж завершен
@EventListener
public void onPaymentCompleted(PaymentCompletedEvent event) {
Order order = orderRepository.findById(event.orderId())
.orElseThrow();
order.setStatus(OrderStatus.PAYMENT_CONFIRMED);
order.setTransactionId(event.transactionId());
orderRepository.save(order);
// Отправить команду Inventory Service
messageQueue.send("inventory-commands", new ReserveInventoryCommand(
order.getId(),
order.getItems()
));
}
// Слушатель: платеж не прошел
@EventListener
public void onPaymentFailed(PaymentFailedEvent event) {
Order order = orderRepository.findById(event.orderId())
.orElseThrow();
order.setStatus(OrderStatus.CANCELLED);
order.setCancelReason(event.reason());
orderRepository.save(order);
// Отправить уведомление пользователю
messageQueue.send("notification-commands", new SendNotificationCommand(
order.getId(),
order.getUserId(),
"Платеж не прошел: " + event.reason()
));
}
}
Шаг 3: Payment Service (обработчик команды)
@Service
public class PaymentService {
@Autowired
private MessageQueue messageQueue;
@Autowired
private PaymentGateway paymentGateway;
@Autowired
private PaymentRepository paymentRepository;
// Слушатель команды
@RabbitListener(queues = "payment-commands")
public void processPayment(ProcessPaymentCommand command) {
try {
// Шаг 1: Попытаться списать деньги
PaymentResult result = paymentGateway.charge(
command.userId(),
command.amount()
);
// Шаг 2: Сохранить результат
Payment payment = new Payment(
command.orderId(),
command.userId(),
command.amount(),
result.getTransactionId(),
PaymentStatus.COMPLETED
);
paymentRepository.save(payment);
// Шаг 3: Опубликовать событие успеха
messageQueue.send("payment-events", new PaymentCompletedEvent(
command.orderId(),
result.getTransactionId(),
command.amount(),
LocalDateTime.now()
));
} catch (PaymentException e) {
// Платеж не прошел
messageQueue.send("payment-events", new PaymentFailedEvent(
command.orderId(),
e.getMessage(),
LocalDateTime.now()
));
}
}
}
Шаг 4: Inventory Service
@Service
public class InventoryService {
@Autowired
private MessageQueue messageQueue;
@Autowired
private InventoryRepository inventoryRepository;
@RabbitListener(queues = "inventory-commands")
public void reserveInventory(ReserveInventoryCommand command) {
try {
// Зарезервировать товары (with SELECT FOR UPDATE для лока)
command.items().forEach(item -> {
Inventory inventory = inventoryRepository
.findByProductIdForUpdate(item.getProductId())
.orElseThrow(() -> new InventoryNotFoundException());
if (inventory.getQuantity() < item.getQuantity()) {
throw new InsufficientInventoryException();
}
inventory.reserve(item.getQuantity());
inventoryRepository.save(inventory);
});
// Опубликовать событие
messageQueue.send("inventory-events", new InventoryReservedEvent(
command.orderId(),
command.items(),
LocalDateTime.now()
));
} catch (Exception e) {
// Отправить команду компенсации (rollback) Payment Service
messageQueue.send("payment-compensation", new RefundPaymentCommand(
command.orderId()
));
}
}
}
Гарантии: At-Least-Once Delivery
Идемпотентность обработки
@Service
public class PaymentService {
@Autowired
private ProcessedMessageRepository processedMessages;
@RabbitListener(queues = "payment-commands")
public void processPayment(ProcessPaymentCommand command, Message message) {
String messageId = message.getMessageProperties().getHeader("X-Message-Id");
// Проверить: обработали ли мы это сообщение?
if (processedMessages.exists(messageId)) {
// Уже обработано, просто вернуться
return;
}
// Обработка
PaymentResult result = paymentGateway.charge(
command.userId(),
command.amount()
);
// Сохранить, что обработали
processedMessages.save(new ProcessedMessage(
messageId,
"payment_command",
LocalDateTime.now()
));
}
}
Использование Database для идемпотентности
@Service
public class OrderService {
@Transactional
public void handlePaymentCompleted(PaymentCompletedEvent event) {
// Используем уникальный constraint на (orderId, eventType, eventId)
ProcessedEvent processed = new ProcessedEvent(
event.orderId(),
"PAYMENT_COMPLETED",
event.transactionId()
);
try {
processedEventRepository.save(processed);
} catch (DuplicateKeyException e) {
// Уже обработано, пропустить
return;
}
// Обработка события
Order order = orderRepository.findById(event.orderId()).orElseThrow();
order.setStatus(OrderStatus.PAYMENT_CONFIRMED);
orderRepository.save(order);
}
}
Реализация 2: Choreography Saga
Вместо центрального Orchestrator, каждый сервис слушает события и издает свои.
// Order Service
@EventListener
public void onOrderCreated(OrderCreatedEvent event) {
messageQueue.publish("order-created", event);
}
// Payment Service
@RabbitListener(queues = "order-created")
public void onOrderCreated(OrderCreatedEvent event) {
// Обработка
messageQueue.publish("payment-completed", new PaymentCompletedEvent(...));
}
// Inventory Service
@RabbitListener(queues = "payment-completed")
public void onPaymentCompleted(PaymentCompletedEvent event) {
// Обработка
messageQueue.publish("inventory-reserved", new InventoryReservedEvent(...));
}
Dead Letter Queue для ошибок
@Configuration
public class RabbitConfig {
@Bean
public Queue paymentQueue() {
return QueueBuilder.durable("payment-commands")
.deadLetterExchange("dlx")
.deadLetterRoutingKey("payment-commands.dlq")
.build();
}
@Bean
public Queue dlq() {
return QueueBuilder.durable("payment-commands.dlq").build();
}
}
// Обработчик DLQ
@RabbitListener(queues = "payment-commands.dlq")
public void handleFailedPayment(ProcessPaymentCommand command) {
logger.error("Payment failed after retries: {}", command.orderId());
// Отправить алерт администратору
alertService.sendAlert("Payment processing failed");
}
Мониторинг распределенных транзакций
@Service
public class TransactionMonitor {
@Autowired
private MeterRegistry meterRegistry;
public void trackSagaCompletion(String orderId, long durationMs) {
Timer timer = Timer.builder("saga.completion.time")
.publishPercentiles(0.5, 0.99)
.register(meterRegistry);
timer.record(durationMs, TimeUnit.MILLISECONDS);
}
public void trackSagaFailure(String orderId, String reason) {
Counter counter = Counter.builder("saga.failures")
.tag("reason", reason)
.register(meterRegistry);
counter.increment();
}
}
Таблица сравнения подходов
| Подход | Преимущества | Недостатки | Когда использовать |
|---|---|---|---|
| 2PC | Транзакционная консистентность | Deadlock'и, медленно | Простые системы |
| Orchestration | Контролируемый flow | Центральная точка отказа | Сложные саги |
| Choreography | Масштабируемость | Сложная отладка | Много микросервисов |
| Event Sourcing | Полная история | Сложнее понять | Critical systems |
Вывод
Для распределенных транзакций используйте:
- Message Queue для гарантии доставки
- Saga Pattern (Orchestration или Choreography)
- Идемпотентные обработчики через unique constraints
- Dead Letter Queue для ошибок
- Event Sourcing для аудита всех действий
Это обеспечивает eventual consistency — конечную консистентность, которая лучше работает в распределенных системах.