Как устроенно кэширование?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Кэширование: принципы, уровни и реализация
Кэширование — один из самых мощных инструментов для оптимизации производительности систем. В качестве Data Engineer я применяю кэширование на разных уровнях: от кэша процессора до приложения и базы данных.
Иерархия кэширования
От самого быстрого к самому медленному:
CPU L1 Cache (~0.5 ns) — кэш процессора, 32KB
↓
CPU L2 Cache (~2 ns) — кэш процессора, 256KB
↓
CPU L3 Cache (~10 ns) — кэш процессора, 8MB
↓
Оперативная память (~100 ns) — RAM, несколько GB
↓
CPU Cache (~10 мкс) — Redis, Memcached
↓
Дисковый кэш (~10 мс) — SSD, HDD
↓
База данных (~100 мс) — PostgreSQL, MySQL
Принцип 80/20 в кэшировании
Парето принцип: 80% запросов — это 20% данных. Кэшируем эти 20% и получаем 80% ускорение.
Уровень 1: Database Query Caching
Идея: сохранить результаты дорогих запросов в памяти
import redis
import json
from hashlib import md5
from datetime import timedelta
# Инициализируем Redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def get_cached_query_result(query, ttl_seconds=3600):
"""
Функция получения результатов запроса с кэшем
"""
# Создаём уникальный ключ кэша на основе запроса
cache_key = f"query:{md5(query.encode()).hexdigest()}"
# 1. Пытаемся получить из кэша
cached = redis_client.get(cache_key)
if cached:
print("Cache HIT")
return json.loads(cached)
# 2. Кэш miss — выполняем запрос
print("Cache MISS")
result = execute_database_query(query)
# 3. Сохраняем в кэш
redis_client.setex(
cache_key,
timedelta(seconds=ttl_seconds),
json.dumps(result)
)
return result
# Использование
result = get_cached_query_result(
"SELECT COUNT(*) FROM users WHERE status='active'",
ttl_seconds=300 # Кэш на 5 минут
)
Уровень 2: Application-Level Caching
Идея: кэшировать данные прямо в приложении
from functools import lru_cache
import time
# 1. Python LRU Cache (простой вариант)
@lru_cache(maxsize=128)
def get_user_profile(user_id: int):
"""
Функция кэшируется автоматически
Хранит последние 128 уникальных результатов
"""
return fetch_from_database(user_id)
# 2. Более продвинутый вариант с TTL (Time To Live)
import cachetools
cache = cachetools.TTLCache(
maxsize=1000, # Максимум 1000 элементов
ttl=300 # Время жизни 5 минут
)
def cached_expensive_operation(key, operation_func):
if key not in cache:
cache[key] = operation_func()
return cache[key]
# Использование
result = cached_expensive_operation(
'top_10_products',
lambda: calculate_top_10_products()
)
Уровень 3: Database Query Plan Caching
Базы данных кэшируют свои собственные планы выполнения:
-- PostgreSQL показывает кэш статистику
SELECT
query,
calls,
mean_exec_time,
max_exec_time
FROM pg_stat_statements
ORDER BY mean_exec_time DESC
LIMIT 10;
-- MySQL
SHOW STATUS LIKE 'Qcache_%';
-- Qcache_hits — попадания в кэш
-- Qcache_misses — промахи кэша
Уровень 4: Buffer Pool & Page Cache
Buffer Pool — кэш страниц БД в памяти сервера
# PostgreSQL: размер shared_buffers
# Рекомендация: 25% от системной памяти
# Для 128GB сервера: shared_buffers = 32GB
# Проверить статистику буфер пула
query = """
SELECT
heap_blks_read, -- Блоки прочитаны с диска
heap_blks_hit, -- Блоки найдены в памяти
(heap_blks_hit::float /
(heap_blks_hit + heap_blks_read)::float * 100)::int AS hit_ratio
FROM pg_statio_user_tables
WHERE relname = 'users';
"""
# Hit ratio >99% — хороший показатель
Уровень 5: Compression и Columnar Format
Идея: сжать данные, чтобы поместилось больше в кэш
# Parquet формат автоматически сжимает данные
import pyarrow.parquet as pq
import pandas as pd
df = pd.read_parquet('data.parquet')
# Статистика использования памяти
print(f"CSV size: {os.path.getsize('data.csv') / 1024 / 1024:.1f}MB")
print(f"Parquet size: {os.path.getsize('data.parquet') / 1024 / 1024:.1f}MB")
# Обычно Parquet на 80% меньше, чем CSV
Уровень 6: Distributed Caching (Redis / Memcached)
Идея: общий кэш для всех приложений в микросервисной архитектуре
import redis
from redis import RedisCluster
# Одиночный Redis сервер
redis_single = redis.Redis(host='localhost', port=6379)
# Redis Cluster (масштабируемый вариант)
redis_cluster = RedisCluster(
startup_nodes=[{'host': 'node1', 'port': 6379}],
skip_full_coverage_check=True
)
# SET / GET операции
redis_cluster.set('user:123:profile', '{"name": "John"}', ex=3600)
profile = redis_cluster.get('user:123:profile')
# TTL управление
redis_cluster.expire('key', 3600) # Установить TTL на 1 час
ttl = redis_cluster.ttl('key') # Получить оставшееся время
# Мониторинг Redis
info = redis_cluster.info('stats')
print(f"Memory usage: {info['used_memory_human']}")
print(f"Connected clients: {info['connected_clients']}")
Уровень 7: HTTP Caching (Browser & CDN)
Идея: кэшировать HTTP ответы на уровне браузера и CDN
from fastapi import FastAPI, Response
from datetime import datetime, timedelta
app = FastAPI()
@app.get("/api/products")
async def get_products(response: Response):
"""
Кэширование HTTP ответа
"""
# Cache-Control headers
response.headers["Cache-Control"] = "public, max-age=3600" # 1 час
response.headers["ETag"] = '"v1.2.3"' # Версия данных
response.headers["Last-Modified"] = "Wed, 21 Oct 2024 07:28:00 GMT"
return {'products': get_products_from_db()}
# На CDN (CloudFlare, Akamai):
# Browser cache: 1 час (max-age)
# Edge cache (CDN): 24 часа
Cache Invalidation Strategies
Самая сложная проблема в Computer Science — как обновить кэш?
1. Time-based (TTL)
# Просто истекает через X секунд
redis_client.setex('key', 300, value) # 5 минут
2. Event-based (Invalidation)
def update_user(user_id, data):
# Обновляем в БД
db.update_user(user_id, data)
# Инвалидируем кэш
redis_client.delete(f"user:{user_id}:profile")
redis_client.delete(f"users:all") # Кэш списка тоже устарел
3. Versioning
# Версионируем кэш ключи
version = 2
cache_key = f"user:{user_id}:profile:v{version}"
# Если обновили логику, просто меняем версию
4. Tag-based (Tag invalidation)
# Redis doesn't support tags, но можно использовать Redis Search
from redis.commands.search.client import Search
# Инвалидировать все кэши тега
redis_client.delete("user:*:profile") # Все профили пользователей
Monitoring & Debugging
# Метрики кэша
def get_cache_stats():
info = redis_client.info('stats')
return {
'total_connections_received': info['total_connections_received'],
'total_commands_processed': info['total_commands_processed'],
'instantaneous_ops_per_sec': info['instantaneous_ops_per_sec'],
'memory_used': info['used_memory_human'],
'hit_ratio': calculate_hit_ratio()
}
def calculate_hit_ratio():
info = redis_client.info('stats')
hits = info['keyspace_hits']
misses = info['keyspace_misses']
if hits + misses == 0:
return 0
return (hits / (hits + misses)) * 100
# Логирование
print(f"Cache hit ratio: {calculate_hit_ratio():.2f}%")
Best Practices
- Профилируйте перед оптимизацией — не кэшируйте без данных
- Hit ratio >80% — признак хорошего кэша
- Не кэшируйте слишком большие данные — кэш работает на памяти
- Используйте proper TTL — слишком короткий = бесполезно, слишком длинный = старые данные
- Мониторьте eviction — когда кэш переполняется, какая стратегия удаления?
Лучше кэширование уменьшает latency с 500ms до 5ms и позволяет масштабировать систему в 100+ раз.