Какой способ доставки был в вашей системе (at least once, at most once, excactly once)?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Какой способ доставки был в вашей системе (at least once, at most once, exactly once)
В streaming системе использовал exactly-once semantics для критичных данных, at-least-once для аналитики.
1. At-Least-Once (АЛЧ)
Описание: сообщение доставляется минимум один раз, может быть несколько раз.
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
acks='all', # Все репликаторы должны подтвердить
retries=3 # Переотправляем при ошибке
)
def send_event(event):
try:
producer.send('events', json.dumps(event).encode())
producer.flush()
except Exception as e:
# Переотправляем
send_event(event)
Проблема: дубликаты в целевой системе.
Решение: deduplicate на потребителе:
from kafka import KafkaConsumer
import sqlite3
consumer = KafkaConsumer('events', bootstrap_servers=['localhost:9092'])
db = sqlite3.connect('processed_events.db')
seen_events = set()
for message in consumer:
event_id = message.value['event_id']
if event_id in seen_events:
continue # Пропускаем дубликат
# Обрабатываем событие
process_event(message.value)
seen_events.add(event_id)
2. At-Most-Once (АМЧ)
Описание: сообщение доставляется максимум один раз, может быть потеряно.
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
acks=1, # Только лидер должен подтвердить
retries=0 # Не переотправляем
)
def send_event_fast(event):
# Быстро отправляем, не ждём подтверждения
producer.send_and_forget('events', json.dumps(event).encode())
Использование: неважные метрики (impressions, page views).
3. Exactly-Once (ТА)
Описание: каждое сообщение доставляется ровно один раз.
Реализация в Kafka + Spark:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window
spark = SparkSession.builder \
.config("spark.sql.streaming.forceDeleteTempCheckpointDir", "true") \
.getOrCreate()
# Читаем с Kafka с точкой отсчёта
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "transactions") \
.option("startingOffsets", "earliest") \
.load()
# Трансформация с idempotent операциями
processed = df.select(
col("value").cast("string").alias("raw"),
col("offset").alias("unique_id") # Kafka offset уникален
).filter(
# Только новые сообщения
col("offset") > "100000"
)
# Запись с transactional гарантией
processed.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "transactions_processed") \
.option("checkpointLocation", "/tmp/checkpoint") \
.start() \
.awaitTermination()
Ключ exactly-once: комбинация идемпотентности + checkpoints.
4. Идемпотентность (Idempotency)
Обработка одного события несколько раз даёт результат, как если бы один раз:
def process_transfer(transaction_id, from_account, to_account, amount):
db = sqlite3.connect('bank.db')
cursor = db.cursor()
# Проверяем, уже ли обработана транзакция
cursor.execute("SELECT status FROM transactions WHERE id = ?", (transaction_id,))
existing = cursor.fetchone()
if existing:
return existing[0] # Уже обработана
# Выполняем перевод в одной транзакции
try:
cursor.execute("INSERT INTO transactions (id, status) VALUES (?, 'processing')")
cursor.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", (amount, from_account))
cursor.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", (amount, to_account))
cursor.execute("UPDATE transactions SET status = 'completed' WHERE id = ?", (transaction_id,))
db.commit()
return 'completed'
except Exception as e:
db.rollback()
cursor.execute("UPDATE transactions SET status = 'failed' WHERE id = ?", (transaction_id,))
db.commit()
return 'failed'
5. Практическая реализация в нашей системе
Архитектура:
# 1. Kafka producer с retries
producer_config = {
'bootstrap_servers': ['kafka1:9092', 'kafka2:9092'],
'acks': 'all', # exactly-once
'retries': 5,
'max_in_flight_requests_per_connection': 1 # Порядок
}
# 2. Checkpoint в Spark
spark_config = {
'spark.sql.streaming.checkpointLocation': 's3://checkpoints/',
'spark.streaming.kafka.maxRatePerPartition': 1000
}
# 3. Idempotent обработка
class TransactionProcessor:
def __init__(self, db):
self.db = db
def process(self, event):
# Проверяем в БД по event_id
if self.is_processed(event['id']):
return 'already_processed'
# Обрабатываем с гарантией
result = self.execute_transaction(event)
self.mark_as_processed(event['id'])
return result
6. Сравнение
| Метрика | At-Least-Once | At-Most-Once | Exactly-Once |
|---|---|---|---|
| Производительность | Низкая (retries) | Очень высокая | Средняя |
| Потеря данных | Нет | Возможна | Нет |
| Дубликаты | Да | Нет | Нет |
| Сложность | Низкая | Низкая | Высокая |
| Использование | Аналитика | Metrics | Финансы |
Выбор в практике
- Purchase events: exactly-once (финансы)
- Page views: at-most-once (скорость, потери OK)
- Orders: exactly-once (важные данные)
- Impressions: at-least-once (дедупликация дешевле)
Результат: комбинированный подход покрыл 99.99% кейсов с балансом скорости и надёжности.