Опиши технологию инвалидации кэша
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Инвалидация кэша: основные технологии и подходы
Инвалидация кэша — это процесс удаления или обновления устаревших данных в кэше для обеспечения консистентности с источником истины. Это критически важная задача в высоконагруженных системах.
Основные стратегии инвалидации
1. TTL (Time-To-Live)
Самый простой подход — установка времени жизни для каждого ключа кэша:
import redis
from datetime import timedelta
client = redis.Redis(host="localhost", port=6379)
# Кэшируем данные на 1 час
client.setex(
name="user:123:profile",
time=timedelta(hours=1),
value=json.dumps(user_data)
)
# Или через expire
client.set("user:123:profile", json.dumps(user_data))
client.expire("user:123:profile", 3600)
Плюсы: простота, автоматическая очистка. Минусы: недостаточна для часто меняющихся данных, может привести к устаревшим данным.
2. Event-driven инвалидация
Удаляем кэш при возникновении события (обновление, удаление):
from functools import wraps
import redis
client = redis.Redis(host="localhost", port=6379)
def invalidate_cache(*keys):
"""Декоратор для инвалидации кэша"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)
# Удаляем ключи из кэша после обновления
for key in keys:
client.delete(key)
return result
return wrapper
return decorator
@invalidate_cache("products:list", "products:featured")
def update_product(product_id, data):
# Обновляем продукт в БД
db.products.update({"_id": product_id}, {"$set": data})
return True
Плюсы: актуальные данные, немедленная реакция. Минусы: требует обработки всех операций изменения, может быть затратно.
3. Версионирование ключей
Используем версию в названии ключа:
class VersionedCache:
def __init__(self, redis_client):
self.client = redis_client
self.versions = {} # версии для типов данных
def get_key(self, entity_type: str, entity_id: str) -> str:
version = self.versions.get(entity_type, 0)
return f"{entity_type}:v{version}:{entity_id}"
def get(self, entity_type: str, entity_id: str):
key = self.get_key(entity_type, entity_id)
return self.client.get(key)
def set(self, entity_type: str, entity_id: str, value: str):
key = self.get_key(entity_type, entity_id)
self.client.set(key, value)
def invalidate_type(self, entity_type: str):
"""Инвалидируем все данные типа"""
self.versions[entity_type] = self.versions.get(entity_type, 0) + 1
# Использование
cache = VersionedCache(client)
cache.set("user", "123", user_data)
# Инвалидируем всех пользователей
cache.invalidate_type("user")
Плюсы: эффективно для массовой инвалидации, не требует удаления ключей. Минусы: накопление старых ключей в памяти.
4. Кэш с зависимостями
Сохраняем информацию о зависимостях между ключами:
class DependentCache:
def __init__(self, redis_client):
self.client = redis_client
self.dependencies = {} # ключ -> список зависимых ключей
def set(self, key: str, value, depends_on: list = None):
self.client.set(key, value)
if depends_on:
# Сохраняем зависимости
for dep_key in depends_on:
if dep_key not in self.dependencies:
self.dependencies[dep_key] = []
if key not in self.dependencies[dep_key]:
self.dependencies[dep_key].append(key)
def invalidate(self, key: str):
"""Инвалидируем ключ и все зависимые"""
to_delete = [key]
queue = [key]
while queue:
current = queue.pop(0)
dependent = self.dependencies.get(current, [])
to_delete.extend(dependent)
queue.extend(dependent)
# Удаляем все найденные ключи
if to_delete:
self.client.delete(*to_delete)
# Использование
cache = DependentCache(client)
# Кэшируем профиль и рейтинг (зависит от профиля)
cache.set("user:123:profile", profile_data)
cache.set("user:123:rating", rating_data, depends_on=["user:123:profile"])
# Инвалидируем профиль - удалится и рейтинг
cache.invalidate("user:123:profile")
Плюсы: точное управление зависимостями, минимизирует устаревшие данные. Минусы: дополнительная сложность.
5. Cache-aside с предварительной загрузкой
Обновляем кэш в фоне перед истечением TTL:
import threading
import time
class ProbabilisticEarlyExpiration:
def __init__(self, redis_client):
self.client = redis_client
def get_with_refresh(self, key: str, fetch_func, ttl: int, beta=1.0):
"""Cache-aside с XFetch алгоритмом"""
value = self.client.get(key)
if value is None:
# Кэша нет - выполняем основной запрос
value = fetch_func()
self.client.setex(key, ttl, value)
return value
# Есть в кэше - проверяем нужно ли обновлять
ttl_left = self.client.ttl(key)
delta = (ttl - ttl_left) * beta
if delta > random.random() * ttl:
# Запускаем обновление в фоне
thread = threading.Thread(
target=lambda: self.client.setex(key, ttl, fetch_func())
)
thread.daemon = True
thread.start()
return value
# Использование
cache_client = ProbabilisticEarlyExpiration(client)
user_data = cache_client.get_with_refresh(
key="user:123:profile",
fetch_func=lambda: db.users.find_one({"_id": "123"}),
ttl=3600
)
Плюсы: не блокирует основной процесс, актуальные данные. Минусы: требует осторожности с одновременными обновлениями.
Практические рекомендации
- Комбинируй подходы: TTL + Event-driven инвалидация
- Используй Redis Streams для отслеживания событий
- Мониторь размер кэша и удаляй неиспользуемые данные
- Документируй время жизни для каждого ключа
- Тестируй сценарии потери данных из кэша
Выбор стратегии зависит от критичности данных и частоты обновлений.