← Назад к вопросам

Что такое Palindrome в Kafka?

2.3 Middle🔥 111 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Palindrome в Kafka

Что это такое

Palindrome в контексте Kafka — это механизм обеспечения exactly-once семантики (доставки сообщений точно один раз) при отправке данных из Kafka в другие системы. Это гарантирует, что сообщение будет обработано и сохранено ровно один раз, даже если произойдут сбои.

Однако нужно уточнить: в официальной документации Kafka это называется "Idempotent Producer" и "Transactional API", а не "Palindrome". Возможно, вопрос подразумевает именно эти механизмы.

Проблема: Duplicates (дубликаты)

Без гарантий, Kafka может создавать дубликаты при сбоях:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDuplicates {
    
    public static void main(String[] args) {
        KafkaProducer<String, String> producer = 
            new KafkaProducer<>(properties);
        
        String message = "Important Order #123";
        
        try {
            // Отправляем сообщение
            ProducerRecord<String, String> record = 
                new ProducerRecord<>("orders", message);
            producer.send(record);
            
            // Сбой здесь: потеря ответа
            // Producer: "Может ли Broker получить сообщение?"
            // Network: TIMEOUT
            // Producer: "Неизвестно, повтори"
            producer.send(record);  // ДУБЛИКАТ!
            
        } finally {
            producer.close();
        }
    }
}

// Результат:
// orders: ["Important Order #123", "Important Order #123"]
//         ^^^^^^^^^^^^^^^^^^^^^^  ^^^^^^^^^^^^^^^^^^^^^^
//         Оригинал                 ДУБЛИКАТ (непреднамеренный)

Решение 1: Idempotent Producer

Idempotence — свойство операции оставлять результат неизменным при повторном выполнении:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class IdempotentProducer {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
            "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        
        // Включаем идемпотентность
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        KafkaProducer<String, String> producer = 
            new KafkaProducer<>(props);
        
        String message = "Important Order #123";
        ProducerRecord<String, String> record = 
            new ProducerRecord<>("orders", message);
        
        try {
            // Даже если отправим дважды - попадёт только один раз
            producer.send(record);
            producer.send(record);  // Broker видит дубликат
        } finally {
            producer.close();
        }
    }
}

// Результат:
// orders: ["Important Order #123"]
//         Только один, дубликат отклонён Broker

Как работает Idempotence

Producer                      Broker
   |                            |
   |-- send(msg, seq=1) ------->|
   |                         [msg stored, seq=1]
   |                            |
   |-- send(msg, seq=1) ------->|  Retry!
   |                            |
   |                     [DUPLICATE!]
   |                     [Проверка: seq=1 уже есть]
   |                     [Отклонить]
   |<------ ACK ----------------|
   |                            |

Каждому Producer-у назначается Producer ID и sequence number для каждого сообщения.

Решение 2: Transactional API

Для exactly-once семантики используется транзакционный API:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TransactionalProducer {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
            "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        
        // Включаем транзакции
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
            "my-producer-1");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        KafkaProducer<String, String> producer = 
            new KafkaProducer<>(props);
        
        // Инициализируем транзакции
        producer.initTransactions();
        
        try {
            // Начинаем транзакцию
            producer.beginTransaction();
            
            // Отправляем несколько сообщений
            producer.send(new ProducerRecord<>("orders", 
                "ORDER-1"));
            producer.send(new ProducerRecord<>("orders", 
                "ORDER-2"));
            producer.send(new ProducerRecord<>("payments", 
                "PAYMENT-1"));
            
            // Если всё прошло - коммитим
            producer.commitTransaction();
            System.out.println("Транзакция успешна");
            
        } catch (Exception e) {
            // Если ошибка - откатываем
            producer.abortTransaction();
            System.err.println("Транзакция отменена: " + e);
        } finally {
            producer.close();
        }
    }
}

// Результат:
// orders: ["ORDER-1", "ORDER-2"]
// payments: ["PAYMENT-1"]
// Все три сообщения атомарно или ничего

Transactional Consumer

Для чтения транзакционных сообщений:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class TransactionalConsumer {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
            "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        
        // Читаем только committed сообщения
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
            "read_committed");
        
        KafkaConsumer<String, String> consumer = 
            new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("orders", "payments"));
        
        while (true) {
            ConsumerRecords<String, String> records = 
                consumer.poll(Duration.ofMillis(100));
            
            for (var record : records) {
                // Гарантировано committed, без дубликатов
                System.out.println("Получено: " + record.value());
            }
        }
    }
}

Comparison: At-Most-Once vs At-Least-Once vs Exactly-Once

// At-Most-Once (без гарантий)
properties.put("acks", "0");
// Может потеряться, но без дубликатов

// At-Least-Once (повтор если необходимо)
properties.put("acks", "all");
properties.put("retries", Integer.MAX_VALUE);
// Не потеряется, но может быть дубликат

// Exactly-Once (идеально)
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "id");
properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
// Точно один раз, no дубликатов, no потерь

Стоимость Exactly-Once

                    Performance    Latency    Complexity
_____________________________________________________
At-Most-Once        Максимум       Минимум    Простая
At-Least-Once       Хорошая        Хорошая    Средняя
Exactly-Once        Нижняя         Выше       Сложная

Практический пример: Bank Transfer

public class BankTransferExample {
    
    public void transferMoney(String fromAccount, 
        String toAccount, double amount) {
        
        KafkaProducer<String, String> producer = 
            createTransactionalProducer();
        
        try {
            producer.beginTransaction();
            
            // Отправляем события в правильном порядке
            producer.send(new ProducerRecord<>("withdrawals",
                "{\"account\": \"" + fromAccount + \
                "\", \"amount\": " + amount + "}"));
            
            producer.send(new ProducerRecord<>("deposits",
                "{\"account\": \"" + toAccount + \
                "\", \"amount\": " + amount + "}"));
            
            producer.commitTransaction();
            System.out.println("Перевод успешен");
            
        } catch (Exception e) {
            producer.abortTransaction();
            System.err.println("Перевод отменён");
        } finally {
            producer.close();
        }
    }
}

// Гарантия: либо оба события (вывод + зачисление), 
// либо ничего. Никогда нет ситуации когда деньги вышли,
// но не пришли.

Заключение

Exactly-Once семантика в Kafka (через Idempotence и Transactional API) гарантирует, что сообщения будут доставлены и обработаны ровно один раз. Это критично для финансовых систем, заказов и других бизнес-операций где дубликаты недопустимы. Используй ENABLE_IDEMPOTENCE_CONFIG и TRANSACTIONAL_ID_CONFIG для обеспечения этой гарантии.