← Назад к вопросам

Как сделать чтобы заявка не отправлялась повторно в Kafka если не обновился статус что event был отправлен?

2.0 Middle🔥 251 комментариев
#Архитектура и паттерны#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Предотвращение повторной отправки заявок в Kafka

Это классическая проблема в распределённых системах — гарантия exactly-once delivery. Есть несколько подходов для решения.

1. Идемпотентность + состояние в БД

Концепция

Вещество, отправляй событие, но отмечай в БД что оно отправлено ПЕРЕД отправкой. Если заявка перезагрузится, не переотправляй.

from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime
from sqlalchemy.orm import Session
from datetime import datetime, timezone
import json

class Request(Base):
    __tablename__ = "requests"
    
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, index=True)
    data = Column(String)
    status = Column(String, default="pending")  # pending, processing, completed
    event_sent = Column(Boolean, default=False)  # ключевой флаг!
    sent_at = Column(DateTime(timezone=True), nullable=True)
    kafka_offset = Column(String, nullable=True)  # для идентификации

async def process_request(request_id: int, db: Session):
    """Обработать заявку"""
    request = db.query(Request).filter_by(id=request_id).with_for_update().first()
    
    # Проверить, не отправляли ли уже событие
    if request.event_sent:
        print(f"Event for request {request_id} already sent")
        return
    
    # ВАЖНО: отметить что отправляем ДО отправки
    try:
        # Обновить статус в БД (гарантирует консистентность)
        request.status = "processing"
        request.event_sent = True
        request.sent_at = datetime.now(timezone.utc)
        db.commit()  # Commit перед отправкой!
        
        # Отправить в Kafka (может упасть)
        await send_to_kafka({
            "request_id": request.id,
            "user_id": request.user_id,
            "data": request.data
        })
        
        # Отметить успех
        request.status = "completed"
        db.commit()
        
    except Exception as e:
        # Если Kafka упала, event_sent уже True, не переотправляем
        print(f"Error sending to Kafka: {e}")
        request.status = "error"
        db.commit()
        raise

2. Идемпотентный ключ (Idempotency Key)

Концепция

Отправляй с уникальным идемпотентным ключом. Kafka/приёмник игнорирует дубликаты.

import uuid
from dataclasses import dataclass

@dataclass
class RequestEvent:
    idempotency_key: str  # Уникальный ключ для этой заявки
    request_id: int
    user_id: int
    data: dict
    timestamp: str

async def send_request_event(request_id: int, db: Session):
    """Отправить с идемпотентным ключом"""
    request = db.query(Request).filter_by(id=request_id).first()
    
    # Генерировать уникальный ключ (желательно в БД)
    if not request.idempotency_key:
        request.idempotency_key = str(uuid.uuid4())
        db.commit()
    
    event = RequestEvent(
        idempotency_key=request.idempotency_key,  # Это ключ!
        request_id=request.id,
        user_id=request.user_id,
        data=json.loads(request.data),
        timestamp=datetime.now(timezone.utc).isoformat()
    )
    
    # Отправить с ключом
    message = {
        "idempotency_key": event.idempotency_key,
        "body": event.dict()
    }
    
    await producer.send_and_wait(
        "requests_topic",
        key=event.idempotency_key.encode(),  # Ключ для партиции и дедупликации
        value=json.dumps(message).encode()
    )

3. Kafka Consumer с обработкой дубликатов

На стороне приёмника

from aiokafka import AIOKafkaConsumer
import json

class RequestEventConsumer:
    def __init__(self, db):
        self.db = db
        # Таблица для отслеживания обработанных событий
        self.processed = set()  # В production используй Redis/БД
    
    async def handle_message(self, message):
        """Обработать сообщение из Kafka"""
        event = json.loads(message.value.decode())
        idempotency_key = event["idempotency_key"]
        
        # Проверить, не обработано ли уже
        if idempotency_key in self.processed:
            print(f"Event {idempotency_key} already processed, skipping")
            return
        
        try:
            # Обработать событие
            await self.process_event(event["body"])
            
            # Отметить как обработанное
            self.processed.add(idempotency_key)
            # В production сохраняй в БД или Redis
            
        except Exception as e:
            print(f"Error processing event: {e}")
            raise
    
    async def process_event(self, body):
        """Бизнес-логика обработки"""
        request_id = body["request_id"]
        # Обработать заявку
        print(f"Processing request {request_id}")

# Запуск consumer
async def consume_requests():
    consumer = AIOKafkaConsumer(
        'requests_topic',
        bootstrap_servers=['localhost:9092'],
        group_id='request_processors',
        auto_offset_reset='earliest',
        enable_auto_commit=True  # Автокоммит после обработки
    )
    
    handler = RequestEventConsumer(db)
    
    await consumer.start()
    try:
        async for message in consumer:
            await handler.handle_message(message)
    finally:
        await consumer.stop()

4. Transactional Outbox Pattern

Суть: сохрани в БД и отправь как одну транзакцию

from sqlalchemy import Column, String, DateTime, Table
from datetime import datetime, timezone

class Outbox(Base):
    """Таблица для незадачны Kafka события"""
    __tablename__ = "outbox"
    
    id = Column(Integer, primary_key=True)
    aggregate_id = Column(Integer)  # request_id
    aggregate_type = Column(String)  # "request"
    event_type = Column(String)  # "request_created"
    payload = Column(String)  # JSON payload
    created_at = Column(DateTime(timezone=True), default=datetime.now(timezone.utc))
    published = Column(Boolean, default=False)  # False пока не отправлено в Kafka
    published_at = Column(DateTime(timezone=True), nullable=True)

async def create_request(user_id: int, data: dict, db: Session):
    """Создать заявку и отправить событие АТОМИЧНО"""
    
    # Всё в одной транзакции
    try:
        # 1. Создать заявку
        request = Request(user_id=user_id, data=json.dumps(data))
        db.add(request)
        db.flush()  # Получить id без коммита
        
        # 2. Создать событие в Outbox
        outbox = Outbox(
            aggregate_id=request.id,
            aggregate_type="request",
            event_type="request_created",
            payload=json.dumps({
                "request_id": request.id,
                "user_id": user_id,
                "data": data
            })
        )
        db.add(outbox)
        
        # 3. Коммитить ВСЁ вместе
        db.commit()
        
    except Exception as e:
        db.rollback()
        raise

# Отдельный сервис для отправки Outbox в Kafka
async def outbox_publisher():
    """Периодически проверять Outbox и отправлять в Kafka"""
    while True:
        # Найти неопубликованные события
        unpublished = db.query(Outbox).filter_by(published=False).all()
        
        for event in unpublished:
            try:
                # Отправить в Kafka
                await producer.send_and_wait(
                    f"{event.aggregate_type}_events",
                    key=str(event.aggregate_id).encode(),
                    value=event.payload.encode()
                )
                
                # Отметить как опубликованное
                event.published = True
                event.published_at = datetime.now(timezone.utc)
                db.commit()
                
            except Exception as e:
                print(f"Error publishing event: {e}")
                # Retry позже
                await asyncio.sleep(5)
        
        # Проверять каждые 5 секунд
        await asyncio.sleep(5)

5. Оптимистичная блокировка (Optimistic Locking)

class Request(Base):
    __tablename__ = "requests"
    
    id = Column(Integer, primary_key=True)
    version = Column(Integer, default=1)  # Версия для оптимистичной блокировки
    data = Column(String)
    event_sent = Column(Boolean, default=False)
    sent_at = Column(DateTime(timezone=True), nullable=True)

async def send_request_event_optimistic(request_id: int, db: Session):
    """Отправить с оптимистичной блокировкой"""
    
    while True:
        request = db.query(Request).filter_by(id=request_id).first()
        original_version = request.version
        
        if request.event_sent:
            print(f"Event already sent")
            return
        
        try:
            # Отправить событие
            await send_to_kafka({...})
            
            # Обновить только если версия не изменилась
            # (означает что другой процесс не обновил запись)
            updated = db.query(Request).filter(
                Request.id == request_id,
                Request.version == original_version
            ).update(
                {
                    Request.event_sent: True,
                    Request.sent_at: datetime.now(timezone.utc),
                    Request.version: original_version + 1
                }
            )
            db.commit()
            
            if updated == 0:
                # Версия изменилась, retry
                await asyncio.sleep(0.1)
                continue
            
            return
            
        except Exception as e:
            db.rollback()
            print(f"Error: {e}")
            raise

6. Лучшие практики

# ✓ ПРАВИЛЬНО
# 1. Отметить отправку в БД ДО отправки
request.event_sent = True
db.commit()  # Гарантирует на диске
await send_to_kafka(...)  # Может упасть

# 2. Использовать идемпотентный ключ
event = {
    "idempotency_key": request.idempotency_key,
    "data": {...}
}
await producer.send("topic", key=idempotency_key, value=event)

# 3. Обработчик проверяет дубликаты
if idempotency_key in self.processed:
    return  # Пропустить

# 4. SELECT FOR UPDATE для блокировки
request = db.query(Request).filter_by(id=request_id).with_for_update().first()

# ✗ НЕПРАВИЛЬНО
# Отправить первым, потом обновить БД
await send_to_kafka(...)
request.event_sent = True  # Может не выполниться!
db.commit()

# Без идемпотентного ключа
await producer.send("topic", value=event)  # Дубликаты возможны

# Без проверки дубликатов на приёмнике
await process_event(event)  # Может обработать дважды

Сравнение подходов

Подход                        | Сложность | Гарантия        | Когда использовать
------------------------------|-----------|-----------------|-------------------
Эвент_sent флаг + БД         | Простой   | Exactly once    | Большинство случаев
Идемпотентный ключ            | Средняя   | Idempotent      | Если может быть дубль
Outbox Pattern                | Сложный   | Exactly once    | Критичные системы
Оптимистичная блокировка      | Средняя   | At most once    | Высокая конкуренция

Итого

Для предотвращения повторной отправки:

  1. Используй флаг в БД — отмечай event_sent ДО отправки
  2. Идемпотентный ключ — каждому событию уникальный ID
  3. SELECT FOR UPDATE — блокировка при высокой конкуренции
  4. Outbox Pattern — для критичных систем
  5. Проверка дубликатов на приёмнике — дополнительная страховка
  6. Commit в БД перед отправкой в Kafka — гарантирует на диске
Как сделать чтобы заявка не отправлялась повторно в Kafka если не обновился статус что event был отправлен? | PrepBro