Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Palindrome в Kafka
Что это такое
Palindrome в контексте Kafka — это механизм обеспечения exactly-once семантики (доставки сообщений точно один раз) при отправке данных из Kafka в другие системы. Это гарантирует, что сообщение будет обработано и сохранено ровно один раз, даже если произойдут сбои.
Однако нужно уточнить: в официальной документации Kafka это называется "Idempotent Producer" и "Transactional API", а не "Palindrome". Возможно, вопрос подразумевает именно эти механизмы.
Проблема: Duplicates (дубликаты)
Без гарантий, Kafka может создавать дубликаты при сбоях:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ProducerDuplicates {
public static void main(String[] args) {
KafkaProducer<String, String> producer =
new KafkaProducer<>(properties);
String message = "Important Order #123";
try {
// Отправляем сообщение
ProducerRecord<String, String> record =
new ProducerRecord<>("orders", message);
producer.send(record);
// Сбой здесь: потеря ответа
// Producer: "Может ли Broker получить сообщение?"
// Network: TIMEOUT
// Producer: "Неизвестно, повтори"
producer.send(record); // ДУБЛИКАТ!
} finally {
producer.close();
}
}
}
// Результат:
// orders: ["Important Order #123", "Important Order #123"]
// ^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^
// Оригинал ДУБЛИКАТ (непреднамеренный)
Решение 1: Idempotent Producer
Idempotence — свойство операции оставлять результат неизменным при повторном выполнении:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
public class IdempotentProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// Включаем идемпотентность
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
KafkaProducer<String, String> producer =
new KafkaProducer<>(props);
String message = "Important Order #123";
ProducerRecord<String, String> record =
new ProducerRecord<>("orders", message);
try {
// Даже если отправим дважды - попадёт только один раз
producer.send(record);
producer.send(record); // Broker видит дубликат
} finally {
producer.close();
}
}
}
// Результат:
// orders: ["Important Order #123"]
// Только один, дубликат отклонён Broker
Как работает Idempotence
Producer Broker
| |
|-- send(msg, seq=1) ------->|
| [msg stored, seq=1]
| |
|-- send(msg, seq=1) ------->| Retry!
| |
| [DUPLICATE!]
| [Проверка: seq=1 уже есть]
| [Отклонить]
|<------ ACK ----------------|
| |
Каждому Producer-у назначается Producer ID и sequence number для каждого сообщения.
Решение 2: Transactional API
Для exactly-once семантики используется транзакционный API:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
public class TransactionalProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// Включаем транзакции
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
"my-producer-1");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
KafkaProducer<String, String> producer =
new KafkaProducer<>(props);
// Инициализируем транзакции
producer.initTransactions();
try {
// Начинаем транзакцию
producer.beginTransaction();
// Отправляем несколько сообщений
producer.send(new ProducerRecord<>("orders",
"ORDER-1"));
producer.send(new ProducerRecord<>("orders",
"ORDER-2"));
producer.send(new ProducerRecord<>("payments",
"PAYMENT-1"));
// Если всё прошло - коммитим
producer.commitTransaction();
System.out.println("Транзакция успешна");
} catch (Exception e) {
// Если ошибка - откатываем
producer.abortTransaction();
System.err.println("Транзакция отменена: " + e);
} finally {
producer.close();
}
}
}
// Результат:
// orders: ["ORDER-1", "ORDER-2"]
// payments: ["PAYMENT-1"]
// Все три сообщения атомарно или ничего
Transactional Consumer
Для чтения транзакционных сообщений:
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerConfig;
public class TransactionalConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// Читаем только committed сообщения
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
"read_committed");
KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders", "payments"));
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (var record : records) {
// Гарантировано committed, без дубликатов
System.out.println("Получено: " + record.value());
}
}
}
}
Comparison: At-Most-Once vs At-Least-Once vs Exactly-Once
// At-Most-Once (без гарантий)
properties.put("acks", "0");
// Может потеряться, но без дубликатов
// At-Least-Once (повтор если необходимо)
properties.put("acks", "all");
properties.put("retries", Integer.MAX_VALUE);
// Не потеряется, но может быть дубликат
// Exactly-Once (идеально)
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "id");
properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
// Точно один раз, no дубликатов, no потерь
Стоимость Exactly-Once
Performance Latency Complexity
_____________________________________________________
At-Most-Once Максимум Минимум Простая
At-Least-Once Хорошая Хорошая Средняя
Exactly-Once Нижняя Выше Сложная
Практический пример: Bank Transfer
public class BankTransferExample {
public void transferMoney(String fromAccount,
String toAccount, double amount) {
KafkaProducer<String, String> producer =
createTransactionalProducer();
try {
producer.beginTransaction();
// Отправляем события в правильном порядке
producer.send(new ProducerRecord<>("withdrawals",
"{\"account\": \"" + fromAccount + \
"\", \"amount\": " + amount + "}"));
producer.send(new ProducerRecord<>("deposits",
"{\"account\": \"" + toAccount + \
"\", \"amount\": " + amount + "}"));
producer.commitTransaction();
System.out.println("Перевод успешен");
} catch (Exception e) {
producer.abortTransaction();
System.err.println("Перевод отменён");
} finally {
producer.close();
}
}
}
// Гарантия: либо оба события (вывод + зачисление),
// либо ничего. Никогда нет ситуации когда деньги вышли,
// но не пришли.
Заключение
Exactly-Once семантика в Kafka (через Idempotence и Transactional API) гарантирует, что сообщения будут доставлены и обработаны ровно один раз. Это критично для финансовых систем, заказов и других бизнес-операций где дубликаты недопустимы. Используй ENABLE_IDEMPOTENCE_CONFIG и TRANSACTIONAL_ID_CONFIG для обеспечения этой гарантии.