← Назад к вопросам

Какой способ доставки был в вашей системе (at least once, at most once, excactly once)?

2.7 Senior🔥 131 комментариев
#Apache Kafka и потоковая обработка#ETL и качество данных#Архитектура и проектирование

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Какой способ доставки был в вашей системе (at least once, at most once, exactly once)

В streaming системе использовал exactly-once semantics для критичных данных, at-least-once для аналитики.

1. At-Least-Once (АЛЧ)

Описание: сообщение доставляется минимум один раз, может быть несколько раз.

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    acks='all',  # Все репликаторы должны подтвердить
    retries=3    # Переотправляем при ошибке
)

def send_event(event):
    try:
        producer.send('events', json.dumps(event).encode())
        producer.flush()
    except Exception as e:
        # Переотправляем
        send_event(event)

Проблема: дубликаты в целевой системе.

Решение: deduplicate на потребителе:

from kafka import KafkaConsumer
import sqlite3

consumer = KafkaConsumer('events', bootstrap_servers=['localhost:9092'])
db = sqlite3.connect('processed_events.db')

seen_events = set()

for message in consumer:
    event_id = message.value['event_id']
    
    if event_id in seen_events:
        continue  # Пропускаем дубликат
    
    # Обрабатываем событие
    process_event(message.value)
    seen_events.add(event_id)

2. At-Most-Once (АМЧ)

Описание: сообщение доставляется максимум один раз, может быть потеряно.

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    acks=1,      # Только лидер должен подтвердить
    retries=0    # Не переотправляем
)

def send_event_fast(event):
    # Быстро отправляем, не ждём подтверждения
    producer.send_and_forget('events', json.dumps(event).encode())

Использование: неважные метрики (impressions, page views).

3. Exactly-Once (ТА)

Описание: каждое сообщение доставляется ровно один раз.

Реализация в Kafka + Spark:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window

spark = SparkSession.builder \
    .config("spark.sql.streaming.forceDeleteTempCheckpointDir", "true") \
    .getOrCreate()

# Читаем с Kafka с точкой отсчёта
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "transactions") \
    .option("startingOffsets", "earliest") \
    .load()

# Трансформация с idempotent операциями
processed = df.select(
    col("value").cast("string").alias("raw"),
    col("offset").alias("unique_id")  # Kafka offset уникален
).filter(
    # Только новые сообщения
    col("offset") > "100000"
)

# Запись с transactional гарантией
processed.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "transactions_processed") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .start() \
    .awaitTermination()

Ключ exactly-once: комбинация идемпотентности + checkpoints.

4. Идемпотентность (Idempotency)

Обработка одного события несколько раз даёт результат, как если бы один раз:

def process_transfer(transaction_id, from_account, to_account, amount):
    db = sqlite3.connect('bank.db')
    cursor = db.cursor()
    
    # Проверяем, уже ли обработана транзакция
    cursor.execute("SELECT status FROM transactions WHERE id = ?", (transaction_id,))
    existing = cursor.fetchone()
    
    if existing:
        return existing[0]  # Уже обработана
    
    # Выполняем перевод в одной транзакции
    try:
        cursor.execute("INSERT INTO transactions (id, status) VALUES (?, 'processing')")
        
        cursor.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", (amount, from_account))
        cursor.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", (amount, to_account))
        
        cursor.execute("UPDATE transactions SET status = 'completed' WHERE id = ?", (transaction_id,))
        db.commit()
        return 'completed'
    except Exception as e:
        db.rollback()
        cursor.execute("UPDATE transactions SET status = 'failed' WHERE id = ?", (transaction_id,))
        db.commit()
        return 'failed'

5. Практическая реализация в нашей системе

Архитектура:

# 1. Kafka producer с retries
producer_config = {
    'bootstrap_servers': ['kafka1:9092', 'kafka2:9092'],
    'acks': 'all',           # exactly-once
    'retries': 5,
    'max_in_flight_requests_per_connection': 1  # Порядок
}

# 2. Checkpoint в Spark
spark_config = {
    'spark.sql.streaming.checkpointLocation': 's3://checkpoints/',
    'spark.streaming.kafka.maxRatePerPartition': 1000
}

# 3. Idempotent обработка
class TransactionProcessor:
    def __init__(self, db):
        self.db = db
    
    def process(self, event):
        # Проверяем в БД по event_id
        if self.is_processed(event['id']):
            return 'already_processed'
        
        # Обрабатываем с гарантией
        result = self.execute_transaction(event)
        self.mark_as_processed(event['id'])
        
        return result

6. Сравнение

МетрикаAt-Least-OnceAt-Most-OnceExactly-Once
ПроизводительностьНизкая (retries)Очень высокаяСредняя
Потеря данныхНетВозможнаНет
ДубликатыДаНетНет
СложностьНизкаяНизкаяВысокая
ИспользованиеАналитикаMetricsФинансы

Выбор в практике

  • Purchase events: exactly-once (финансы)
  • Page views: at-most-once (скорость, потери OK)
  • Orders: exactly-once (важные данные)
  • Impressions: at-least-once (дедупликация дешевле)

Результат: комбинированный подход покрыл 99.99% кейсов с балансом скорости и надёжности.