Как работает кэширование в Spark? Когда его применять?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Кэширование в Apache Spark
Концепция
Кэширование в Spark — это механизм сохранения RDD или DataFrame в памяти (или на диске) после первого вычисления, чтобы избежать пересчёта при повторном использовании.
Без кэширования каждое действие (action) пересчитывает весь DAG заново. С кэшированием результат повторного использования берётся из памяти.
Архитектура кэширования
┌─────────────────────────────────────────┐
│ RDD / DataFrame │
│ (трансформация) │
└────────────────┬────────────────────────┘
│
.cache() или .persist()
│
┌────────────────▼────────────────────────┐
│ Первое вычисление (first action) │
│ Результат сохраняется в памяти │
└────────────────┬────────────────────────┘
│
┌─────────┴──────────┐
│ │
Action 2 Action 3
(из кэша) (из кэша)
1. Основные методы кэширования
cache() — сокращённая версия
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Caching").getOrCreate()
df = spark.read.parquet("/data/large_dataset")
# cache() = persist(MEMORY_ONLY)
df.cache()
# Первое действие — читает и кэширует
count1 = df.count() # Медленно, загружает в памяти
# Второе действие — из кэша
count2 = df.count() # Быстро, из памяти
persist() — гибкое управление уровнем хранения
from pyspark import StorageLevel
# Разные уровни хранения
df.persist(StorageLevel.MEMORY_ONLY) # Только память
df.persist(StorageLevel.MEMORY_AND_DISK) # Память + диск
df.persist(StorageLevel.DISK_ONLY) # Только диск
df.persist(StorageLevel.MEMORY_ONLY_2) # Память × 2 копии
df.persist(StorageLevel.MEMORY_AND_DISK_2) # Память+диск × 2
# По умолчанию
df.persist() # = MEMORY_ONLY
2. Уровни хранения
| StorageLevel | Описание | Скорость | Память | Отказоустойчивость |
|---|---|---|---|---|
| MEMORY_ONLY | В памяти | ⚡⚡⚡ | Высокое | Низкое |
| MEMORY_AND_DISK | Память + диск | ⚡⚡ | Среднее | Среднее |
| DISK_ONLY | Только диск | ⚡ | Низкое | Высокое |
| MEMORY_ONLY_2 | Память (2 копии) | ⚡⚡ | Очень высокое | Среднее |
| MEMORY_AND_DISK_2 | Память + диск (2) | ⚡ | Очень высокое | Высокое |
Выбор уровня
# Маленький DF, критична скорость
df_small.persist(StorageLevel.MEMORY_ONLY)
# Большой DF, может не влезть в память
df_large.persist(StorageLevel.MEMORY_AND_DISK)
# Очень критичные данные (не может быть потеряно)
df_critical.persist(StorageLevel.MEMORY_AND_DISK_2)
# Экономия памяти
df_huge.persist(StorageLevel.DISK_ONLY)
3. Когда применять кэширование
Используй кэш ЕСЛИ
# 1. DataFrame используется несколько раз
df = spark.read.parquet("/big_data")
df.cache()
result1 = df.filter(df.age > 30).count()
result2 = df.filter(df.city == 'NYC').count()
result3 = df.groupBy('category').count().collect()
# Без кэша: 3 полных чтения + фильтраций
# С кэшем: 1 чтение + фильтрация в памяти
# 2. Дорогостоящие трансформации
df_raw = spark.read.parquet("/raw")
df_cleaned = df_raw \
.filter(df_raw.value.isNotNull()) \
.withColumn("processed", func.upper(df_raw.name))
df_cleaned.cache() # Кэшировать результат дорогой очистки
count = df_cleaned.count() # Действие 1
df_filtered = df_cleaned.filter(...).count() # Действие 2
НЕ используй кэш ЕСЛИ
# 1. DataFrame используется один раз
df = spark.read.parquet("/data")
result = df.count() # Одно действие — кэш не поможет
# 2. Памяти недостаточно
df_huge = spark.read.parquet("/massive_data")
# Не кэшировать — спилл на диск замедляет
# Вместо этого: использовать MEMORY_AND_DISK
df_huge.persist(StorageLevel.MEMORY_AND_DISK)
# 3. DataFrame очень редко переиспользуется
df = spark.read.parquet("/data")
df.cache() # Бессмысленно, если вероятность повторного использования < 10%
4. Практические примеры
Пример 1: Кэширование для join операций
df_users = spark.read.parquet("/users")
df_orders = spark.read.parquet("/orders")
# Если df_users маленький относительно orders
df_users.cache()
# Join будет быстрым благодаря broadcast join
df_joined = df_users.join(df_orders, "user_id")
count = df_joined.count() # Медленно, первый раз
df_filtered = df_joined.filter(df_joined.amount > 100).count() # Быстро
Пример 2: Pipeline с несколькими действиями
from pyspark.sql import functions as F
df = spark.read.parquet("/events")
# Этап 1: Очистка
df_clean = df.filter(F.col("event_type").isNotNull()) \
.filter(F.col("timestamp") > "2024-01-01")
df_clean.cache()
# Этап 2: Несколько анализов на чистых данных
metric_1 = df_clean.groupBy("user_id").count().collect()
metric_2 = df_clean.groupBy("event_type").agg(F.count("*")).collect()
metric_3 = df_clean.select(F.avg("duration")).collect()
# Все метрики используют кэшированный df_clean
Пример 3: Контроль памяти
# Следить за памятью
df = spark.read.parquet("/data")
print(f"Executor memory: {spark.sparkContext._jsc.sc().getConf().get('spark.executor.memory')}")
# Если памяти мало — использовать диск
if total_size_gb > available_memory_gb * 0.8:
df.persist(StorageLevel.MEMORY_AND_DISK)
else:
df.persist(StorageLevel.MEMORY_ONLY)
5. Мониторинг кэша
Просмотр информации о кэше
# Spark UI: http://localhost:4040
# Вкладка Storage показывает:
# - Какие RDD/DF кэшированы
# - Занимаемая память
# - Уровень хранения
# Программно
sc = spark.sparkContext
rdd_storage = sc.getRDDStorageInfo()
for info in rdd_storage:
print(f"RDD {info.id}: {info.memSize} bytes, {info.diskSize} bytes")
6. Очистка кэша
Когда удалять кэш
df.cache()
result = df.count() # Кэшировано
# Когда DF больше не нужен
df.unpersist() # Удалить из памяти
# Или удалить всё сразу
spark.sparkContext.setLogLevel("ERROR")
spark.catalog.clearCache()
# Удалить с указанием блокировки
df.unpersist(blocking=True) # Ждать завершения удаления
7. Оптимизация кэша
Сжатие данных в кэше
# Конфигурация Spark
spark = SparkSession.builder \
.config("spark.rdd.compress", "true") \
.config("spark.io.compression.codec", "snappy") \
.getOrCreate()
# Снижает размер в кэше на 30-40% с небольшим оверхедом CPU
Сериализация
spark = SparkSession.builder \
.config("spark.rdd.compress", "true") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.getOrCreate()
# Kryo быстрее Java сериализации
Выводы
- cache() = persist(MEMORY_ONLY)
- Используй когда DataFrame переиспользуется > 1 раза
- Выбирай уровень хранения: MEMORY_ONLY для мелких, MEMORY_AND_DISK для больших
- Всегда вызывай unpersist() после работы
- Мониторь память через Spark UI
- Кэш — это инструмент оптимизации, но может замедлить, если памяти нет
Правильное кэширование даёт ускорение в 2-10 раз при обработке повторяющихся данных.