← Назад к вопросам

Что происходит с необработанными сообщениями в Kafka?

2.0 Middle🔥 121 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
deepseek-v3.2PrepBro AI7 апр. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Что происходит с необработанными сообщениями в Kafka?

В Apache Kafka понятие "необработанных сообщений" в строгом смысле не существует — сообщения просто хранятся в топиках (topics) на серверах Kafka, пока их не прочитают консьюмеры (consumers). Однако на практике "необработанными" часто называют сообщения, которые либо ещё не были прочитаны, либо не были корректно обработаны консьюмером. Давайте разберём ключевые механизмы, связанные с этой темой.

Хранение сообщений в Kafka: retention policies

Kafka — это распределённая система хранения на основе журнала событий (log), где сообщения сохраняются в топиках, разделённых на партиции (partitions). Удаление сообщений зависит не от факта их обработки, а от настроек политик хранения (retention policies):

# Пример конфигурации топика (server.properties или при создании)
log.retention.hours=168      # хранить 7 дней
log.retention.bytes=1073741824  # или до достижения 1 GB на партицию
log.segment.bytes=104857600  # размер сегмента лога
log.cleanup.policy=delete    # политика удаления (по умолчанию)

Если используется политика compact (очистка по ключу), будут сохраняться только последние значения для каждого ключа, но это не связано с "обработкой".

Основные сценарии для "необработанных" сообщений:

  1. Сообщения, которые ещё не прочитаны
    Они просто остаются в логе, пока не истечёт срок хранения. Если консьюмер запустится позже (в пределах retention period), он сможет их прочитать с начала топика (или с указанного смещения).

  2. Сообращения, которые консьюмер прочитал, но не подтвердил (не закоммитил смещение) Это ключевой момент: Kafka использует механизм offsets (смещений) для отслеживания прогресса консьюмера. Консьюмер может прочитать сообщение, но не обработать его из-за сбоя. Kafka при этом не удаляет сообщение! Оно остаётся в логе, и другой консьюмер (или тот же после перезапуска) может прочитать его снова. Всё зависит от того, как управляются смещения.

Управление смещениями (offsets) и гарантии доставки

Consumer groups управляют смещениями через внутренний топик __consumer_offsets. Есть два основных подхода:

// Пример на Go (sarama library): ручное управление смещениями
consumer, err := sarama.NewConsumerFromClient(client)
partitionConsumer, err := consumer.ConsumePartition("my-topic", 0, sarama.OffsetOldest)

for msg := range partitionConsumer.Messages() {
    processMessage(msg) // Обработка сообщения
    // Если processMessage завершится с ошибкой, смещение НЕ будет закоммичено
    // Сообщение останется для повторной обработки
}

// Автоматический коммит (по умолчанию) - риск потерять сообщения при сбое
config.Consumer.Offsets.AutoCommit.Enable = true
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second

// Ручной коммит для гарантий "exactly-once" семантики
session.MarkMessage(msg, "")
session.Commit()
  • Автоматический коммит (enable.auto.commit=true):
    Смещение периодически сохраняется. Если консьюмер упадёт после обработки, но до коммита, сообщение будет прочитано повторно (дублирование). Если упадёт до обработки — сообщение считается обработанным (потеря данных).

  • Ручной коммит:
    Позволяет реализовать at-least-once (минимум один раз) или exactly-once семантику. Сообщение останется "необработанным" до явного коммита смещения.

Стратегии обработки сбоев и "зависших" сообщений

  1. Повторные попытки (retries) и dead letter queues (DLQ)
    Если консьюмер не может обработать сообщение (например, из-за некорректных данных), лучшая практика — отправить его в топик-кладбище (dead letter topic) для дальнейшего анализа, а смещение закоммитить. Иначе консьюмер будет "зависать" на одном сообщении.
func consumeWithDLQ(consumer sarama.Consumer, dlqProducer sarama.SyncProducer) {
    for msg := range consumer.Messages() {
        err := process(msg)
        if err != nil {
            // Отправляем в DLQ, коммитим смещение основного топика
            dlqProducer.SendMessage(&sarama.ProducerMessage{
                Topic: "dead-letter-queue",
                Value: sarama.StringEncoder(msg.Value),
                Headers: []sarama.RecordHeader{
                    {Key: []byte("error"), Value: []byte(err.Error())},
                },
            })
        }
        consumer.MarkOffset(msg, "")
    }
}
  1. Мониторинг лага консьюмера (consumer lag)
    Lag — разница между последним сообщением в партиции и текущим смещением консьюмера. Высокий lag означает накопление "необработанных" сообщений. Инструменты вроде Burrow или Kafka Lag Exporter помогают отслеживать эту метрику.

  2. Таймауты и heartbeat'ы
    Консьюмер в рамках группы должен отправлять heartbeat'ы брокеру. Если он не отвечает дольше session.timeout.ms, брокер считает его "мёртвым" и инициирует rebalance — перераспределение партиций между оставшимися консьюмерами. Неподтверждённые сообщения переходят новому консьюмеру.

Практические рекомендации

  • Всегда настраивайте retention period согласно требованиям бизнеса — если сообщения могут понадобиться через месяц, установите log.retention.hours=720.
  • Используйте ручной коммит смещений в production для контроля над обработкой.
  • Реализуйте идемпотентность в консьюмерах: повторная обработка одного сообщения не должна вызывать проблемы.
  • Настройте мониторинг consumer lag и алерты при его росте.
  • Для некорректных сообщений обязательно используйте dead letter queues, чтобы не блокировать всю обработку.

Таким образом, "необработанные" сообщения в Kafka физически никуда не пропадают автоматически — они хранятся в соответствии с настройками топика, а их судьба зависит от логики работы ваших консьюмеров и управления смещениями. Проблемы возникают не из-за Kafka, а из-за неправильной архитектуры потребителей. Правильная настройка гарантий доставки и обработки ошибок — ключ к надёжной системе.