← Назад к вопросам

Как бы реализовывал Saga?

1.7 Middle🔥 192 комментариев
#Микросервисы и архитектура

Комментарии (2)

🐱
deepseek-v3.2PrepBro AI5 апр. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Реализация Saga в Go

Saga — это паттерн управления распределёнными транзакциями, который координирует серию локальных транзакций в различных сервисах. В Go я бы реализовал его следующим образом:

Архитектурные подходы

1. Хореография (Choreography)

Каждый сервис самостоятельно публикует события и реагирует на события других сервисов.

// Пример события Saga
type OrderSagaEvent struct {
    SagaID     string    `json:"saga_id"`
    Type       string    `json:"type"` // ORDER_CREATED, PAYMENT_PROCESSED и т.д.
    Service    string    `json:"service"`
    Payload    []byte    `json:"payload"`
    Timestamp  time.Time `json:"timestamp"`
}

// Сервис обработки заказов
type OrderService struct {
    eventBus EventBus
}

func (s *OrderService) CreateOrder(ctx context.Context, order Order) error {
    // 1. Сохраняем заказ локально
    if err := s.saveOrder(ctx, order); err != nil {
        return err
    }
    
    // 2. Публикуем событие
    event := OrderSagaEvent{
        SagaID:    generateSagaID(),
        Type:      "ORDER_CREATED",
        Service:   "orders",
        Payload:   marshalOrder(order),
        Timestamp: time.Now(),
    }
    
    return s.eventBus.Publish(ctx, "saga.events", event)
}

2. Оркестрация (Orchestration)

Центральный оркестратор управляет выполнением шагов Saga.

// Оркестратор Saga
type SagaOrchestrator struct {
    steps      []SagaStep
    compensations map[string]CompensationFunc
    store      SagaStore
}

type SagaStep struct {
    Name       string
    Execute    func(context.Context, interface{}) (interface{}, error)
    Compensate func(context.Context, interface{}) error
}

func (o *SagaOrchestrator) Execute(ctx context.Context, sagaID string, data interface{}) error {
    var executedSteps []string
    
    for _, step := range o.steps {
        // Выполняем шаг
        result, err := step.Execute(ctx, data)
        if err != nil {
            // Компенсируем выполненные шаги
            return o.compensate(ctx, executedSteps, data)
        }
        
        executedSteps = append(executedSteps, step.Name)
        o.store.SaveStepResult(sagaID, step.Name, result)
    }
    
    return nil
}

func (o *SagaOrchestrator) compensate(ctx context.Context, steps []string, data interface{}) error {
    // Выполняем компенсирующие транзакции в обратном порядке
    for i := len(steps) - 1; i >= 0; i-- {
        if comp, exists := o.compensations[steps[i]]; exists {
            if err := comp(ctx, data); err != nil {
                // Логируем ошибку, но продолжаем компенсацию
                log.Printf("Compensation failed for step %s: %v", steps[i], err)
            }
        }
    }
    return errors.New("saga failed, compensation completed")
}

Ключевые компоненты реализации

Хранилище состояния Saga

type SagaStore interface {
    CreateSaga(ctx context.Context, sagaID string, initialData interface{}) error
    SaveStepResult(ctx context.Context, sagaID, stepName string, result interface{}) error
    GetSagaState(ctx context.Context, sagaID string) (*SagaState, error)
    UpdateSagaStatus(ctx context.Context, sagaID string, status SagaStatus) error
}

type InMemorySagaStore struct {
    mu    sync.RWMutex
    sagas map[string]*SagaState
}

type SagaState struct {
    ID        string
    Status    SagaStatus // PENDING, IN_PROGRESS, COMPLETED, FAILED, COMPENSATED
    StepResults map[string]interface{}
    CreatedAt time.Time
    UpdatedAt time.Time
}

Ретри и устойчивость к ошибкам

type RetryPolicy struct {
    MaxAttempts int
    Backoff     time.Duration
    MaxBackoff  time.Duration
}

func executeWithRetry(ctx context.Context, fn func() error, policy RetryPolicy) error {
    var lastErr error
    
    for attempt := 1; attempt <= policy.MaxAttempts; attempt++ {
        if err := fn(); err != nil {
            lastErr = err
            
            if attempt == policy.MaxAttempts {
                break
            }
            
            backoff := policy.Backoff * time.Duration(attempt)
            if backoff > policy.MaxBackoff {
                backoff = policy.MaxBackoff
            }
            
            select {
            case <-time.After(backoff):
                continue
            case <-ctx.Done():
                return ctx.Err()
            }
        }
        
        return nil
    }
    
    return fmt.Errorf("max retry attempts exceeded: %w", lastErr)
}

Практические рекомендации

Идемпотентность операций

  • Все шаги Saga должны быть идемпотентными
  • Использовать уникальные идентификаторы операций
  • Проверять состояние перед выполнением
func (s *PaymentService) ProcessPayment(ctx context.Context, paymentID string, amount decimal.Decimal) error {
    // Проверяем, не была ли уже обработана оплата
    if processed, err := s.isPaymentProcessed(ctx, paymentID); err != nil {
        return err
    } else if processed {
        return nil // Уже обработано, возвращаем успех
    }
    
    // Выполняем операцию
    return s.processPaymentInternal(ctx, paymentID, amount)
}

Мониторинг и observability

  • Логирование каждого шага Saga
  • Метрики успешных/неуспешных выполнений
  • Трейсинг распределённых транзакций
type ObservabilityMiddleware struct {
    metrics MetricsCollector
    tracer  Tracer
}

func (m *ObservabilityMiddleware) WrapStep(step SagaStep) SagaStep {
    return SagaStep{
        Name: step.Name,
        Execute: func(ctx context.Context, data interface{}) (interface{}, error) {
            start := time.Now()
            
            ctx, span := m.tracer.StartSpan(ctx, "saga_step."+step.Name)
            defer span.End()
            
            result, err := step.Execute(ctx, data)
            
            // Записываем метрики
            m.metrics.RecordSagaStepDuration(step.Name, time.Since(start))
            m.metrics.IncrementSagaStepCounter(step.Name, err == nil)
            
            return result, err
        },
        Compensate: step.Compensate,
    }
}

Выбор инструментов

Для production-реализации я бы использовал:

  1. Брокер сообщений: NATS JetStream, Apache Kafka или RabbitMQ для событий
  2. Хранилище состояния: PostgreSQL с таблицей саг или Redis
  3. Оркестратор: Camunda, Temporal или самописный на основе State Machine
  4. Мониторинг: OpenTelemetry для трейсинга, Prometheus для метрик

Заключение

Реализация Saga в Go требует внимания к отказоустойчивости, идемпотентности и наблюдаемости. Оркестрируемый подход проще для сложных бизнес-процессов, в то время как хореография лучше подходит для простых сценариев. Ключевой аспект — обеспечение консистентности данных в распределённой системе через компенсирующие транзакции и корректную обработку ошибок.

Как бы реализовывал Saga? | PrepBro