← Назад к вопросам

Что такое Transaction Log в Kafka?

2.4 Senior🔥 91 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Transaction Log в Apache Kafka

Transaction Log в Kafka — это журнал всех транзакций, который отслеживает атомарные операции (write операции нескольких партиций как единица). Это фундаментальный механизм для обеспечения exactly-once семантики доставки сообщений.

Что такое транзакция в Kafka

Транзакция в Kafka — это группа сообщений, которые либо все написаны в топик(и), либо все отклоняются как единица. Это позволяет:

  • Писать в несколько партиций атомарно
  • Гарантировать консистентность при сбое
  • Предотвращать дублирование при ретрайах

Как работает Transaction Log

Transaction Log хранится в специальном внутреннем топике __transaction_state. Kafka отслеживает каждую транзакцию через её жизненный цикл:

1. BEGIN — начало транзакции (consumer, producer)
2. ONGOING — сообщения пишутся в целевые партиции
3. PREPARE_COMMIT — подготовка к фиксации
4. COMMITTED — все сообщения зафиксированы
5. ABORTED — транзакция отменена, сообщения не видны

Механизм гарантий exactly-once

// Producer с транзакциями для exactly-once
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("transactional.id", "my-transactional-producer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // Инициализация транзакций

try {
    producer.beginTransaction();
    
    // Пишем несколько сообщений как одну атомарную операцию
    producer.send(new ProducerRecord<>("topic1", "key1", "value1"));
    producer.send(new ProducerRecord<>("topic2", "key2", "value2"));
    
    // Если всё прошло успешно
    producer.commitTransaction();
} catch (Exception e) {
    // При ошибке — откатываем всю транзакцию
    producer.abortTransaction();
    System.out.println("Транзакция отменена: " + e.getMessage());
}

Consumer с включением изоляции транзакций

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// CRUCIAL: Уровень изоляции для чтения ТОЛЬКО коммитованных сообщений
props.put("isolation.level", "read_committed"); // read_uncommitted — видит все

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1", "topic2"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // Видим ТОЛЬКО сообщения из коммитованных транзакций
        System.out.println(record.value());
    }
}

Структура Transaction Log

Kafka хранит метаданные транзакций в топике __transaction_state:

[
  {"transactional_id": "producer-1", "producer_id": 12345, "state": "COMMITTED"},
  {"transactional_id": "producer-2", "producer_id": 12346, "state": "ABORTED"}
]

Кроме этого, в каждом сообщении есть флаг isControl, указывающий, является ли оно управляющим сообщением (COMMIT или ABORT маркер).

Уровни изоляции

УровеньОписаниеРиск
read_uncommittedВидит все сообщения (и из некоммитованных транзакций)Потеря данных при сбое
read_committedВидит только сообщения из коммитованных транзакцийГарантирует консистентность

Гарантия idempotence (идемпотентность)

Каждый producer получает Producer ID (PID) и sequence number. Kafka использует эту пару для дедупликации:

props.put("enable.idempotence", true); // Автоматическая дедупликация
props.put("acks", "all"); // Ждём подтверждения от всех replicas

Даже если producer отправит одно сообщение 10 раз, Kafka сохранит его только один раз.

Сценарий: Consumer → Transformation → Producer (exactly-once)

// Обработка из topic1, трансформация, запись в topic2
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    producer.beginTransaction();
    try {
        for (ConsumerRecord<String, String> record : records) {
            String transformed = record.value().toUpperCase();
            producer.send(new ProducerRecord<>("output-topic", transformed));
        }
        
        // Отправляем offset в Kafka как часть транзакции
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1)
        );
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
        producer.commitTransaction();
    } catch (Exception e) {
        producer.abortTransaction();
    }
}

Это гарантирует: если сбой происходит между получением и отправкой, транзакция откатывается и при перезагрузке сообщение обработается снова.

Производительность транзакций

  • Overhead: ~10-30% снижение throughput
  • Latency: Увеличение latency из-за ожидания COMMIT
  • Компромисс: Выбираем между скоростью и гарантиями

Когда использовать транзакции

Когда нужны:

  • Обработка платежей
  • Критичные финансовые данные
  • Сценарии с требованием exactly-once

Когда не нужны:

  • Логирование и мониторинг (at-least-once OK)
  • High-throughput аналитика
  • Когда at-least-once обработка приемлема

Transaction Log — ключевой механизм Kafka для обеспечения надёжности и консистентности в распределённых системах обработки потоков данных.