← Назад к вопросам

Как реализовать гарантированную отправку заявки с базы данных в очередь?

2.7 Senior🔥 121 комментариев
#Архитектура и паттерны#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Гарантированная отправка заявки из БД в очередь

Это классическая проблема: как обеспечить delivery-guarantee при добавлении заявки в очередь (queue, Celery, RabbitMQ и т.п.), чтобы сообщение не потерялось в случае падения.

Проблема

# Плохо: если упадет между insert и publish, заявка потеряется
def create_request_bad(data):
    request = db.insert_request(data)  # Сохранили в БД
    queue.publish({                     # Но если здесь упадет...
        "request_id": request.id,
        "data": data
    })
    return request

# Или наоборот: если упадет до insert, дубликат в очереди

Решение 1: Outbox Pattern (Лучший вариант)

Сохраняем сообщение в отдельную таблицу, потом worker его отправляет:

1. Создаём таблицу Outbox

-- миграция 0001_create_outbox.sql
CREATE TABLE outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_id UUID NOT NULL,  -- ID заявки
    event_type VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    published_at TIMESTAMPTZ NULL,
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    version INT NOT NULL
);

CREATE INDEX idx_outbox_published ON outbox(published_at)
WHERE published_at IS NULL;

2. Транзакция: request + outbox

from sqlalchemy import insert, update
from datetime import datetime
from typing import Dict
import json

async def create_request_with_outbox(session, request_data: Dict):
    """
    Создаём заявку и сообщение в одной транзакции.
    """
    try:
        # Создаём заявку
        new_request = Request(
            title=request_data["title"],
            description=request_data["description"],
            status="pending"
        )
        session.add(new_request)
        await session.flush()  # Получить ID
        
        # Добавляем событие в Outbox
        outbox_event = OutboxEvent(
            aggregate_id=new_request.id,
            event_type="RequestCreated",
            event_data={
                "request_id": str(new_request.id),
                "title": new_request.title,
                "timestamp": datetime.utcnow().isoformat()
            },
            version=1
        )
        session.add(outbox_event)
        
        # Коммитим обе вставки вместе
        await session.commit()
        
        return new_request
    
    except Exception as e:
        await session.rollback()
        raise

3. Worker, который отправляет из Outbox

import asyncio
from typing import List

class OutboxPublisher:
    def __init__(self, session_factory, queue):
        self.session_factory = session_factory
        self.queue = queue
    
    async def publish_events(self, batch_size: int = 100):
        """
        Периодически публикует события из outbox.
        Запускать в отдельной coroutine или Celery таске.
        """
        async with self.session_factory() as session:
            # Получаем непубликованные события
            unpublished = await session.execute(
                select(OutboxEvent)
                .where(OutboxEvent.published_at.is_(None))
                .order_by(OutboxEvent.created_at)
                .limit(batch_size)
            )
            events = unpublished.scalars().all()
            
            if not events:
                return
            
            # Отправляем в очередь
            for event in events:
                try:
                    await self.queue.publish(
                        event.event_type,
                        event.event_data
                    )
                    
                    # Отмечаем как опубликованное
                    event.published_at = datetime.utcnow()
                    
                except Exception as e:
                    print(f"Failed to publish {event.id}: {e}")
                    continue
            
            # Коммитим все изменения
            await session.commit()
    
    async def start_polling(self, interval_seconds: int = 5):
        """
        Непрерывно публикует события.
        """
        while True:
            try:
                await self.publish_events()
            except Exception as e:
                print(f"Outbox publishing error: {e}")
            
            await asyncio.sleep(interval_seconds)

# Запуск в приложении
async def startup_outbox_publisher(app):
    publisher = OutboxPublisher(get_session, queue)
    asyncio.create_task(publisher.start_polling())

app.add_event_handler("startup", startup_outbox_publisher)

Решение 2: Inbox Pattern (Для обработки сообщений)

Если мы получаем события и обрабатываем их:

# Таблица для дедупликации
CREATE TABLE inbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    message_id VARCHAR(255) UNIQUE NOT NULL,  -- Идемпотентный ключ
    event_type VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    processed_at TIMESTAMPTZ NULL,
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_inbox_processed ON inbox(processed_at)
WHERE processed_at IS NULL;

Обработчик события:

async def handle_request_created(event_data: Dict, message_id: str):
    """
    Обрабатываем событие создания заявки.
    message_id для дедупликации.
    """
    async with get_session() as session:
        # Проверяем, не обработано ли это сообщение
        existing = await session.execute(
            select(InboxEvent).where(
                InboxEvent.message_id == message_id
            )
        )
        
        if existing.scalar():
            return  # Уже обработано
        
        try:
            # Обрабатываем событие
            request_id = event_data["request_id"]
            
            # Отправляем email, уведомление и т.п.
            await send_notification(request_id)
            
            # Добавляем в inbox
            inbox_event = InboxEvent(
                message_id=message_id,
                event_type=event_data["type"],
                event_data=event_data,
                processed_at=datetime.utcnow()
            )
            session.add(inbox_event)
            await session.commit()
        
        except Exception as e:
            print(f"Error processing event {message_id}: {e}")
            await session.rollback()
            raise

Решение 3: Transactional Outbox с Polling

Для Celery:

from celery import shared_task
from celery.schedules import schedule

@shared_task
def publish_outbox_events():
    """
    Celery task для публикации событий.
    """
    session = SessionLocal()
    try:
        # Получаем непубликованные
        unpublished = session.query(OutboxEvent).filter(
            OutboxEvent.published_at.is_(None)
        ).all()
        
        for event in unpublished:
            try:
                # Публикуем в Celery
                process_request.delay(
                    request_id=event.aggregate_id,
                    data=event.event_data
                )
                
                event.published_at = datetime.utcnow()
            except Exception as e:
                print(f"Error: {e}")
        
        session.commit()
    finally:
        session.close()

# В Celery config
app.conf.beat_schedule = {
    'publish-outbox': {
        'task': 'tasks.publish_outbox_events',
        'schedule': 5.0,  # Каждые 5 секунд
    },
}

Решение 4: CDC (Change Data Capture) с Debezium

Для больших объёмов:

# docker-compose.yml
services:
  debezium:
    image: debezium/connect:latest
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      DATABASE_HOSTNAME: postgres
      DATABASE_USER: postgres
      DATABASE_PASSWORD: password

Сравнение подходов

ПодходПростотаГарантииScalabilityLatency
Outbox PollingСреднеAt-least-onceХорошо5-10с
InboxСреднеExactly-onceХорошоLow
Debezium CDCСложноExactly-onceОтличноLow
Simple QueueПростоAt-most-onceСреднееLow

Лучшая практика

Используй Outbox Pattern:

  1. Одна транзакция сохраняет request + outbox event
  2. Отдельный worker читает из outbox и публикует
  3. После публикации отмечаем published_at
  4. Гарантирует at-least-once delivery
  5. Можно добавить дедупликацию на consumer'е для exactly-once
# Итоговая архитектура
Request Create → [Transaction: insert request + outbox event]
                         ↓
                    Commit/Rollback
                         ↓
               Outbox Polling (5s)
                         ↓
                  Publish to Queue
                         ↓
                    Mark Published

Это гарантирует, что ни одна заявка не потеряется, даже если система упадет в самый неудачный момент.