← Назад к вопросам

Как восстановить сломанный источник при работе с Kafka

2.0 Middle🔥 171 комментариев
#REST API и микросервисы#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# Как восстановить сломанный источник при работе с Kafka

Контекст и проблема

При работе с Kafka могут возникать ситуации, когда producer (источник) перестаёт отправлять сообщения, падает соединение, или накапливаются необработанные данные. Восстановление зависит от типа проблемы и архитектуры системы.

1. Диагностика проблемы

Проверка состояния broker'ов

kafka-broker-api-versions.sh --bootstrap-server localhost:9092
kafka-topics.sh --list --bootstrap-server localhost:9092
kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
kafka-topics.sh --describe --under-replicated-partitions --bootstrap-server localhost:9092

2. Восстановление producer'а

Проверка и перезапуск Java приложения

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerRecovery {
    private KafkaProducer<String, String> producer;
    private boolean isHealthy = true;
    
    public void initProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                  "org.apache.kafka.common.serialization.StringSerializer");
        
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        
        try {
            producer = new KafkaProducer<>(props);
            isHealthy = true;
        } catch (Exception e) {
            isHealthy = false;
        }
    }
    
    public void sendWithRetry(String topic, String key, String value) {
        int maxRetries = 5;
        int attempt = 0;
        
        while (attempt < maxRetries) {
            try {
                ProducerRecord<String, String> record = 
                    new ProducerRecord<>(topic, key, value);
                producer.send(record);
                break;
            } catch (Exception e) {
                attempt++;
                if (attempt < maxRetries) {
                    try {
                        Thread.sleep(1000 * attempt);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }
}

3. Consumer с manual offset management

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;

public class KafkaConsumerRecovery {
    private KafkaConsumer<String, String> consumer;
    
    public void initConsumer(String topic, String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(java.util.Collections.singletonList(topic));
    }
    
    public void consumeWithRecovery() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = 
                    consumer.poll(Duration.ofSeconds(1));
                
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        System.out.println("Processed: " + record.value());
                        consumer.commitSync();
                    } catch (Exception e) {
                        consumer.seek(new TopicPartition(record.topic(), record.partition()),
                                    record.offset());
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}

4. Ключевые стратегии восстановления

Idempotent Producer

  • Одно и то же сообщение не дублируется даже при retry
  • enable.idempotence=true + acks=all

Graceful Shutdown

public void shutdown() {
    producer.flush();
    producer.close();
}

Health Checks

  • Периодически проверять соединение
  • Мониторить lag и offset
  • Использовать AdminClient для проверки статуса broker'ов

5. Восстановление данных

  • Seek to earliest — переобработка с начала
  • Seek to timestamp — восстановление с определённого момента
  • Rebalancing — автоматическое переподключение consumer group

Заключение

Восстановление при Kafka требует:

  1. Правильной конфигурации (acks, retries, idempotence)
  2. Manual offset management для контроля
  3. Health checks и мониторинга
  4. Graceful восстановления с exponential backoff
  5. Возможности replay данных из истории
Как восстановить сломанный источник при работе с Kafka | PrepBro