← Назад к вопросам

Какие настройки Kafka применял

1.8 Middle🔥 61 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Настройки Kafka, которые я применял

Kafka — это распределённая система обработки потоков данных, которая требует тщательной настройки для оптимальной производительности и надёжности. За годы работы я применил множество конфигураций для разных сценариев.

1. Настройки Producer'а

Основные параметры Producer'а настраивают передачу сообщений в Kafka:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Гарантии доставки
props.put("acks", "all");  // Или 0, 1, all
props.put("retries", 3);
props.put("max.in.flight.requests.per.connection", 1);

// Производительность
props.put("batch.size", 16384);      // 16KB
props.put("linger.ms", 10);          // Ждать 10мс перед отправкой
props.put("buffer.memory", 33554432); // 32MB

// Таймауты
props.put("request.timeout.ms", 30000);
props.put("delivery.timeout.ms", 120000);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

Объяснение ключевых настроек:

  • acks="all" — Producer ждёт подтверждения от всех in-sync replicas перед возвратом. Максимальная надёжность, но медленнее.
  • acks="1" — Ждёт только от leader broker'а. Средний баланс между скоростью и надёжностью.
  • acks="0" — Не ждёт подтверждения вообще. Максимальная производительность, но может быть потеря сообщений.
  • retries — Число попыток отправки при ошибке
  • batch.size — Размер батча перед отправкой
  • linger.ms — Время ожидания перед отправкой батча

2. Настройки Consumer'а

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Обработка смещений (offsets)
props.put("auto.offset.reset", "earliest");  // Или latest, none
props.put("enable.auto.commit", true);
props.put("auto.commit.interval.ms", 1000);

// Производительность
props.put("fetch.min.bytes", 1);      // Минимум bytes для fetch
props.put("fetch.max.wait.ms", 500);
props.put("max.poll.records", 500);   // Макс. записей за раз
props.put("session.timeout.ms", 10000);
props.put("heartbeat.interval.ms", 3000);

// Параллелизм
props.put("max.poll.interval.ms", 300000);  // 5 минут

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

Ключевые настройки Consumer'а:

  • auto.offset.reset="earliest" — читать с начала если нет сохранённого offset
  • auto.offset.reset="latest" — читать только новые сообщения
  • enable.auto.commit=true — автоматически коммитить offsets (опасно для at-least-once)
  • enable.auto.commit=false — ручное управление offsets для большей контроля
  • max.poll.records — число записей за один poll(), важно для балансировки
  • session.timeout.ms — если consumer не отправит heartbeat, будет считаться мёртвым

3. Практический пример: Надёжный Consumer

public class ReliableConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "reliable-group");
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        
        // Для at-least-once обработки
        props.put("enable.auto.commit", false);  // Ручной commit
        props.put("auto.offset.reset", "earliest");
        props.put("isolation.level", "read_committed");  // Только committed сообщения
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));
        
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            
            for (ConsumerRecord<String, String> record : records) {
                try {
                    // Обработка сообщения
                    processMessage(record.value());
                } catch (Exception e) {
                    log.error("Error processing: " + record.value(), e);
                    // Не коммитим offset, сообщение будет переобработано
                    break;
                }
            }
            
            // Коммитим только успешно обработанные
            consumer.commitSync();
        }
    }
}

4. Настройки Broker'а (server.properties)

# Идентификация
broker.id=0
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092

# Логирование и хранилище
log.dirs=/var/kafka-logs
num.network.threads=8
num.io.threads=8

# Репликация
default.replication.factor=3
min.insync.replicas=2  # Минимум в sync для acks=all

# Сохранение данных
log.retention.hours=168  # 7 дней
log.retention.bytes=-1   # Неограниченно
log.segment.bytes=1073741824  # 1GB

# Производительность
num.replica.fetchers=4
replica.lag.time.max.ms=30000

# Компресс сообщений
compression.type=snappy

# Очистка логов
log.cleanup.policy=delete
log.cleanup.delete.retention.ms=86400000

5. Topic-level конфигурация

# Создание topic с настройками
kafka-topics --create \
  --topic my-topic \
  --partitions 3 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config compression.type=snappy \
  --config min.insync.replicas=2

# Изменение конфигурации
kafka-configs --alter \
  --entity-type topics \
  --entity-name my-topic \
  --add-config retention.bytes=1073741824
// Программно в Java
NewTopic topic = new NewTopic(
    "my-topic",
    3,  // partitions
    (short) 3  // replication factor
).configs(Collections.singletonMap("compression.type", "snappy"));

AdminClient admin = AdminClient.create(adminProperties);
admin.createTopics(Arrays.asList(topic));

6. Практический пример: Высокопроизводительный Producer

public class HighThroughputProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        
        // Для максимальной производительности
        props.put("acks", "1");  // Не ждём full replication
        props.put("batch.size", 32768);      // 32KB батч
        props.put("linger.ms", 100);         // Ждём 100мс
        props.put("buffer.memory", 67108864); // 64MB буфер
        props.put("compression.type", "snappy");
        props.put("max.in.flight.requests.per.connection", 5);
        
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
        // Асинхронная отправка с callback
        for (int i = 0; i < 1000000; i++) {
            ProducerRecord<String, String> record = 
                new ProducerRecord<>("my-topic", "key-" + i, "value-" + i);
            
            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    // Успешно отправлено
                } else {
                    log.error("Failed to send message", exception);
                }
            });
        }
        
        producer.flush();  // Убедимся, что все отправлены
        producer.close();
    }
}

7. Spring Boot Integration

# application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 3
      batch-size: 16384
      linger-ms: 10
    consumer:
      group-id: my-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false
      max-poll-records: 500
@Component
public class KafkaListeners {
    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(ConsumerRecord<String, String> record,
                       Acknowledgment acknowledgment) {
        try {
            log.info("Received: " + record.value());
            // Обработка
            processMessage(record.value());
            acknowledgment.acknowledge();  // Ручной commit
        } catch (Exception e) {
            log.error("Error processing message", e);
            // Не коммитим, сообщение будет переобработано
        }
    }
    
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(
            ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}

8. Мониторинг и метрики

# Включаем JMX метрики
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true
  -Dcom.sun.management.jmxremote.authenticate=false
  -Dcom.sun.management.jmxremote.ssl=false
  -Djava.rmi.server.hostname=localhost
  -Dcom.sun.management.jmxremote.rmi.port=9999"

Важные метрики для мониторинга:

  • kafka.producer:type=producer-metrics — задержки, rate отправки
  • kafka.consumer:type=consumer-fetch-manager-metrics — lag потребителя
  • kafka.server:type=BrokerTopicMetrics — throughput

9. Настройки для разных сценариев

Сценарий 1: Критичная надёжность (банки, платежи)

props.put("acks", "all");
props.put("min.insync.replicas", 2);
props.put("retries", 10);
props.put("enable.idempotence", true);  // Избегаем дупликатов
props.put("max.in.flight.requests.per.connection", 1);

Сценарий 2: Максимальная производительность (логи, аналитика)

props.put("acks", "0");  // Не ждём ничего
props.put("batch.size", 65536);  // 64KB
props.put("linger.ms", 1000);    // 1 сек
props.put("compression.type", "gzip");

Сценарий 3: Баланс (большинство приложений)

props.put("acks", "1");
props.put("batch.size", 16384);
props.put("linger.ms", 10);
props.put("compression.type", "snappy");

10. Настройки для обработки больших объёмов

// Producer для миллионов сообщений в день
props.put("buffer.memory", 268435456);  // 256MB
props.put("batch.size", 65536);         // 64KB
props.put("linger.ms", 50);
props.put("compression.type", "snappy");
props.put("num.network.threads", 16);
props.put("num.io.threads", 16);

// Consumer для параллельной обработки
props.put("fetch.min.bytes", 10485760);  // 10MB
props.put("fetch.max.wait.ms", 500);
props.put("max.poll.records", 1000);

Таблица часто используемых настроек

ПараметрЗначениеОписание
acksall, 1, 0Уровень подтверждения
retries3, 10Количество попыток
batch.size16384Размер батча в bytes
linger.ms10, 100Время ожидания перед отправкой
compression.typesnappy, gzip, lz4Сжатие сообщений
buffer.memory33554432Буфер producer'а в bytes
max.poll.records500Сообщений за poll()
session.timeout.ms10000Timeout для consumer heartbeat

Заключение

Правильная настройка Kafka — ключ к надёжной и производительной обработке потоков данных. Выбор параметров зависит от требований:

  • Надёжность требует acks=all и min.insync.replicas
  • Производительность требует больших batches и компрессии
  • Баланс достигается с acks=1 и средними батчами

Всегда мониторьте метрики и адаптируйте конфигурацию под реальное поведение системы.

Какие настройки Kafka применял | PrepBro