← Назад к вопросам
Как реализовать гарантированную отправку заявки с базы данных в очередь?
2.7 Senior🔥 121 комментариев
#Архитектура и паттерны#Брокеры сообщений
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Гарантированная отправка заявки из БД в очередь
Это классическая проблема: как обеспечить delivery-guarantee при добавлении заявки в очередь (queue, Celery, RabbitMQ и т.п.), чтобы сообщение не потерялось в случае падения.
Проблема
# Плохо: если упадет между insert и publish, заявка потеряется
def create_request_bad(data):
request = db.insert_request(data) # Сохранили в БД
queue.publish({ # Но если здесь упадет...
"request_id": request.id,
"data": data
})
return request
# Или наоборот: если упадет до insert, дубликат в очереди
Решение 1: Outbox Pattern (Лучший вариант)
Сохраняем сообщение в отдельную таблицу, потом worker его отправляет:
1. Создаём таблицу Outbox
-- миграция 0001_create_outbox.sql
CREATE TABLE outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_id UUID NOT NULL, -- ID заявки
event_type VARCHAR(255) NOT NULL,
event_data JSONB NOT NULL,
published_at TIMESTAMPTZ NULL,
created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
version INT NOT NULL
);
CREATE INDEX idx_outbox_published ON outbox(published_at)
WHERE published_at IS NULL;
2. Транзакция: request + outbox
from sqlalchemy import insert, update
from datetime import datetime
from typing import Dict
import json
async def create_request_with_outbox(session, request_data: Dict):
"""
Создаём заявку и сообщение в одной транзакции.
"""
try:
# Создаём заявку
new_request = Request(
title=request_data["title"],
description=request_data["description"],
status="pending"
)
session.add(new_request)
await session.flush() # Получить ID
# Добавляем событие в Outbox
outbox_event = OutboxEvent(
aggregate_id=new_request.id,
event_type="RequestCreated",
event_data={
"request_id": str(new_request.id),
"title": new_request.title,
"timestamp": datetime.utcnow().isoformat()
},
version=1
)
session.add(outbox_event)
# Коммитим обе вставки вместе
await session.commit()
return new_request
except Exception as e:
await session.rollback()
raise
3. Worker, который отправляет из Outbox
import asyncio
from typing import List
class OutboxPublisher:
def __init__(self, session_factory, queue):
self.session_factory = session_factory
self.queue = queue
async def publish_events(self, batch_size: int = 100):
"""
Периодически публикует события из outbox.
Запускать в отдельной coroutine или Celery таске.
"""
async with self.session_factory() as session:
# Получаем непубликованные события
unpublished = await session.execute(
select(OutboxEvent)
.where(OutboxEvent.published_at.is_(None))
.order_by(OutboxEvent.created_at)
.limit(batch_size)
)
events = unpublished.scalars().all()
if not events:
return
# Отправляем в очередь
for event in events:
try:
await self.queue.publish(
event.event_type,
event.event_data
)
# Отмечаем как опубликованное
event.published_at = datetime.utcnow()
except Exception as e:
print(f"Failed to publish {event.id}: {e}")
continue
# Коммитим все изменения
await session.commit()
async def start_polling(self, interval_seconds: int = 5):
"""
Непрерывно публикует события.
"""
while True:
try:
await self.publish_events()
except Exception as e:
print(f"Outbox publishing error: {e}")
await asyncio.sleep(interval_seconds)
# Запуск в приложении
async def startup_outbox_publisher(app):
publisher = OutboxPublisher(get_session, queue)
asyncio.create_task(publisher.start_polling())
app.add_event_handler("startup", startup_outbox_publisher)
Решение 2: Inbox Pattern (Для обработки сообщений)
Если мы получаем события и обрабатываем их:
# Таблица для дедупликации
CREATE TABLE inbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
message_id VARCHAR(255) UNIQUE NOT NULL, -- Идемпотентный ключ
event_type VARCHAR(255) NOT NULL,
event_data JSONB NOT NULL,
processed_at TIMESTAMPTZ NULL,
created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_inbox_processed ON inbox(processed_at)
WHERE processed_at IS NULL;
Обработчик события:
async def handle_request_created(event_data: Dict, message_id: str):
"""
Обрабатываем событие создания заявки.
message_id для дедупликации.
"""
async with get_session() as session:
# Проверяем, не обработано ли это сообщение
existing = await session.execute(
select(InboxEvent).where(
InboxEvent.message_id == message_id
)
)
if existing.scalar():
return # Уже обработано
try:
# Обрабатываем событие
request_id = event_data["request_id"]
# Отправляем email, уведомление и т.п.
await send_notification(request_id)
# Добавляем в inbox
inbox_event = InboxEvent(
message_id=message_id,
event_type=event_data["type"],
event_data=event_data,
processed_at=datetime.utcnow()
)
session.add(inbox_event)
await session.commit()
except Exception as e:
print(f"Error processing event {message_id}: {e}")
await session.rollback()
raise
Решение 3: Transactional Outbox с Polling
Для Celery:
from celery import shared_task
from celery.schedules import schedule
@shared_task
def publish_outbox_events():
"""
Celery task для публикации событий.
"""
session = SessionLocal()
try:
# Получаем непубликованные
unpublished = session.query(OutboxEvent).filter(
OutboxEvent.published_at.is_(None)
).all()
for event in unpublished:
try:
# Публикуем в Celery
process_request.delay(
request_id=event.aggregate_id,
data=event.event_data
)
event.published_at = datetime.utcnow()
except Exception as e:
print(f"Error: {e}")
session.commit()
finally:
session.close()
# В Celery config
app.conf.beat_schedule = {
'publish-outbox': {
'task': 'tasks.publish_outbox_events',
'schedule': 5.0, # Каждые 5 секунд
},
}
Решение 4: CDC (Change Data Capture) с Debezium
Для больших объёмов:
# docker-compose.yml
services:
debezium:
image: debezium/connect:latest
environment:
BOOTSTRAP_SERVERS: kafka:9092
DATABASE_HOSTNAME: postgres
DATABASE_USER: postgres
DATABASE_PASSWORD: password
Сравнение подходов
| Подход | Простота | Гарантии | Scalability | Latency |
|---|---|---|---|---|
| Outbox Polling | Средне | At-least-once | Хорошо | 5-10с |
| Inbox | Средне | Exactly-once | Хорошо | Low |
| Debezium CDC | Сложно | Exactly-once | Отлично | Low |
| Simple Queue | Просто | At-most-once | Среднее | Low |
Лучшая практика
Используй Outbox Pattern:
- Одна транзакция сохраняет request + outbox event
- Отдельный worker читает из outbox и публикует
- После публикации отмечаем published_at
- Гарантирует at-least-once delivery
- Можно добавить дедупликацию на consumer'е для exactly-once
# Итоговая архитектура
Request Create → [Transaction: insert request + outbox event]
↓
Commit/Rollback
↓
Outbox Polling (5s)
↓
Publish to Queue
↓
Mark Published
Это гарантирует, что ни одна заявка не потеряется, даже если система упадет в самый неудачный момент.