Как реализовать кеширование?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Как реализовать кеширование
Кеширование — это сохранение результатов дорогостоящих операций в быстром хранилище для повторного использования без переделывания работы. Это критический инструмент для масштабирования приложений, но требует правильной реализации.
Основные типы кеша
1. In-Memory Cache (Памяти процесса)
Идея: хранить данные в оперативной памяти приложения.
# Простой пример: встроенный Python кеш
from functools import lru_cache
from datetime import datetime, timedelta
@lru_cache(maxsize=1000) # Кеш последних 1000 результатов
def expensive_calculation(n: int) -> int:
"""Вычисляет факториал (дорого)"""
print(f"Computing factorial({n})...")
result = 1
for i in range(1, n + 1):
result *= i
return result
# Использование
print(expensive_calculation(100)) # Вычисляет (медленно)
print(expensive_calculation(100)) # Читает из кеша (быстро)
# Проблемы LRU cache:
# - Теряется при перезагрузке приложения
# - Каждый процесс имеет свой кеш
# - В микросервисной архитектуре не помогает
Пример с TTL (Time To Live):
import time
from typing import Any, Optional
class SimpleCache:
def __init__(self):
self.cache = {} # {key: (value, expiry_time)}
def get(self, key: str) -> Optional[Any]:
if key not in self.cache:
return None
value, expiry_time = self.cache[key]
if time.time() > expiry_time:
del self.cache[key] # Удаляем истёкший кеш
return None
return value
def set(self, key: str, value: Any, ttl_seconds: int = 3600):
expiry_time = time.time() + ttl_seconds
self.cache[key] = (value, expiry_time)
def delete(self, key: str):
self.cache.pop(key, None)
# Использование
cache = SimpleCache()
cache.set("user_123", {"name": "Alice", "email": "alice@example.com"}, ttl_seconds=300)
user = cache.get("user_123") # Читает из кеша
time.sleep(301)
user = cache.get("user_123") # None (истёк TTL)
2. Distributed Cache (Распределённый кеш)
Идея: хранить кеш на отдельном сервисе, доступном всем приложениям.
Redis — самый популярный выбор:
import redis
import json
from datetime import timedelta
# Подключение к Redis
redis_client = redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True # Автоматически декодирует strings
)
# Базовые операции
def cache_user(user_id: int, user_data: dict):
"""Кеширует данные пользователя на 1 час"""
key = f"user:{user_id}"
value = json.dumps(user_data) # Сериализуем в JSON
redis_client.setex(key, timedelta(hours=1), value)
# setex = SET + EXpire (TTL)
def get_cached_user(user_id: int) -> dict:
"""Читает из кеша"""
key = f"user:{user_id}"
data = redis_client.get(key) # None если нет или истёк TTL
return json.loads(data) if data else None
def delete_user_cache(user_id: int):
"""Инвалидирует кеш при обновлении"""
key = f"user:{user_id}"
redis_client.delete(key)
# Использование
user_id = 123
user_data = {"name": "Alice", "email": "alice@example.com"}
cache_user(user_id, user_data)
user = get_cached_user(user_id) # Читает из Redis
print(user) # {"name": "Alice", "email": "alice@example.com"}
Memcached (альтернатива Redis):
import memcache
# Memcached: более простой, только key-value
mc = memcache.Client(['127.0.0.1:11211'])
mc.set('user:123', {'name': 'Alice'}, time=3600) # 1 час TTL
user = mc.get('user:123')
mc.delete('user:123')
# Различия:
# Redis: Complex data structures (lists, sets, hashes, streams)
# Memcached: Simple key-value, faster but less features
Стратегии кеширования
1. Cache-Aside (Lazy Loading)
Логика: приложение отвечает за загрузку и кеширование.
def get_user(user_id: int) -> dict:
# Шаг 1: Проверяем кеш
cached_user = redis_client.get(f"user:{user_id}")
if cached_user:
return json.loads(cached_user)
# Шаг 2: Cache miss → читаем из БД
print(f"Cache miss for user {user_id}")
user = db.query(f"SELECT * FROM users WHERE id = {user_id}")
# Шаг 3: Кешируем результат
redis_client.setex(f"user:{user_id}", 3600, json.dumps(user))
return user
# Timeline
# t=0: get_user(123) → Cache miss → SELECT FROM DB → кешируем
# t=1: get_user(123) → Cache hit → читаем из Redis (быстро)
# t=3601: get_user(123) → Cache expired → SELECT FROM DB → кешируем
Плюсы: просто, только нужные данные кешируются Минусы: Cache stampede при одновременных cache miss'ах
2. Write-Through (Синхронное кеширование)
Логика: записываем сначала в кеш, потом в БД.
def update_user(user_id: int, new_data: dict):
# Шаг 1: Обновляем кеш
key = f"user:{user_id}"
redis_client.setex(key, 3600, json.dumps(new_data))
# Шаг 2: Обновляем БД
db.query(f"UPDATE users SET ... WHERE id = {user_id}")
# Проблема: если DB упадёт, данные потеряны!
Улучшение с транзакциями:
def update_user_safe(user_id: int, new_data: dict):
try:
# Обновляем БД (primary source of truth)
db.query(f"UPDATE users SET ... WHERE id = {user_id}")
# После успеха обновляем кеш
key = f"user:{user_id}"
redis_client.setex(key, 3600, json.dumps(new_data))
except DatabaseError:
# Если БД упала, кеш не обновляется
raise
Плюсы: гарантирует консистентность кеша и БД Минусы: медленнее (две операции)
3. Write-Behind (Write-Back, асинхронное)
Логика: пишем в кеш сразу, затем асинхронно в БД.
from celery import shared_task
def update_user_async(user_id: int, new_data: dict):
# Шаг 1: Обновляем кеш сразу (быстро)
key = f"user:{user_id}"
redis_client.setex(key, 3600, json.dumps(new_data))
# Шаг 2: Отправляем задачу в очередь для обновления БД
sync_user_to_db.delay(user_id, new_data)
return {"status": "accepted"} # Ответ почти сразу
@shared_task
def sync_user_to_db(user_id: int, new_data: dict):
"""Асинхронно пишет в БД"""
try:
db.query(f"UPDATE users SET ... WHERE id = {user_id}")
except DatabaseError:
# Retry или alert
print(f"Failed to sync user {user_id}")
# Timeline
# t=0: API получает update_user() → обновляет Redis → возвращает 200 OK
# t=0.1s: Celery worker берёт задачу
# t=1s: Celery worker обновляет БД
# Между t=0 и t=1: если система упадёт, изменение потеряется!
Плюсы: быстрый отклик пользователю Минусы: может быть потеря данных (eventual consistency)
Инвалидация кеша
1. TTL (Time To Live)
# Самый простой способ: просто дождаться истечения времени
redis_client.setex("key", 3600, "value") # 1 час TTL
# Проблема: если данные меняются чаще чем TTL, кеш может быть устаревшим
2. Explicit Invalidation
def update_user_and_invalidate_cache(user_id: int, new_data: dict):
# Обновляем БД
db.query(f"UPDATE users SET ... WHERE id = {user_id}")
# Сразу инвалидируем кеш
redis_client.delete(f"user:{user_id}")
# Следующий запрос сделает cache miss и перезагрузит из БД
3. Event-Driven Invalidation
import redis
from redis import Redis
# Используем Redis Pub/Sub для событий инвалидации
redis_client = Redis()
pubsub = redis_client.pubsub()
def listen_for_invalidation():
"""Слушает события инвалидации"""
pubsub.subscribe('user:invalidate')
for message in pubsub.listen():
if message['type'] == 'message':
user_id = message['data']
# Инвалидируем кеш
redis_client.delete(f"user:{user_id}")
print(f"Invalidated cache for user {user_id}")
def update_user_with_events(user_id: int, new_data: dict):
# Обновляем БД
db.query(f"UPDATE users SET ... WHERE id = {user_id}")
# Публикуем событие инвалидации
redis_client.publish('user:invalidate', user_id)
# Запускаем слушатель в отдельном потоке/процессе
import threading
listener_thread = threading.Thread(target=listen_for_invalidation, daemon=True)
listener_thread.start()
Cache Stampede (Thundering Herd)
Проблема: когда кеш истекает, все запросы одновременно идут в БД.
# Плохо: без защиты
import time
def get_user_slow(user_id: int) -> dict:
key = f"user:{user_id}"
user = redis_client.get(key)
if not user:
# 100 запросов одновременно выполняют запрос!
time.sleep(1) # Имитируем медленный DB
user = db.query(f"SELECT * FROM users WHERE id = {user_id}")
redis_client.setex(key, 60, json.dumps(user))
return json.loads(user)
# Решение 1: Probabilistic Early Expiration
def get_user_with_xfetch(user_id: int) -> dict:
import random
key = f"user:{user_id}"
ttl_key = f"user:{user_id}:ttl"
user = redis_client.get(key)
if not user:
user = db.query(f"SELECT * FROM users WHERE id = {user_id}")
redis_client.setex(key, 60, json.dumps(user))
else:
# С вероятностью 10% заново загружаем (раньше истечения TTL)
ttl = redis_client.ttl(key)
if ttl > 0 and random.random() < 0.1:
# Асинхронно обновляем в фоне
background_refresh.delay(user_id)
return json.loads(user)
# Решение 2: Distributed Lock (лок в Redis)
import uuid
def get_user_with_lock(user_id: int) -> dict:
key = f"user:{user_id}"
lock_key = f"user:{user_id}:lock"
user = redis_client.get(key)
if user:
return json.loads(user)
# Cache miss: пытаемся получить лок
lock_id = str(uuid.uuid4())
if redis_client.set(lock_key, lock_id, nx=True, ex=5): # nx=only if not exists
# Мы получили лок, загружаем из БД
try:
user = db.query(f"SELECT * FROM users WHERE id = {user_id}")
redis_client.setex(key, 60, json.dumps(user))
finally:
redis_client.delete(lock_key)
return user
else:
# Лок занят, ждём пока другой поток загрузит
for _ in range(10):
time.sleep(0.1)
user = redis_client.get(key)
if user:
return json.loads(user)
# Fallback: сами загружаем
user = db.query(f"SELECT * FROM users WHERE id = {user_id}")
return user
Best Practices
1. Используйте Redis для распределённого кеша
# Redis vs Memcached
# Redis: более мощный (streams, sorted sets, Lua scripts)
# Memcached: проще, быстрее для простых case
2. Кешируйте с учётом Time To Live
# Разные TTL для разных типов данных
TTL_CONFIG = {
"user": 3600, # 1 час для профиля
"user_feed": 300, # 5 минут для feed (меняется часто)
"product": 86400, # 1 день для product (меняется редко)
"session": 86400 * 7, # 7 дней для сессии
}
3. Мониторьте метрики кеша
# Hit rate, miss rate, eviction rate
redis_info = redis_client.info('stats')
hit_rate = redis_info['keyspace_hits'] / (
redis_info['keyspace_hits'] + redis_info['keyspace_misses']
) if (redis_info['keyspace_hits'] + redis_info['keyspace_misses']) > 0 else 0
print(f"Cache hit rate: {hit_rate * 100:.2f}%")
# Хорошо: > 80%
# Плохо: < 50% (нужно пересмотреть логику)
4. Обрабатывайте ошибки Redis gracefully
def get_user_with_fallback(user_id: int) -> dict:
try:
cached = redis_client.get(f"user:{user_id}")
if cached:
return json.loads(cached)
except redis.ConnectionError:
# Redis недоступен, игнорируем кеш
pass
# Читаем из БД (с или без кеша)
return db.query(f"SELECT * FROM users WHERE id = {user_id}")
Вывод
Кеширование требует баланса:
- Простота: Cache-Aside для большинства случаев
- Консистентность: Write-Through для важных данных
- Производительность: Write-Behind для масштабирования
- Защита: обработка cache stampede и graceful degradation
Ключевые компоненты: Redis, TTL, инвалидация, обработка ошибок.