← Назад к вопросам
Как сделать чтобы заявка не отправлялась повторно в Kafka если не обновился статус что event был отправлен?
2.0 Middle🔥 251 комментариев
#Архитектура и паттерны#Брокеры сообщений
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Предотвращение повторной отправки заявок в Kafka
Это классическая проблема в распределённых системах — гарантия exactly-once delivery. Есть несколько подходов для решения.
1. Идемпотентность + состояние в БД
Концепция
Вещество, отправляй событие, но отмечай в БД что оно отправлено ПЕРЕД отправкой. Если заявка перезагрузится, не переотправляй.
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime
from sqlalchemy.orm import Session
from datetime import datetime, timezone
import json
class Request(Base):
__tablename__ = "requests"
id = Column(Integer, primary_key=True)
user_id = Column(Integer, index=True)
data = Column(String)
status = Column(String, default="pending") # pending, processing, completed
event_sent = Column(Boolean, default=False) # ключевой флаг!
sent_at = Column(DateTime(timezone=True), nullable=True)
kafka_offset = Column(String, nullable=True) # для идентификации
async def process_request(request_id: int, db: Session):
"""Обработать заявку"""
request = db.query(Request).filter_by(id=request_id).with_for_update().first()
# Проверить, не отправляли ли уже событие
if request.event_sent:
print(f"Event for request {request_id} already sent")
return
# ВАЖНО: отметить что отправляем ДО отправки
try:
# Обновить статус в БД (гарантирует консистентность)
request.status = "processing"
request.event_sent = True
request.sent_at = datetime.now(timezone.utc)
db.commit() # Commit перед отправкой!
# Отправить в Kafka (может упасть)
await send_to_kafka({
"request_id": request.id,
"user_id": request.user_id,
"data": request.data
})
# Отметить успех
request.status = "completed"
db.commit()
except Exception as e:
# Если Kafka упала, event_sent уже True, не переотправляем
print(f"Error sending to Kafka: {e}")
request.status = "error"
db.commit()
raise
2. Идемпотентный ключ (Idempotency Key)
Концепция
Отправляй с уникальным идемпотентным ключом. Kafka/приёмник игнорирует дубликаты.
import uuid
from dataclasses import dataclass
@dataclass
class RequestEvent:
idempotency_key: str # Уникальный ключ для этой заявки
request_id: int
user_id: int
data: dict
timestamp: str
async def send_request_event(request_id: int, db: Session):
"""Отправить с идемпотентным ключом"""
request = db.query(Request).filter_by(id=request_id).first()
# Генерировать уникальный ключ (желательно в БД)
if not request.idempotency_key:
request.idempotency_key = str(uuid.uuid4())
db.commit()
event = RequestEvent(
idempotency_key=request.idempotency_key, # Это ключ!
request_id=request.id,
user_id=request.user_id,
data=json.loads(request.data),
timestamp=datetime.now(timezone.utc).isoformat()
)
# Отправить с ключом
message = {
"idempotency_key": event.idempotency_key,
"body": event.dict()
}
await producer.send_and_wait(
"requests_topic",
key=event.idempotency_key.encode(), # Ключ для партиции и дедупликации
value=json.dumps(message).encode()
)
3. Kafka Consumer с обработкой дубликатов
На стороне приёмника
from aiokafka import AIOKafkaConsumer
import json
class RequestEventConsumer:
def __init__(self, db):
self.db = db
# Таблица для отслеживания обработанных событий
self.processed = set() # В production используй Redis/БД
async def handle_message(self, message):
"""Обработать сообщение из Kafka"""
event = json.loads(message.value.decode())
idempotency_key = event["idempotency_key"]
# Проверить, не обработано ли уже
if idempotency_key in self.processed:
print(f"Event {idempotency_key} already processed, skipping")
return
try:
# Обработать событие
await self.process_event(event["body"])
# Отметить как обработанное
self.processed.add(idempotency_key)
# В production сохраняй в БД или Redis
except Exception as e:
print(f"Error processing event: {e}")
raise
async def process_event(self, body):
"""Бизнес-логика обработки"""
request_id = body["request_id"]
# Обработать заявку
print(f"Processing request {request_id}")
# Запуск consumer
async def consume_requests():
consumer = AIOKafkaConsumer(
'requests_topic',
bootstrap_servers=['localhost:9092'],
group_id='request_processors',
auto_offset_reset='earliest',
enable_auto_commit=True # Автокоммит после обработки
)
handler = RequestEventConsumer(db)
await consumer.start()
try:
async for message in consumer:
await handler.handle_message(message)
finally:
await consumer.stop()
4. Transactional Outbox Pattern
Суть: сохрани в БД и отправь как одну транзакцию
from sqlalchemy import Column, String, DateTime, Table
from datetime import datetime, timezone
class Outbox(Base):
"""Таблица для незадачны Kafka события"""
__tablename__ = "outbox"
id = Column(Integer, primary_key=True)
aggregate_id = Column(Integer) # request_id
aggregate_type = Column(String) # "request"
event_type = Column(String) # "request_created"
payload = Column(String) # JSON payload
created_at = Column(DateTime(timezone=True), default=datetime.now(timezone.utc))
published = Column(Boolean, default=False) # False пока не отправлено в Kafka
published_at = Column(DateTime(timezone=True), nullable=True)
async def create_request(user_id: int, data: dict, db: Session):
"""Создать заявку и отправить событие АТОМИЧНО"""
# Всё в одной транзакции
try:
# 1. Создать заявку
request = Request(user_id=user_id, data=json.dumps(data))
db.add(request)
db.flush() # Получить id без коммита
# 2. Создать событие в Outbox
outbox = Outbox(
aggregate_id=request.id,
aggregate_type="request",
event_type="request_created",
payload=json.dumps({
"request_id": request.id,
"user_id": user_id,
"data": data
})
)
db.add(outbox)
# 3. Коммитить ВСЁ вместе
db.commit()
except Exception as e:
db.rollback()
raise
# Отдельный сервис для отправки Outbox в Kafka
async def outbox_publisher():
"""Периодически проверять Outbox и отправлять в Kafka"""
while True:
# Найти неопубликованные события
unpublished = db.query(Outbox).filter_by(published=False).all()
for event in unpublished:
try:
# Отправить в Kafka
await producer.send_and_wait(
f"{event.aggregate_type}_events",
key=str(event.aggregate_id).encode(),
value=event.payload.encode()
)
# Отметить как опубликованное
event.published = True
event.published_at = datetime.now(timezone.utc)
db.commit()
except Exception as e:
print(f"Error publishing event: {e}")
# Retry позже
await asyncio.sleep(5)
# Проверять каждые 5 секунд
await asyncio.sleep(5)
5. Оптимистичная блокировка (Optimistic Locking)
class Request(Base):
__tablename__ = "requests"
id = Column(Integer, primary_key=True)
version = Column(Integer, default=1) # Версия для оптимистичной блокировки
data = Column(String)
event_sent = Column(Boolean, default=False)
sent_at = Column(DateTime(timezone=True), nullable=True)
async def send_request_event_optimistic(request_id: int, db: Session):
"""Отправить с оптимистичной блокировкой"""
while True:
request = db.query(Request).filter_by(id=request_id).first()
original_version = request.version
if request.event_sent:
print(f"Event already sent")
return
try:
# Отправить событие
await send_to_kafka({...})
# Обновить только если версия не изменилась
# (означает что другой процесс не обновил запись)
updated = db.query(Request).filter(
Request.id == request_id,
Request.version == original_version
).update(
{
Request.event_sent: True,
Request.sent_at: datetime.now(timezone.utc),
Request.version: original_version + 1
}
)
db.commit()
if updated == 0:
# Версия изменилась, retry
await asyncio.sleep(0.1)
continue
return
except Exception as e:
db.rollback()
print(f"Error: {e}")
raise
6. Лучшие практики
# ✓ ПРАВИЛЬНО
# 1. Отметить отправку в БД ДО отправки
request.event_sent = True
db.commit() # Гарантирует на диске
await send_to_kafka(...) # Может упасть
# 2. Использовать идемпотентный ключ
event = {
"idempotency_key": request.idempotency_key,
"data": {...}
}
await producer.send("topic", key=idempotency_key, value=event)
# 3. Обработчик проверяет дубликаты
if idempotency_key in self.processed:
return # Пропустить
# 4. SELECT FOR UPDATE для блокировки
request = db.query(Request).filter_by(id=request_id).with_for_update().first()
# ✗ НЕПРАВИЛЬНО
# Отправить первым, потом обновить БД
await send_to_kafka(...)
request.event_sent = True # Может не выполниться!
db.commit()
# Без идемпотентного ключа
await producer.send("topic", value=event) # Дубликаты возможны
# Без проверки дубликатов на приёмнике
await process_event(event) # Может обработать дважды
Сравнение подходов
Подход | Сложность | Гарантия | Когда использовать
------------------------------|-----------|-----------------|-------------------
Эвент_sent флаг + БД | Простой | Exactly once | Большинство случаев
Идемпотентный ключ | Средняя | Idempotent | Если может быть дубль
Outbox Pattern | Сложный | Exactly once | Критичные системы
Оптимистичная блокировка | Средняя | At most once | Высокая конкуренция
Итого
Для предотвращения повторной отправки:
- Используй флаг в БД — отмечай event_sent ДО отправки
- Идемпотентный ключ — каждому событию уникальный ID
- SELECT FOR UPDATE — блокировка при высокой конкуренции
- Outbox Pattern — для критичных систем
- Проверка дубликатов на приёмнике — дополнительная страховка
- Commit в БД перед отправкой в Kafka — гарантирует на диске