← Назад к вопросам

Как можно из at least once получать excatly once?

2.0 Middle🔥 111 комментариев
#Apache Kafka и потоковая обработка

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Гарантии доставки сообщений: At-Least-Once to Exactly-Once

Это одна из ключевых проблем в распределённых системах. At-least-once означает потенциальные дубликаты. Exactly-once требует дополнительной логики.

Понимание гарантий

At-least-once:

  • Сообщение точно будет обработано минимум один раз
  • При сбое consumer может обработать дважды
  • Проще реализовать, быстрее

Exactly-once:

  • Каждое сообщение обработано ровно один раз
  • Должны исключить дубликаты
  • Сложнее, медленнее

Подход 1: Идемпотентные операции

Если операция идемпотентна (результат одинаков при повторении), дубликаты не опасны:

# Идемпотентно: обновление значения
UPDATE users SET last_login = NOW() WHERE user_id = 123;

# НЕ идемпотентно: увеличение счётчика
UPDATE users SET login_count = login_count + 1 WHERE user_id = 123;

На практике с Kafka:

from kafka import KafkaConsumer
import psycopg2

consumer = KafkaConsumer('transactions', bootstrap_servers=['localhost:9092'])
conn = psycopg2.connect('dbname=mydb')

for msg in consumer:
    transaction = json.loads(msg.value)
    
    # Идемпотентная операция: SET вместо UPDATE
    cursor = conn.cursor()
    cursor.execute("""
        UPDATE accounts 
        SET balance = %s,
            updated_at = NOW()
        WHERE account_id = %s
        AND (transaction_id != %s OR updated_at < %s)
    """, (transaction['balance'], transaction['account_id'], 
          transaction['id'], transaction['timestamp']))
    conn.commit()

Подход 2: Deduplication с помощью Distributed ID

Сохраняем уникальный ID обработанных сообщений:

-- Таблица для tracking
CREATE TABLE processed_messages (
    message_id UUID PRIMARY KEY,
    kafka_offset BIGINT,
    partition INT,
    processed_at TIMESTAMP,
    result JSONB
);

-- Перед обработкой проверяем
SELECT * FROM processed_messages WHERE message_id = 'abc-123';

-- Если не найдено - обрабатываем и сохраняем
INSERT INTO processed_messages (message_id, result, processed_at)
VALUES ('abc-123', '{"status": "ok"}', NOW());

Python реализация:

from kafka import KafkaConsumer
import uuid

def process_message_exactly_once(msg):
    message_id = msg.get('id') or str(uuid.uuid4())
    
    # Проверка дубликата
    cursor.execute(
        'SELECT result FROM processed_messages WHERE message_id = %s',
        (message_id,)
    )
    result = cursor.fetchone()
    
    if result:
        return result[0]  # уже обработано
    
    # Новое сообщение - обрабатываем
    processed_result = process(msg)
    
    # Сохраняем atomically
    cursor.execute("""
        INSERT INTO processed_messages (message_id, result, processed_at)
        VALUES (%s, %s, NOW())
        ON CONFLICT (message_id) DO NOTHING
    """, (message_id, json.dumps(processed_result)))
    
    conn.commit()
    return processed_result

Подход 3: Двухфазный коммит (Two-Phase Commit)

Атомарная обработка и сохранение offset:

from kafka import KafkaConsumer
from kafka import OffsetAndMetadata

consumer = KafkaConsumer('events', bootstrap_servers=['localhost:9092'],
                        enable_auto_commit=False)  # ВАЖНО!

for msg in consumer:
    try:
        # Обработка
        result = process(msg)
        
        # Сохранение результата в БД
        save_to_db(result)
        
        # ТОЛЬКО ЗАТЕМ коммитим offset в Kafka
        consumer.commit()
        
    except Exception as e:
        # При ошибке НЕ коммитим - переобработаем
        print(f"Error: {e}")
        # Следующий раз эта же партиция перечитает с последнего offset

Подход 4: Kafka Transactions (Transactional Write)

Isolate consumer и producer в одной транзакции:

from kafka import KafkaConsumer, KafkaProducer
from kafka.errors import KafkaError

# Producer с транзакциями
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    transactional_id='my-transaction-id-1'  # ВАЖНО!
)

consumer = KafkaConsumer(
    'input_topic',
    bootstrap_servers=['localhost:9092'],
    isolation_level='read_committed',  # читаем только committed
    enable_auto_commit=False
)

for msg in consumer:
    try:
        with producer.transaction():
            # Обработка
            result = process(msg)
            
            # Отправка результата
            producer.send('output_topic', value=result)
            
            # Сохранение offset
            consumer.commit_async()
            
    except Exception as e:
        # При ошибке транзакция откатывается
        print(f"Transaction failed: {e}")

Подход 5: Spark Structured Streaming

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col

spark = SparkSession.builder.appName("ExactlyOnce").getOrCreate()

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "input_topic") \
    .option("startingOffsets", "earliest") \
    .load()

# Spark Structured Streaming гарантирует exactly-once
query = df \
    .select(col("value").cast("string")) \
    .writeStream \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost/mydb") \
    .option("dbtable", "events") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .outputMode("append") \
    .start()

query.awaitTermination()

Сравнение подходов

ПодходСложностьПерформансГарантияКогда использовать
ИдемпотентностьНизкаяВысокийExactly-onceДля операций SET
Deduplication IDСредняяСреднийExactly-onceОбщий случай
2-Phase CommitСредняяСреднийExactly-onceС БД
Kafka TransactionsСредняяСреднийExactly-onceProducer-Consumer
Spark StructuredВысокаяВысокийExactly-onceBig data pipelines

На практике

  1. Первый выбор: идемпотентные операции (90% случаев)
  2. Если не идемпотентно: deduplication с message_id
  3. High volume: Kafka Transactions
  4. 批 processing: Spark/Flink с checkpoints

Вывод: Exactly-once не бесплатен. Нужно выбирать подход, подходящий по сложности и требованиям бизнеса.

Как можно из at least once получать excatly once? | PrepBro