← Назад к вопросам

Как очищаются сообщения из Kafka

1.0 Junior🔥 81 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Ответ

Как очищаются сообщения из Kafka

Кафка — распределённый лог сообщений. Очистка данных происходит через несколько механизмов, зависящих от конфигурации и политики хранения.

1. Удаление по времени (Time-based retention)

Кафка автоматически удаляет старые сообщения на основе времени хранения.

Конфигурация в broker (server.properties):

# Хранить сообщения 7 дней по умолчанию
log.retention.hours=168

# Или в миллисекундах
log.retention.ms=604800000

Конфигурация по топику:

# При создании топика
kafka-topics --create --topic my-topic --retention-ms 86400000

# Изменить существующий топик
kafka-configs --alter --entity-type topic --entity-name my-topic --add-config retention.ms=86400000

2. Удаление по размеру (Size-based retention)

Удаляет сообщения когда топик достигает определённого размера.

# Максимальный размер всех логов
log.retention.bytes=1073741824  # 1 GB

# Размер для конкретного топика
topic.retention.bytes=536870912  # 500 MB

3. Cleanup Policy — стратегия очистки

Управляет тем, как Kafka удаляет старые сообщения.

Delete (по умолчанию) — удаляет старые сообщения:

kafka-topics --create --topic events --cleanup-policy delete --retention-hours 24

Compact — сохраняет последнее значение для каждого ключа:

kafka-topics --create --topic user-state --cleanup-policy compact

Примеры разницы:

Cleanup=delete (события):
offset: 0 -> user:1 (удалится через 24 часа)
offset: 1 -> user:1 (удалится через 24 часа)
offset: 2 -> user:2 (удалится через 24 часа)
offset: 3 -> user:1 (удалится через 24 часа)

Cleanup=compact (состояние):
offset: 0 -> user:1 (удалится, есть новое значение)
offset: 1 -> user:1 (удалится, есть новое значение)
offset: 2 -> user:2 (остаётся)
offset: 3 -> user:1 (остаётся, последнее значение)

4. Java конфигурация с Spring Boot

@Configuration
public class KafkaTopicsConfig {
    
    // Обычный event топик — удаляется через 7 дней
    @Bean
    public NewTopic eventsTopic() {
        return TopicBuilder.name("events")
            .partitions(3)
            .replicas(2)
            .config("retention.ms", String.valueOf(7 * 24 * 60 * 60 * 1000))
            .config("cleanup.policy", "delete")
            .build();
    }
    
    // Компактированный топик для состояния
    @Bean
    public NewTopic userStateTopic() {
        return TopicBuilder.name("user-state")
            .partitions(1)
            .replicas(2)
            .config("cleanup.policy", "compact")
            .config("min.compaction.lag.ms", String.valueOf(60 * 60 * 1000))
            .build();
    }
}

5. Log Segment Deletion — удаление сегментов

Кафка использует сегменты (файлы на диске). Очистка происходит по сегментам.

# Размер одного сегмента
log.segment.bytes=1073741824  # 1 GB

# Интервал проверки очистки
log.retention.check.interval.ms=300000  # 5 минут

Когда сегмент полностью старый — он удаляется целиком.

6. Tombstone Records — логическое удаление

Для компактированных логов используются null значения для удаления.

@Service
public class UserService {
    
    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;
    
    // Логическое удаление пользователя
    public void deleteUser(Long userId) {
        // Отправляем null для этого ключа
        kafkaTemplate.send("user-state", userId.toString(), null);
    }
}

7. Manual Deletion — ручное удаление

# Удалить все данные в топике
kafka-topics --delete --topic my-topic

# Создать заново
kafka-topics --create --topic my-topic --partitions 3 --replication-factor 2

# Сбросить offset группы
kafka-consumer-groups --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-earliest --execute

8. Consumer Group Offset Management

Оффсеты хранятся отдельно и не удаляются автоматически с сообщениями.

# Просмотреть offsets
kafka-consumer-groups --bootstrap-server localhost:9092 --group my-group --describe

# Сбросить offsets
kafka-consumer-groups --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-earliest --execute

9. Сравнение политик хранения

ПараметрDeleteCompactКомбо
ИспользованиеСобытия, логиСостояниеBeide
ХранитВсе событияПоследнее значение ключаОба
Размер дискаЧасто большеЧасто меньшеОба
Примерorder-eventsuser-stateaudit-logs

10. Best Practices

  1. Выбирай политику осознанно

    • Events/logs → cleanup.policy=delete
    • State/config → cleanup.policy=compact
  2. Настрой retention по бизнес-требованиям

    • Аналитика: 30-90 дней
    • Event sourcing: 1-2 года
    • Состояние: compact с tombstone
  3. Мониторь очистку логов

    • Проверяй metric DeletedLogTotalSize
    • Смотри свободное место на диске
  4. Тестируй удаление перед production

    • Убедись retention не удаляет нужные данные
    • Проверь consumer lag

Краткая схема: Kafka удаляет сообщения либо по времени (retention.ms), либо по размеру (retention.bytes), либо через log compaction. Всё настраивается на уровне broker или топика.

Как очищаются сообщения из Kafka | PrepBro