Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Transaction Log в Apache Kafka
Transaction Log в Kafka — это журнал всех транзакций, который отслеживает атомарные операции (write операции нескольких партиций как единица). Это фундаментальный механизм для обеспечения exactly-once семантики доставки сообщений.
Что такое транзакция в Kafka
Транзакция в Kafka — это группа сообщений, которые либо все написаны в топик(и), либо все отклоняются как единица. Это позволяет:
- Писать в несколько партиций атомарно
- Гарантировать консистентность при сбое
- Предотвращать дублирование при ретрайах
Как работает Transaction Log
Transaction Log хранится в специальном внутреннем топике __transaction_state. Kafka отслеживает каждую транзакцию через её жизненный цикл:
1. BEGIN — начало транзакции (consumer, producer)
2. ONGOING — сообщения пишутся в целевые партиции
3. PREPARE_COMMIT — подготовка к фиксации
4. COMMITTED — все сообщения зафиксированы
5. ABORTED — транзакция отменена, сообщения не видны
Механизм гарантий exactly-once
// Producer с транзакциями для exactly-once
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("transactional.id", "my-transactional-producer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // Инициализация транзакций
try {
producer.beginTransaction();
// Пишем несколько сообщений как одну атомарную операцию
producer.send(new ProducerRecord<>("topic1", "key1", "value1"));
producer.send(new ProducerRecord<>("topic2", "key2", "value2"));
// Если всё прошло успешно
producer.commitTransaction();
} catch (Exception e) {
// При ошибке — откатываем всю транзакцию
producer.abortTransaction();
System.out.println("Транзакция отменена: " + e.getMessage());
}
Consumer с включением изоляции транзакций
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// CRUCIAL: Уровень изоляции для чтения ТОЛЬКО коммитованных сообщений
props.put("isolation.level", "read_committed"); // read_uncommitted — видит все
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1", "topic2"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// Видим ТОЛЬКО сообщения из коммитованных транзакций
System.out.println(record.value());
}
}
Структура Transaction Log
Kafka хранит метаданные транзакций в топике __transaction_state:
[
{"transactional_id": "producer-1", "producer_id": 12345, "state": "COMMITTED"},
{"transactional_id": "producer-2", "producer_id": 12346, "state": "ABORTED"}
]
Кроме этого, в каждом сообщении есть флаг isControl, указывающий, является ли оно управляющим сообщением (COMMIT или ABORT маркер).
Уровни изоляции
| Уровень | Описание | Риск |
|---|---|---|
| read_uncommitted | Видит все сообщения (и из некоммитованных транзакций) | Потеря данных при сбое |
| read_committed | Видит только сообщения из коммитованных транзакций | Гарантирует консистентность |
Гарантия idempotence (идемпотентность)
Каждый producer получает Producer ID (PID) и sequence number. Kafka использует эту пару для дедупликации:
props.put("enable.idempotence", true); // Автоматическая дедупликация
props.put("acks", "all"); // Ждём подтверждения от всех replicas
Даже если producer отправит одно сообщение 10 раз, Kafka сохранит его только один раз.
Сценарий: Consumer → Transformation → Producer (exactly-once)
// Обработка из topic1, трансформация, запись в topic2
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
producer.beginTransaction();
try {
for (ConsumerRecord<String, String> record : records) {
String transformed = record.value().toUpperCase();
producer.send(new ProducerRecord<>("output-topic", transformed));
}
// Отправляем offset в Kafka как часть транзакции
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
}
Это гарантирует: если сбой происходит между получением и отправкой, транзакция откатывается и при перезагрузке сообщение обработается снова.
Производительность транзакций
- Overhead: ~10-30% снижение throughput
- Latency: Увеличение latency из-за ожидания COMMIT
- Компромисс: Выбираем между скоростью и гарантиями
Когда использовать транзакции
✅ Когда нужны:
- Обработка платежей
- Критичные финансовые данные
- Сценарии с требованием exactly-once
❌ Когда не нужны:
- Логирование и мониторинг (at-least-once OK)
- High-throughput аналитика
- Когда at-least-once обработка приемлема
Transaction Log — ключевой механизм Kafka для обеспечения надёжности и консистентности в распределённых системах обработки потоков данных.