Что такое Transaction Outbox?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Что такое Transaction Outbox?
Transaction Outbox — это архитектурный паттерн, используемый для обеспечения надежной отправки сообщений или событий в распределенных системах, особенно в контексте микросервисов. Он решает ключевую проблему: как гарантировать атомарность обновления базы данных и отправки сообщения (например, в брокер сообщений, как Kafka или RabbitMQ) без использования распределенных транзакций (2PC), которые считаются ненадежными и сложными в поддержке.
Основная идея заключается в том, чтобы откладывать реальную отправку сообщения, сначала сохраняя его в специальную таблицу-«исходящий почтовый ящик» (Outbox) в рамках той же транзакции базы данных, что и основное бизнес-обновление. Таким образом, обновление состояния и запись сообщения становятся атомарными. Затем отдельный процесс (Publisher или Relay) асинхронно извлекает новые записи из Outbox и доставляет их во внешнюю систему.
Проблема, которую решает паттерн
Рассмотрим классический пример на Go: сервис обрабатывает заказ и должен обновить его статус в БД и отправить событие OrderCreated в брокер.
// НАИВНАЯ И НЕБЕЗОПАСНАЯ РЕАЛИЗАЦИЯ (ПРОБЛЕМА)
func processOrder(orderID string, tx *sql.Tx) error {
// 1. Обновляем статус заказа в БД
_, err := tx.Exec("UPDATE orders SET status = 'processed' WHERE id = $1", orderID)
if err != nil {
tx.Rollback()
return err
}
// 2. Отправляем событие в Kafka
msg := createOrderEvent(orderID)
err = kafkaProducer.Send(msg)
if err != nil {
// Что делать здесь? Транзакция БД уже завершена успешно!
// Событие потеряно. Состояние системы становится противоречивым.
tx.Rollback() // Уже поздно, UPDATE выполнен.
return err
}
// 3. Фиксируем транзакцию БД
return tx.Commit()
}
Проблема: Если шаг 2 (отправка в Kafka) завершится ошибкой после успешного UPDATE, транзакцию БД уже нельзя безопасно откатить. Данные в БД и в событийной шине рассогласованы.
Как работает Transaction Outbox
Решение заключается в добавлении промежуточного шага:
- Сохранение события в Outbox в рамках транзакции: Создается таблица
outbox(илиoutbox_messages). Сообщение сохраняется туда как запись в той же транзакции, что и основное бизнес-обновление. - Атомарная фиксация: Коммит транзакции фиксирует и изменение бизнес-состояния, и факт того, что событие должно быть отправлено. Это гарантирует надежность "at-least-once".
- Асинхронная доставка: Отдельный фоновый процесс (часто называемый релеем или публикатором) периодически опрашивает таблицу
outboxна наличие новых, неотправленных сообщений (status = 'pending'). - Отправка и маркировка: Релей извлекает сообщение, отправляет его во внешнюю систему (брокер) и в отдельной транзакции обновляет статус записи в Outbox на
sentили удаляет ее. Это защищает от повторных отправок при сбоях релея.
Пример реализации на Go
1. Структура таблицы Outbox (SQL миграция)
CREATE TABLE outbox_messages (
id BIGSERIAL PRIMARY KEY,
aggregate_type VARCHAR(255) NOT NULL, -- Сущность (например, "order")
aggregate_id VARCHAR(255) NOT NULL, -- ID сущности
event_type VARCHAR(255) NOT NULL, -- Тип события ("OrderCreated")
payload JSONB NOT NULL, -- Данные события
status VARCHAR(50) DEFAULT 'pending', -- 'pending', 'sent', 'failed'
created_at TIMESTAMP DEFAULT NOW(),
sent_at TIMESTAMP
);
CREATE INDEX idx_outbox_pending ON outbox_messages(status) WHERE status = 'pending';
2. Сохранение в Outbox в рамках бизнес-транзакции
func processOrderWithOutbox(orderID string, tx *sql.Tx) error {
// 1. Бизнес-логика: обновление заказа
_, err := tx.Exec("UPDATE orders SET status = 'processed' WHERE id = $1", orderID)
if err != nil {
return err
}
// 2. Создание и сохранение события В ТУ ЖЕ ТРАНЗАКЦИЮ
event := OutboxMessage{
AggregateType: "order",
AggregateID: orderID,
EventType: "OrderCreated",
Payload: []byte(`{"id":"` + orderID + `","status":"processed"}`),
Status: "pending",
}
err = saveOutboxMessage(tx, event)
if err != nil {
return err // Если ошибка здесь, вся транзакция откатится
}
// 3. Фиксация. Теперь гарантировано: если заказ обновлен, событие записано.
return tx.Commit()
}
func saveOutboxMessage(tx *sql.Tx, msg OutboxMessage) error {
query := `INSERT INTO outbox_messages
(aggregate_type, aggregate_id, event_type, payload, status)
VALUES ($1, $2, $3, $4, $5)`
_, err := tx.Exec(query, msg.AggregateType, msg.AggregateID,
msg.EventType, msg.Payload, msg.Status)
return err
}
3. Фоновый процесс-релей (Publisher)
func startOutboxPublisher(ctx context.Context, db *sql.DB, producer kafka.Producer) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// В отдельной транзакции извлекаем и обрабатываем сообщения
err := publishPendingMessages(db, producer)
if err != nil {
log.Printf("Outbox publisher error: %v", err)
}
}
}
}
func publishPendingMessages(db *sql.DB, producer kafka.Producer) error {
// Начинаем транзакцию для извлечения
tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
// Извлекаем блок сообщений с блокировкой строк (SELECT FOR UPDATE)
rows, err := tx.Query(`
SELECT id, payload, event_type FROM outbox_messages
WHERE status = 'pending'
ORDER BY created_at
LIMIT 100
FOR UPDATE SKIP LOCKED`)
if err != nil {
return err
}
defer rows.Close()
var messages []OutboxMessage
for rows.Next() {
var msg OutboxMessage
rows.Scan(&msg.ID, &msg.Payload, &msg.EventType)
messages = append(messages, msg)
}
// Для каждого сообщения: отправляем в Kafka и обновляем статус
for _, msg := range messages {
kafkaMsg := &kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &msg.EventType, Partition: kafka.PartitionAny},
Value: msg.Payload,
}
err = producer.Produce(kafkaMsg, nil)
if err != nil {
// Можно пометить как failed и повторить позже
continue
}
// Помечаем как отправленное
_, err = tx.Exec("UPDATE outbox_messages SET status='sent', sent_at=NOW() WHERE id=$1", msg.ID)
if err != nil {
return err
}
}
// Фиксируем все изменения статусов
return tx.Commit()
}
Преимущества и недостатки паттерна
Преимущества:
- Надежность: Гарантирует, что событие будет сохранено, если бизнес-транзакция завершена успешно.
- Атомарность: Достигается средствами локальной транзакции БД, без сложных распределенных транзакций.
- Декомпозиция: Отделяет бизнес-логику от инфраструктуры отправки сообщений.
- Отказоустойчивость: Позволяет обрабатывать временную недоступность брокера сообщений.
- Единый источник правды: Таблица Outbox становится журналом всех событий, которые должны быть опубликованы.
Недостатки и соображения:
- Сложность: Добавляет дополнительную компоненту (релей) и схему БД.
- Задержка (Latency): Событие публикуется не мгновенно, а с задержкой, определяемой интервалом работы релея.
- Потенциал дублирования: Релей должен быть идемпотентным, так как возможна повторная отправка (например, если релей упал после отправки в Kafka, но до обновления статуса). Это требует идемпотентной обработки на стороне потребителей.
- Мониторинг: Необходимо следить за здоровьем процесса-релея и ростом очереди в таблице
outbox.
Оптимизации и вариации
- CDC (Change Data Capture): Вместо кастомной таблицы и релея можно использовать инструменты вроде Debezium, которые считывают журнал транзакций БД (WAL) и публикуют изменения напрямую в брокер. Это более производительный и менее инвазивный подход.
- Пакетная отправка: Релей может отправлять сообщения пачками для повышения эффективности.
- Отдельная база данных: Для высоконагруженных систем таблицу Outbox иногда выносят в отдельную, оптимизированную под запись БД.
В итоге, Transaction Outbox — это фундаментальный паттерн для построения надежных событийно-ориентированных систем на Go, который обеспечивает консистентность данных между сервисами в eventually consistent архитектуре. Его реализация требует внимания к деталям, но он является де-факто стандартом для решения проблемы двойной записи.