Что такое Saga в построении микросервисов?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Saga в микросервисной архитектуре
Что это такое?
Saga — это паттерн, используемый в микросервисной архитектуре для управления распределёнными транзакциями, которые охватывают несколько микросервисов. Поскольку классические ACID-транзакции невозможны в распределённой среде, Saga обеспечивает логическую целостность данных через последовательность локальных транзакций.
Вместо одной глобальной транзакции, Saga разбивает работу на несколько шагов, каждый из которых выполняется отдельным сервисом. Если один шаг не удаётся, срабатывает механизм компенсации (откат) для восстановления консистентности.
Два подхода реализации
Хореография (Choreography) Каждый микросервис знает о следующем в последовательности. События передаются через message broker.
Оркестрация (Orchestration) Центральный сервис (orchestrator) управляет всеми шагами и координирует работу микросервисов.
Пример: Заказ в интернет-магазине
Сценарий: Пользователь создаёт заказ → проверка запасов → обработка платежа → отправка товара.
Реализация с оркестрацией:
from dataclasses import dataclass
from enum import Enum
from typing import List
class SagaStatus(Enum):
PENDING = "pending"
COMPLETED = "completed"
FAILED = "failed"
COMPENSATING = "compensating"
@dataclass
class Order:
order_id: str
user_id: str
items: List[dict]
status: SagaStatus = SagaStatus.PENDING
class OrderSaga:
def __init__(self, inventory_service, payment_service, shipping_service):
self.inventory = inventory_service
self.payment = payment_service
self.shipping = shipping_service
self.completed_steps = []
async def execute(self, order: Order) -> bool:
try:
# Шаг 1: Проверка товара
await self.inventory.reserve_items(order.items)
self.completed_steps.append("inventory")
print(f"✓ Товар зарезервирован для заказа {order.order_id}")
# Шаг 2: Обработка платежа
payment_result = await self.payment.charge(order.user_id, order.total_price)
self.completed_steps.append("payment")
print(f"✓ Платёж обработан для заказа {order.order_id}")
# Шаг 3: Отправка товара
await self.shipping.send_order(order.order_id)
self.completed_steps.append("shipping")
print(f"✓ Заказ отправлен: {order.order_id}")
order.status = SagaStatus.COMPLETED
return True
except Exception as e:
print(f"✗ Ошибка на этапе: {e}")
await self.compensate(order)
order.status = SagaStatus.FAILED
return False
async def compensate(self, order: Order):
"""Откат изменений в обратном порядке"""
order.status = SagaStatus.COMPENSATING
# Откатываем в обратном порядке
if "shipping" in self.completed_steps:
await self.shipping.cancel_order(order.order_id)
print(f"↶ Отправка отменена для {order.order_id}")
if "payment" in self.completed_steps:
await self.payment.refund(order.user_id)
print(f"↶ Возврат платежа для {order.user_id}")
if "inventory" in self.completed_steps:
await self.inventory.release_items(order.items)
print(f"↶ Товар освобожден из резерва")
Реализация с хореографией (через события):
from abc import ABC, abstractmethod
from typing import Callable, List
import asyncio
class Event:
def __init__(self, event_type: str, data: dict):
self.event_type = event_type
self.data = data
class EventBus:
def __init__(self):
self.subscribers: dict[str, List[Callable]] = {}
def subscribe(self, event_type: str, handler: Callable):
if event_type not in self.subscribers:
self.subscribers[event_type] = []
self.subscribers[event_type].append(handler)
async def publish(self, event: Event):
if event.event_type in self.subscribers:
for handler in self.subscribers[event.event_type]:
await handler(event)
class InventoryService:
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus
self.event_bus.subscribe("OrderCreated", self.handle_order_created)
async def handle_order_created(self, event: Event):
print(f"Резервируем товар: {event.data}")
# Резервируем товар
await self.event_bus.publish(Event("ItemsReserved", event.data))
class PaymentService:
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus
self.event_bus.subscribe("ItemsReserved", self.handle_items_reserved)
async def handle_items_reserved(self, event: Event):
print(f"Обрабатываем платёж: {event.data}")
# Обрабатываем платёж
await self.event_bus.publish(Event("PaymentProcessed", event.data))
class ShippingService:
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus
self.event_bus.subscribe("PaymentProcessed", self.handle_payment_processed)
async def handle_payment_processed(self, event: Event):
print(f"Отправляем заказ: {event.data}")
# Отправляем заказ
await self.event_bus.publish(Event("OrderShipped", event.data))
Сравнение двух подходов
| Критерий | Хореография | Оркестрация |
|---|---|---|
| Сложность | Низкая | Высокая |
| Масштабируемость | Хорошая | Может быть узким местом |
| Отладка | Сложнее (распределённая логика) | Легче (централизованная) |
| Тестирование | Сложнее | Проще |
| Зависимости | Слабые связи | Единая точка отказа |
Важные аспекты Saga
1. Идемпотентность Каждая операция должна быть идемпотентной — повторное выполнение не должно вызвать проблемы:
def process_payment(payment_id: str, amount: float):
# Проверяем, не обработан ли уже платёж
existing_payment = db.query(Payment).filter_by(id=payment_id).first()
if existing_payment:
return existing_payment
# Новый платёж
payment = Payment(id=payment_id, amount=amount)
db.add(payment)
db.commit()
return payment
2. Обработка ошибок Нужно предусмотреть retry-логику и timeout механизмы.
3. Мониторинг Процесс Saga должен быть видим для отладки и мониторинга.
Когда использовать Saga
✓ Распределённые системы с несколькими микросервисами ✓ Сложные бизнес-процессы, требующие откатов ✓ Когда нужна высокая доступность
✗ Простые синхронные операции ✗ Когда требуется строгая консистентность в реальном времени
Saga — это мощный паттерн для управления сложными процессами в микросервисной архитектуре, обеспечивающий надёжность при отсутствии глобальных ACID-транзакций.