Сколько сообщений хранит Kafka?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
# Сколько сообщений хранит Kafka
Это вопрос об архитектуре Kafka и её характеристиках хранения. Рассмотрю подробно.
1. Основной ответ
Kafka не имеет жёсткого лимита на количество сообщений. Сколько будет храниться — определяется конфигурацией.
2. Параметры, влияющие на хранение
log.retention.hours (основной параметр)
По умолчанию Kafka хранит сообщения 168 часов (7 дней).
# server.properties
log.retention.hours=168 # По умолчанию
Это можно изменить:
log.retention.hours=24 # 1 день
log.retention.hours=720 # 30 дней
log.retention.hours=8760 # 1 год
log.retention.bytes (лимит по размеру)
Можно ограничить размер партиции:
log.retention.bytes=1073741824 # 1 GB на партицию
Когда партиция превышает этот размер, старые сообщения удаляются независимо от времени.
log.segment.bytes (размер сегмента)
log.segment.bytes=1073741824 # 1 GB (по умолчанию)
Kafka разбивает данные на сегменты. Когда сегмент достигает этого размера, создаётся новый.
3. Пример расчёта
Предположим:
- Topic имеет 3 partition-а
- Каждая partition получает 1000 сообщений в секунду
- Каждое сообщение = 1 KB
1000 сообщений/сек × 1 KB = 1 MB/сек
1 MB/сек × 60 сек × 60 мин × 24 часа × 7 дней = 604.8 GB
При log.retention.bytes=1073741824 (1 GB):
Каждая partition хранит максимум 1 GB
Всего 3 partitions × 1 GB = 3 GB для этого topic
4. Компромис между временем и размером
# Вариант 1: По времени (больше гибкости)
log.retention.hours=168 # 7 дней
# log.retention.bytes=-1 (отключено)
# Вариант 2: По размеру (экономия дискового пространства)
# log.retention.hours=-1 (отключено)
log.retention.bytes=1073741824 # 1 GB на partition
# Вариант 3: Обоих ограничений (применяется ПЕРВОЕ)
log.retention.hours=168
log.retention.bytes=1073741824
# Удаляется, когда выполнится ИЛИ 168 часов, ИЛИ размер превышен
5. Практический пример в коде
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
// Отправляем сообщение
kafkaTemplate.send(topic, message).addCallback(
result -> System.out.println("Отправлено, offset: "
+ result.getRecordMetadata().offset()),
ex -> System.err.println("Ошибка: " + ex.getMessage())
);
}
}
@Service
public class KafkaConsumerService {
// Kafka гарантирует, что сообщение будет доступно для чтения
// столько, сколько настроено в log.retention.hours
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void consume(String message) {
System.out.println("Получено: " + message);
}
}
6. Что произойдёт, если consumer отстал?
Timeline:
├── День 1: Consumer1 подписался, начал читать
├── День 3: Consumer1 упал на 2 дня
├── День 5: Consumer1 восстановился, пытается читать offset'ы со дня 3
│ ✓ Если log.retention.hours=168 (7 дней), данные есть!
│ ✓ Consumer может "наверстать" упущенное
│
└── День 9: Consumer2 подписался, пытается читать старые данные
✗ Если log.retention.hours=168, данные удалены!
✗ Consumer начнёт со следующего доступного offset'а
7. Крупномасштабные примеры
LinkedIn (создатель Kafka)
- Триллионы сообщений в день
- 11+ PB хранилища
- Используют log.retention.days в зависимости от importance topic'а
Типичные настройки в production
# High-volume topics (логи, метрики)
log.retention.hours=24 # 1 день
log.retention.bytes=10737418240 # 10 GB
# Critical topics (транзакции, события)
log.retention.hours=720 # 30 дней
log.retention.bytes=-1 # Неограниченно
# Compacted topics (последнее значение по ключу)
log.cleanup.policy=compact
log.retention.bytes=-1
8. Компактные topic'и (Compacted)
log.cleanup.policy=compact # Вместо delete
Kafka хранит только последнее значение для каждого ключа:
Исходный лог:
key=user1, value=status_pending
key=user1, value=status_processing
key=user1, value=status_completed
key=user2, value=status_pending
После компактизации:
key=user1, value=status_completed (старые удалены)
key=user2, value=status_pending
Используется для состояния (status, config, и т.п.).
9. Важные моменты
- По умолчанию 7 дней — но это не гарантия!
- Размер диска — главное ограничение в production
- Replication factor — каждое сообщение хранится на N брокерах
- Партиции — разные partition'ы могут иметь разные настройки
- Offset commit — consumer отслеживает, где он был, восстанавливается при перезапуске
10. Рекомендации
// Для разных сценариев выбирайте:
// 1. Недолговременные события (логи)
@KafkaListener(topics = "app-logs")
public void consumeLogs(String message) {
// Хранится 24 часа, потом удаляется
// Consumer должен обрабатывать в реальном времени
}
// 2. Критические события (платежи)
@KafkaListener(topics = "payments")
public void consumePayments(String message) {
// Хранится 30 дней
// Есть время для восстановления consumer'а при сбое
}
// 3. Состояние (последнее значение)
@KafkaListener(topics = "user-state")
public void consumeState(String message) {
// Compacted log - хранится последнее значение
// Новый consumer получит текущее состояние при подписке
}
Вывод: Kafka хранит ровно столько сообщений, сколько позволяет дисковое пространство и настройка log.retention.hours/bytes. Это гибкая и мощная система, которая может хранить сообщения от нескольких часов до лет.