← Назад к вопросам

Сколько сообщений хранит Kafka?

2.3 Middle🔥 121 комментариев
#REST API и микросервисы#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# Сколько сообщений хранит Kafka

Это вопрос об архитектуре Kafka и её характеристиках хранения. Рассмотрю подробно.

1. Основной ответ

Kafka не имеет жёсткого лимита на количество сообщений. Сколько будет храниться — определяется конфигурацией.

2. Параметры, влияющие на хранение

log.retention.hours (основной параметр)

По умолчанию Kafka хранит сообщения 168 часов (7 дней).

# server.properties
log.retention.hours=168  # По умолчанию

Это можно изменить:

log.retention.hours=24    # 1 день
log.retention.hours=720   # 30 дней
log.retention.hours=8760  # 1 год

log.retention.bytes (лимит по размеру)

Можно ограничить размер партиции:

log.retention.bytes=1073741824  # 1 GB на партицию

Когда партиция превышает этот размер, старые сообщения удаляются независимо от времени.

log.segment.bytes (размер сегмента)

log.segment.bytes=1073741824  # 1 GB (по умолчанию)

Kafka разбивает данные на сегменты. Когда сегмент достигает этого размера, создаётся новый.

3. Пример расчёта

Предположим:

  • Topic имеет 3 partition-а
  • Каждая partition получает 1000 сообщений в секунду
  • Каждое сообщение = 1 KB
1000 сообщений/сек × 1 KB = 1 MB/сек
1 MB/сек × 60 сек × 60 мин × 24 часа × 7 дней = 604.8 GB

При log.retention.bytes=1073741824 (1 GB):
Каждая partition хранит максимум 1 GB
Всего 3 partitions × 1 GB = 3 GB для этого topic

4. Компромис между временем и размером

# Вариант 1: По времени (больше гибкости)
log.retention.hours=168    # 7 дней
# log.retention.bytes=-1   (отключено)

# Вариант 2: По размеру (экономия дискового пространства)
# log.retention.hours=-1   (отключено)
log.retention.bytes=1073741824  # 1 GB на partition

# Вариант 3: Обоих ограничений (применяется ПЕРВОЕ)
log.retention.hours=168
log.retention.bytes=1073741824
# Удаляется, когда выполнится ИЛИ 168 часов, ИЛИ размер превышен

5. Практический пример в коде

@Service
public class KafkaProducerService {
    private final KafkaTemplate<String, String> kafkaTemplate;
    
    public void sendMessage(String topic, String message) {
        // Отправляем сообщение
        kafkaTemplate.send(topic, message).addCallback(
            result -> System.out.println("Отправлено, offset: " 
                + result.getRecordMetadata().offset()),
            ex -> System.err.println("Ошибка: " + ex.getMessage())
        );
    }
}

@Service
public class KafkaConsumerService {
    // Kafka гарантирует, что сообщение будет доступно для чтения
    // столько, сколько настроено в log.retention.hours
    
    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void consume(String message) {
        System.out.println("Получено: " + message);
    }
}

6. Что произойдёт, если consumer отстал?

Timeline:
├── День 1: Consumer1 подписался, начал читать
├── День 3: Consumer1 упал на 2 дня
├── День 5: Consumer1 восстановился, пытается читать offset'ы со дня 3
│           ✓ Если log.retention.hours=168 (7 дней), данные есть!
│           ✓ Consumer может "наверстать" упущенное
│
└── День 9: Consumer2 подписался, пытается читать старые данные
            ✗ Если log.retention.hours=168, данные удалены!
            ✗ Consumer начнёт со следующего доступного offset'а

7. Крупномасштабные примеры

LinkedIn (создатель Kafka)

- Триллионы сообщений в день
- 11+ PB хранилища
- Используют log.retention.days в зависимости от importance topic'а

Типичные настройки в production

# High-volume topics (логи, метрики)
log.retention.hours=24    # 1 день
log.retention.bytes=10737418240  # 10 GB

# Critical topics (транзакции, события)
log.retention.hours=720   # 30 дней
log.retention.bytes=-1    # Неограниченно

# Compacted topics (последнее значение по ключу)
log.cleanup.policy=compact
log.retention.bytes=-1

8. Компактные topic'и (Compacted)

log.cleanup.policy=compact  # Вместо delete

Kafka хранит только последнее значение для каждого ключа:

Исходный лог:
key=user1, value=status_pending
key=user1, value=status_processing
key=user1, value=status_completed
key=user2, value=status_pending

После компактизации:
key=user1, value=status_completed  (старые удалены)
key=user2, value=status_pending

Используется для состояния (status, config, и т.п.).

9. Важные моменты

  1. По умолчанию 7 дней — но это не гарантия!
  2. Размер диска — главное ограничение в production
  3. Replication factor — каждое сообщение хранится на N брокерах
  4. Партиции — разные partition'ы могут иметь разные настройки
  5. Offset commit — consumer отслеживает, где он был, восстанавливается при перезапуске

10. Рекомендации

// Для разных сценариев выбирайте:

// 1. Недолговременные события (логи)
@KafkaListener(topics = "app-logs")
public void consumeLogs(String message) {
    // Хранится 24 часа, потом удаляется
    // Consumer должен обрабатывать в реальном времени
}

// 2. Критические события (платежи)
@KafkaListener(topics = "payments")
public void consumePayments(String message) {
    // Хранится 30 дней
    // Есть время для восстановления consumer'а при сбое
}

// 3. Состояние (последнее значение)
@KafkaListener(topics = "user-state")
public void consumeState(String message) {
    // Compacted log - хранится последнее значение
    // Новый consumer получит текущее состояние при подписке
}

Вывод: Kafka хранит ровно столько сообщений, сколько позволяет дисковое пространство и настройка log.retention.hours/bytes. Это гибкая и мощная система, которая может хранить сообщения от нескольких часов до лет.