Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Ответ
Как очищаются сообщения из Kafka
Кафка — распределённый лог сообщений. Очистка данных происходит через несколько механизмов, зависящих от конфигурации и политики хранения.
1. Удаление по времени (Time-based retention)
Кафка автоматически удаляет старые сообщения на основе времени хранения.
Конфигурация в broker (server.properties):
# Хранить сообщения 7 дней по умолчанию
log.retention.hours=168
# Или в миллисекундах
log.retention.ms=604800000
Конфигурация по топику:
# При создании топика
kafka-topics --create --topic my-topic --retention-ms 86400000
# Изменить существующий топик
kafka-configs --alter --entity-type topic --entity-name my-topic --add-config retention.ms=86400000
2. Удаление по размеру (Size-based retention)
Удаляет сообщения когда топик достигает определённого размера.
# Максимальный размер всех логов
log.retention.bytes=1073741824 # 1 GB
# Размер для конкретного топика
topic.retention.bytes=536870912 # 500 MB
3. Cleanup Policy — стратегия очистки
Управляет тем, как Kafka удаляет старые сообщения.
Delete (по умолчанию) — удаляет старые сообщения:
kafka-topics --create --topic events --cleanup-policy delete --retention-hours 24
Compact — сохраняет последнее значение для каждого ключа:
kafka-topics --create --topic user-state --cleanup-policy compact
Примеры разницы:
Cleanup=delete (события):
offset: 0 -> user:1 (удалится через 24 часа)
offset: 1 -> user:1 (удалится через 24 часа)
offset: 2 -> user:2 (удалится через 24 часа)
offset: 3 -> user:1 (удалится через 24 часа)
Cleanup=compact (состояние):
offset: 0 -> user:1 (удалится, есть новое значение)
offset: 1 -> user:1 (удалится, есть новое значение)
offset: 2 -> user:2 (остаётся)
offset: 3 -> user:1 (остаётся, последнее значение)
4. Java конфигурация с Spring Boot
@Configuration
public class KafkaTopicsConfig {
// Обычный event топик — удаляется через 7 дней
@Bean
public NewTopic eventsTopic() {
return TopicBuilder.name("events")
.partitions(3)
.replicas(2)
.config("retention.ms", String.valueOf(7 * 24 * 60 * 60 * 1000))
.config("cleanup.policy", "delete")
.build();
}
// Компактированный топик для состояния
@Bean
public NewTopic userStateTopic() {
return TopicBuilder.name("user-state")
.partitions(1)
.replicas(2)
.config("cleanup.policy", "compact")
.config("min.compaction.lag.ms", String.valueOf(60 * 60 * 1000))
.build();
}
}
5. Log Segment Deletion — удаление сегментов
Кафка использует сегменты (файлы на диске). Очистка происходит по сегментам.
# Размер одного сегмента
log.segment.bytes=1073741824 # 1 GB
# Интервал проверки очистки
log.retention.check.interval.ms=300000 # 5 минут
Когда сегмент полностью старый — он удаляется целиком.
6. Tombstone Records — логическое удаление
Для компактированных логов используются null значения для удаления.
@Service
public class UserService {
@Autowired
private KafkaTemplate<String, User> kafkaTemplate;
// Логическое удаление пользователя
public void deleteUser(Long userId) {
// Отправляем null для этого ключа
kafkaTemplate.send("user-state", userId.toString(), null);
}
}
7. Manual Deletion — ручное удаление
# Удалить все данные в топике
kafka-topics --delete --topic my-topic
# Создать заново
kafka-topics --create --topic my-topic --partitions 3 --replication-factor 2
# Сбросить offset группы
kafka-consumer-groups --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-earliest --execute
8. Consumer Group Offset Management
Оффсеты хранятся отдельно и не удаляются автоматически с сообщениями.
# Просмотреть offsets
kafka-consumer-groups --bootstrap-server localhost:9092 --group my-group --describe
# Сбросить offsets
kafka-consumer-groups --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-earliest --execute
9. Сравнение политик хранения
| Параметр | Delete | Compact | Комбо |
|---|---|---|---|
| Использование | События, логи | Состояние | Beide |
| Хранит | Все события | Последнее значение ключа | Оба |
| Размер диска | Часто больше | Часто меньше | Оба |
| Пример | order-events | user-state | audit-logs |
10. Best Practices
-
Выбирай политику осознанно
- Events/logs → cleanup.policy=delete
- State/config → cleanup.policy=compact
-
Настрой retention по бизнес-требованиям
- Аналитика: 30-90 дней
- Event sourcing: 1-2 года
- Состояние: compact с tombstone
-
Мониторь очистку логов
- Проверяй metric DeletedLogTotalSize
- Смотри свободное место на диске
-
Тестируй удаление перед production
- Убедись retention не удаляет нужные данные
- Проверь consumer lag
Краткая схема: Kafka удаляет сообщения либо по времени (retention.ms), либо по размеру (retention.bytes), либо через log compaction. Всё настраивается на уровне broker или топика.