← Назад к вопросам

Как реализовать кеширование?

2.2 Middle🔥 151 комментариев
#Архитектура и проектирование

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как реализовать кеширование

Кеширование — это сохранение результатов дорогостоящих операций в быстром хранилище для повторного использования без переделывания работы. Это критический инструмент для масштабирования приложений, но требует правильной реализации.

Основные типы кеша

1. In-Memory Cache (Памяти процесса)

Идея: хранить данные в оперативной памяти приложения.

# Простой пример: встроенный Python кеш
from functools import lru_cache
from datetime import datetime, timedelta

@lru_cache(maxsize=1000)  # Кеш последних 1000 результатов
def expensive_calculation(n: int) -> int:
    """Вычисляет факториал (дорого)"""
    print(f"Computing factorial({n})...")
    result = 1
    for i in range(1, n + 1):
        result *= i
    return result

# Использование
print(expensive_calculation(100))  # Вычисляет (медленно)
print(expensive_calculation(100))  # Читает из кеша (быстро)

# Проблемы LRU cache:
# - Теряется при перезагрузке приложения
# - Каждый процесс имеет свой кеш
# - В микросервисной архитектуре не помогает

Пример с TTL (Time To Live):

import time
from typing import Any, Optional

class SimpleCache:
    def __init__(self):
        self.cache = {}  # {key: (value, expiry_time)}
    
    def get(self, key: str) -> Optional[Any]:
        if key not in self.cache:
            return None
        
        value, expiry_time = self.cache[key]
        if time.time() > expiry_time:
            del self.cache[key]  # Удаляем истёкший кеш
            return None
        
        return value
    
    def set(self, key: str, value: Any, ttl_seconds: int = 3600):
        expiry_time = time.time() + ttl_seconds
        self.cache[key] = (value, expiry_time)
    
    def delete(self, key: str):
        self.cache.pop(key, None)

# Использование
cache = SimpleCache()
cache.set("user_123", {"name": "Alice", "email": "alice@example.com"}, ttl_seconds=300)
user = cache.get("user_123")  # Читает из кеша
time.sleep(301)
user = cache.get("user_123")  # None (истёк TTL)

2. Distributed Cache (Распределённый кеш)

Идея: хранить кеш на отдельном сервисе, доступном всем приложениям.

Redis — самый популярный выбор:

import redis
import json
from datetime import timedelta

# Подключение к Redis
redis_client = redis.Redis(
    host='localhost',
    port=6379,
    db=0,
    decode_responses=True  # Автоматически декодирует strings
)

# Базовые операции
def cache_user(user_id: int, user_data: dict):
    """Кеширует данные пользователя на 1 час"""
    key = f"user:{user_id}"
    value = json.dumps(user_data)  # Сериализуем в JSON
    redis_client.setex(key, timedelta(hours=1), value)
    # setex = SET + EXpire (TTL)

def get_cached_user(user_id: int) -> dict:
    """Читает из кеша"""
    key = f"user:{user_id}"
    data = redis_client.get(key)  # None если нет или истёк TTL
    return json.loads(data) if data else None

def delete_user_cache(user_id: int):
    """Инвалидирует кеш при обновлении"""
    key = f"user:{user_id}"
    redis_client.delete(key)

# Использование
user_id = 123
user_data = {"name": "Alice", "email": "alice@example.com"}
cache_user(user_id, user_data)

user = get_cached_user(user_id)  # Читает из Redis
print(user)  # {"name": "Alice", "email": "alice@example.com"}

Memcached (альтернатива Redis):

import memcache

# Memcached: более простой, только key-value
mc = memcache.Client(['127.0.0.1:11211'])

mc.set('user:123', {'name': 'Alice'}, time=3600)  # 1 час TTL
user = mc.get('user:123')
mc.delete('user:123')

# Различия:
# Redis:    Complex data structures (lists, sets, hashes, streams)
# Memcached: Simple key-value, faster but less features

Стратегии кеширования

1. Cache-Aside (Lazy Loading)

Логика: приложение отвечает за загрузку и кеширование.

def get_user(user_id: int) -> dict:
    # Шаг 1: Проверяем кеш
    cached_user = redis_client.get(f"user:{user_id}")
    if cached_user:
        return json.loads(cached_user)
    
    # Шаг 2: Cache miss → читаем из БД
    print(f"Cache miss for user {user_id}")
    user = db.query(f"SELECT * FROM users WHERE id = {user_id}")
    
    # Шаг 3: Кешируем результат
    redis_client.setex(f"user:{user_id}", 3600, json.dumps(user))
    
    return user

# Timeline
# t=0: get_user(123) → Cache miss → SELECT FROM DB → кешируем
# t=1: get_user(123) → Cache hit → читаем из Redis (быстро)
# t=3601: get_user(123) → Cache expired → SELECT FROM DB → кешируем

Плюсы: просто, только нужные данные кешируются Минусы: Cache stampede при одновременных cache miss'ах

2. Write-Through (Синхронное кеширование)

Логика: записываем сначала в кеш, потом в БД.

def update_user(user_id: int, new_data: dict):
    # Шаг 1: Обновляем кеш
    key = f"user:{user_id}"
    redis_client.setex(key, 3600, json.dumps(new_data))
    
    # Шаг 2: Обновляем БД
    db.query(f"UPDATE users SET ... WHERE id = {user_id}")
    
    # Проблема: если DB упадёт, данные потеряны!

Улучшение с транзакциями:

def update_user_safe(user_id: int, new_data: dict):
    try:
        # Обновляем БД (primary source of truth)
        db.query(f"UPDATE users SET ... WHERE id = {user_id}")
        
        # После успеха обновляем кеш
        key = f"user:{user_id}"
        redis_client.setex(key, 3600, json.dumps(new_data))
    except DatabaseError:
        # Если БД упала, кеш не обновляется
        raise

Плюсы: гарантирует консистентность кеша и БД Минусы: медленнее (две операции)

3. Write-Behind (Write-Back, асинхронное)

Логика: пишем в кеш сразу, затем асинхронно в БД.

from celery import shared_task

def update_user_async(user_id: int, new_data: dict):
    # Шаг 1: Обновляем кеш сразу (быстро)
    key = f"user:{user_id}"
    redis_client.setex(key, 3600, json.dumps(new_data))
    
    # Шаг 2: Отправляем задачу в очередь для обновления БД
    sync_user_to_db.delay(user_id, new_data)
    
    return {"status": "accepted"}  # Ответ почти сразу

@shared_task
def sync_user_to_db(user_id: int, new_data: dict):
    """Асинхронно пишет в БД"""
    try:
        db.query(f"UPDATE users SET ... WHERE id = {user_id}")
    except DatabaseError:
        # Retry или alert
        print(f"Failed to sync user {user_id}")

# Timeline
# t=0: API получает update_user() → обновляет Redis → возвращает 200 OK
# t=0.1s: Celery worker берёт задачу
# t=1s: Celery worker обновляет БД
# Между t=0 и t=1: если система упадёт, изменение потеряется!

Плюсы: быстрый отклик пользователю Минусы: может быть потеря данных (eventual consistency)

Инвалидация кеша

1. TTL (Time To Live)

# Самый простой способ: просто дождаться истечения времени
redis_client.setex("key", 3600, "value")  # 1 час TTL

# Проблема: если данные меняются чаще чем TTL, кеш может быть устаревшим

2. Explicit Invalidation

def update_user_and_invalidate_cache(user_id: int, new_data: dict):
    # Обновляем БД
    db.query(f"UPDATE users SET ... WHERE id = {user_id}")
    
    # Сразу инвалидируем кеш
    redis_client.delete(f"user:{user_id}")
    
    # Следующий запрос сделает cache miss и перезагрузит из БД

3. Event-Driven Invalidation

import redis
from redis import Redis

# Используем Redis Pub/Sub для событий инвалидации
redis_client = Redis()
pubsub = redis_client.pubsub()

def listen_for_invalidation():
    """Слушает события инвалидации"""
    pubsub.subscribe('user:invalidate')
    
    for message in pubsub.listen():
        if message['type'] == 'message':
            user_id = message['data']
            # Инвалидируем кеш
            redis_client.delete(f"user:{user_id}")
            print(f"Invalidated cache for user {user_id}")

def update_user_with_events(user_id: int, new_data: dict):
    # Обновляем БД
    db.query(f"UPDATE users SET ... WHERE id = {user_id}")
    
    # Публикуем событие инвалидации
    redis_client.publish('user:invalidate', user_id)

# Запускаем слушатель в отдельном потоке/процессе
import threading
listener_thread = threading.Thread(target=listen_for_invalidation, daemon=True)
listener_thread.start()

Cache Stampede (Thundering Herd)

Проблема: когда кеш истекает, все запросы одновременно идут в БД.

# Плохо: без защиты
import time

def get_user_slow(user_id: int) -> dict:
    key = f"user:{user_id}"
    user = redis_client.get(key)
    
    if not user:
        # 100 запросов одновременно выполняют запрос!
        time.sleep(1)  # Имитируем медленный DB
        user = db.query(f"SELECT * FROM users WHERE id = {user_id}")
        redis_client.setex(key, 60, json.dumps(user))
    
    return json.loads(user)

# Решение 1: Probabilistic Early Expiration
def get_user_with_xfetch(user_id: int) -> dict:
    import random
    key = f"user:{user_id}"
    ttl_key = f"user:{user_id}:ttl"
    
    user = redis_client.get(key)
    if not user:
        user = db.query(f"SELECT * FROM users WHERE id = {user_id}")
        redis_client.setex(key, 60, json.dumps(user))
    else:
        # С вероятностью 10% заново загружаем (раньше истечения TTL)
        ttl = redis_client.ttl(key)
        if ttl > 0 and random.random() < 0.1:
            # Асинхронно обновляем в фоне
            background_refresh.delay(user_id)
    
    return json.loads(user)

# Решение 2: Distributed Lock (лок в Redis)
import uuid

def get_user_with_lock(user_id: int) -> dict:
    key = f"user:{user_id}"
    lock_key = f"user:{user_id}:lock"
    
    user = redis_client.get(key)
    if user:
        return json.loads(user)
    
    # Cache miss: пытаемся получить лок
    lock_id = str(uuid.uuid4())
    if redis_client.set(lock_key, lock_id, nx=True, ex=5):  # nx=only if not exists
        # Мы получили лок, загружаем из БД
        try:
            user = db.query(f"SELECT * FROM users WHERE id = {user_id}")
            redis_client.setex(key, 60, json.dumps(user))
        finally:
            redis_client.delete(lock_key)
        return user
    else:
        # Лок занят, ждём пока другой поток загрузит
        for _ in range(10):
            time.sleep(0.1)
            user = redis_client.get(key)
            if user:
                return json.loads(user)
        
        # Fallback: сами загружаем
        user = db.query(f"SELECT * FROM users WHERE id = {user_id}")
        return user

Best Practices

1. Используйте Redis для распределённого кеша

# Redis vs Memcached
# Redis: более мощный (streams, sorted sets, Lua scripts)
# Memcached: проще, быстрее для простых case

2. Кешируйте с учётом Time To Live

# Разные TTL для разных типов данных
TTL_CONFIG = {
    "user": 3600,              # 1 час для профиля
    "user_feed": 300,          # 5 минут для feed (меняется часто)
    "product": 86400,          # 1 день для product (меняется редко)
    "session": 86400 * 7,      # 7 дней для сессии
}

3. Мониторьте метрики кеша

# Hit rate, miss rate, eviction rate
redis_info = redis_client.info('stats')
hit_rate = redis_info['keyspace_hits'] / (
    redis_info['keyspace_hits'] + redis_info['keyspace_misses']
) if (redis_info['keyspace_hits'] + redis_info['keyspace_misses']) > 0 else 0

print(f"Cache hit rate: {hit_rate * 100:.2f}%")
# Хорошо: > 80%
# Плохо: < 50% (нужно пересмотреть логику)

4. Обрабатывайте ошибки Redis gracefully

def get_user_with_fallback(user_id: int) -> dict:
    try:
        cached = redis_client.get(f"user:{user_id}")
        if cached:
            return json.loads(cached)
    except redis.ConnectionError:
        # Redis недоступен, игнорируем кеш
        pass
    
    # Читаем из БД (с или без кеша)
    return db.query(f"SELECT * FROM users WHERE id = {user_id}")

Вывод

Кеширование требует баланса:

  • Простота: Cache-Aside для большинства случаев
  • Консистентность: Write-Through для важных данных
  • Производительность: Write-Behind для масштабирования
  • Защита: обработка cache stampede и graceful degradation

Ключевые компоненты: Redis, TTL, инвалидация, обработка ошибок.

Как реализовать кеширование? | PrepBro