← Назад к вопросам

Что такое Transaction Outbox?

2.0 Middle🔥 191 комментариев
#Микросервисы и архитектура#Базы данных

Комментарии (1)

🐱
deepseek-v3.2PrepBro AI5 апр. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Что такое Transaction Outbox?

Transaction Outbox — это архитектурный паттерн, используемый для обеспечения надежной отправки сообщений или событий в распределенных системах, особенно в контексте микросервисов. Он решает ключевую проблему: как гарантировать атомарность обновления базы данных и отправки сообщения (например, в брокер сообщений, как Kafka или RabbitMQ) без использования распределенных транзакций (2PC), которые считаются ненадежными и сложными в поддержке.

Основная идея заключается в том, чтобы откладывать реальную отправку сообщения, сначала сохраняя его в специальную таблицу-«исходящий почтовый ящик» (Outbox) в рамках той же транзакции базы данных, что и основное бизнес-обновление. Таким образом, обновление состояния и запись сообщения становятся атомарными. Затем отдельный процесс (Publisher или Relay) асинхронно извлекает новые записи из Outbox и доставляет их во внешнюю систему.

Проблема, которую решает паттерн

Рассмотрим классический пример на Go: сервис обрабатывает заказ и должен обновить его статус в БД и отправить событие OrderCreated в брокер.

// НАИВНАЯ И НЕБЕЗОПАСНАЯ РЕАЛИЗАЦИЯ (ПРОБЛЕМА)
func processOrder(orderID string, tx *sql.Tx) error {
    // 1. Обновляем статус заказа в БД
    _, err := tx.Exec("UPDATE orders SET status = 'processed' WHERE id = $1", orderID)
    if err != nil {
        tx.Rollback()
        return err
    }

    // 2. Отправляем событие в Kafka
    msg := createOrderEvent(orderID)
    err = kafkaProducer.Send(msg)
    if err != nil {
        // Что делать здесь? Транзакция БД уже завершена успешно!
        // Событие потеряно. Состояние системы становится противоречивым.
        tx.Rollback() // Уже поздно, UPDATE выполнен.
        return err
    }

    // 3. Фиксируем транзакцию БД
    return tx.Commit()
}

Проблема: Если шаг 2 (отправка в Kafka) завершится ошибкой после успешного UPDATE, транзакцию БД уже нельзя безопасно откатить. Данные в БД и в событийной шине рассогласованы.

Как работает Transaction Outbox

Решение заключается в добавлении промежуточного шага:

  1. Сохранение события в Outbox в рамках транзакции: Создается таблица outbox (или outbox_messages). Сообщение сохраняется туда как запись в той же транзакции, что и основное бизнес-обновление.
  2. Атомарная фиксация: Коммит транзакции фиксирует и изменение бизнес-состояния, и факт того, что событие должно быть отправлено. Это гарантирует надежность "at-least-once".
  3. Асинхронная доставка: Отдельный фоновый процесс (часто называемый релеем или публикатором) периодически опрашивает таблицу outbox на наличие новых, неотправленных сообщений (status = 'pending').
  4. Отправка и маркировка: Релей извлекает сообщение, отправляет его во внешнюю систему (брокер) и в отдельной транзакции обновляет статус записи в Outbox на sent или удаляет ее. Это защищает от повторных отправок при сбоях релея.

Пример реализации на Go

1. Структура таблицы Outbox (SQL миграция)

CREATE TABLE outbox_messages (
    id BIGSERIAL PRIMARY KEY,
    aggregate_type VARCHAR(255) NOT NULL, -- Сущность (например, "order")
    aggregate_id VARCHAR(255) NOT NULL,   -- ID сущности
    event_type VARCHAR(255) NOT NULL,     -- Тип события ("OrderCreated")
    payload JSONB NOT NULL,               -- Данные события
    status VARCHAR(50) DEFAULT 'pending', -- 'pending', 'sent', 'failed'
    created_at TIMESTAMP DEFAULT NOW(),
    sent_at TIMESTAMP
);

CREATE INDEX idx_outbox_pending ON outbox_messages(status) WHERE status = 'pending';

2. Сохранение в Outbox в рамках бизнес-транзакции

func processOrderWithOutbox(orderID string, tx *sql.Tx) error {
    // 1. Бизнес-логика: обновление заказа
    _, err := tx.Exec("UPDATE orders SET status = 'processed' WHERE id = $1", orderID)
    if err != nil {
        return err
    }

    // 2. Создание и сохранение события В ТУ ЖЕ ТРАНЗАКЦИЮ
    event := OutboxMessage{
        AggregateType: "order",
        AggregateID:   orderID,
        EventType:     "OrderCreated",
        Payload:       []byte(`{"id":"` + orderID + `","status":"processed"}`),
        Status:        "pending",
    }
    err = saveOutboxMessage(tx, event)
    if err != nil {
        return err // Если ошибка здесь, вся транзакция откатится
    }

    // 3. Фиксация. Теперь гарантировано: если заказ обновлен, событие записано.
    return tx.Commit()
}

func saveOutboxMessage(tx *sql.Tx, msg OutboxMessage) error {
    query := `INSERT INTO outbox_messages 
              (aggregate_type, aggregate_id, event_type, payload, status) 
              VALUES ($1, $2, $3, $4, $5)`
    _, err := tx.Exec(query, msg.AggregateType, msg.AggregateID,
        msg.EventType, msg.Payload, msg.Status)
    return err
}

3. Фоновый процесс-релей (Publisher)

func startOutboxPublisher(ctx context.Context, db *sql.DB, producer kafka.Producer) {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            // В отдельной транзакции извлекаем и обрабатываем сообщения
            err := publishPendingMessages(db, producer)
            if err != nil {
                log.Printf("Outbox publisher error: %v", err)
            }
        }
    }
}

func publishPendingMessages(db *sql.DB, producer kafka.Producer) error {
    // Начинаем транзакцию для извлечения
    tx, err := db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // Извлекаем блок сообщений с блокировкой строк (SELECT FOR UPDATE)
    rows, err := tx.Query(`
        SELECT id, payload, event_type FROM outbox_messages 
        WHERE status = 'pending' 
        ORDER BY created_at 
        LIMIT 100 
        FOR UPDATE SKIP LOCKED`)
    if err != nil {
        return err
    }
    defer rows.Close()

    var messages []OutboxMessage
    for rows.Next() {
        var msg OutboxMessage
        rows.Scan(&msg.ID, &msg.Payload, &msg.EventType)
        messages = append(messages, msg)
    }

    // Для каждого сообщения: отправляем в Kafka и обновляем статус
    for _, msg := range messages {
        kafkaMsg := &kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &msg.EventType, Partition: kafka.PartitionAny},
            Value:          msg.Payload,
        }
        err = producer.Produce(kafkaMsg, nil)
        if err != nil {
            // Можно пометить как failed и повторить позже
            continue
        }
        // Помечаем как отправленное
        _, err = tx.Exec("UPDATE outbox_messages SET status='sent', sent_at=NOW() WHERE id=$1", msg.ID)
        if err != nil {
            return err
        }
    }
    // Фиксируем все изменения статусов
    return tx.Commit()
}

Преимущества и недостатки паттерна

Преимущества:

  • Надежность: Гарантирует, что событие будет сохранено, если бизнес-транзакция завершена успешно.
  • Атомарность: Достигается средствами локальной транзакции БД, без сложных распределенных транзакций.
  • Декомпозиция: Отделяет бизнес-логику от инфраструктуры отправки сообщений.
  • Отказоустойчивость: Позволяет обрабатывать временную недоступность брокера сообщений.
  • Единый источник правды: Таблица Outbox становится журналом всех событий, которые должны быть опубликованы.

Недостатки и соображения:

  • Сложность: Добавляет дополнительную компоненту (релей) и схему БД.
  • Задержка (Latency): Событие публикуется не мгновенно, а с задержкой, определяемой интервалом работы релея.
  • Потенциал дублирования: Релей должен быть идемпотентным, так как возможна повторная отправка (например, если релей упал после отправки в Kafka, но до обновления статуса). Это требует идемпотентной обработки на стороне потребителей.
  • Мониторинг: Необходимо следить за здоровьем процесса-релея и ростом очереди в таблице outbox.

Оптимизации и вариации

  • CDC (Change Data Capture): Вместо кастомной таблицы и релея можно использовать инструменты вроде Debezium, которые считывают журнал транзакций БД (WAL) и публикуют изменения напрямую в брокер. Это более производительный и менее инвазивный подход.
  • Пакетная отправка: Релей может отправлять сообщения пачками для повышения эффективности.
  • Отдельная база данных: Для высоконагруженных систем таблицу Outbox иногда выносят в отдельную, оптимизированную под запись БД.

В итоге, Transaction Outbox — это фундаментальный паттерн для построения надежных событийно-ориентированных систем на Go, который обеспечивает консистентность данных между сервисами в eventually consistent архитектуре. Его реализация требует внимания к деталям, но он является де-факто стандартом для решения проблемы двойной записи.