← Назад к вопросам

Как работает кэширование в Spark? Когда его применять?

3.0 Senior🔥 191 комментариев
#Apache Spark#Hadoop и распределенные системы

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Кэширование в Apache Spark

Концепция

Кэширование в Spark — это механизм сохранения RDD или DataFrame в памяти (или на диске) после первого вычисления, чтобы избежать пересчёта при повторном использовании.

Без кэширования каждое действие (action) пересчитывает весь DAG заново. С кэшированием результат повторного использования берётся из памяти.

Архитектура кэширования

┌─────────────────────────────────────────┐
│  RDD / DataFrame                        │
│  (трансформация)                        │
└────────────────┬────────────────────────┘
                 │
        .cache() или .persist()
                 │
┌────────────────▼────────────────────────┐
│  Первое вычисление (first action)       │
│  Результат сохраняется в памяти         │
└────────────────┬────────────────────────┘
                 │
        ┌─────────┴──────────┐
        │                    │
   Action 2              Action 3
   (из кэша)            (из кэша)

1. Основные методы кэширования

cache() — сокращённая версия

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Caching").getOrCreate()
df = spark.read.parquet("/data/large_dataset")

# cache() = persist(MEMORY_ONLY)
df.cache()

# Первое действие — читает и кэширует
count1 = df.count()  # Медленно, загружает в памяти

# Второе действие — из кэша
count2 = df.count()  # Быстро, из памяти

persist() — гибкое управление уровнем хранения

from pyspark import StorageLevel

# Разные уровни хранения
df.persist(StorageLevel.MEMORY_ONLY)        # Только память
df.persist(StorageLevel.MEMORY_AND_DISK)    # Память + диск
df.persist(StorageLevel.DISK_ONLY)          # Только диск
df.persist(StorageLevel.MEMORY_ONLY_2)      # Память × 2 копии
df.persist(StorageLevel.MEMORY_AND_DISK_2)  # Память+диск × 2

# По умолчанию
df.persist()  # = MEMORY_ONLY

2. Уровни хранения

StorageLevelОписаниеСкоростьПамятьОтказоустойчивость
MEMORY_ONLYВ памяти⚡⚡⚡ВысокоеНизкое
MEMORY_AND_DISKПамять + диск⚡⚡СреднееСреднее
DISK_ONLYТолько дискНизкоеВысокое
MEMORY_ONLY_2Память (2 копии)⚡⚡Очень высокоеСреднее
MEMORY_AND_DISK_2Память + диск (2)Очень высокоеВысокое

Выбор уровня

# Маленький DF, критична скорость
df_small.persist(StorageLevel.MEMORY_ONLY)

# Большой DF, может не влезть в память
df_large.persist(StorageLevel.MEMORY_AND_DISK)

# Очень критичные данные (не может быть потеряно)
df_critical.persist(StorageLevel.MEMORY_AND_DISK_2)

# Экономия памяти
df_huge.persist(StorageLevel.DISK_ONLY)

3. Когда применять кэширование

Используй кэш ЕСЛИ

# 1. DataFrame используется несколько раз
df = spark.read.parquet("/big_data")
df.cache()

result1 = df.filter(df.age > 30).count()
result2 = df.filter(df.city == 'NYC').count()
result3 = df.groupBy('category').count().collect()

# Без кэша: 3 полных чтения + фильтраций
# С кэшем: 1 чтение + фильтрация в памяти

# 2. Дорогостоящие трансформации
df_raw = spark.read.parquet("/raw")
df_cleaned = df_raw \
    .filter(df_raw.value.isNotNull()) \
    .withColumn("processed", func.upper(df_raw.name))

df_cleaned.cache()  # Кэшировать результат дорогой очистки

count = df_cleaned.count()      # Действие 1
df_filtered = df_cleaned.filter(...).count()  # Действие 2

НЕ используй кэш ЕСЛИ

# 1. DataFrame используется один раз
df = spark.read.parquet("/data")
result = df.count()  # Одно действие — кэш не поможет

# 2. Памяти недостаточно
df_huge = spark.read.parquet("/massive_data")
# Не кэшировать — спилл на диск замедляет
# Вместо этого: использовать MEMORY_AND_DISK
df_huge.persist(StorageLevel.MEMORY_AND_DISK)

# 3. DataFrame очень редко переиспользуется
df = spark.read.parquet("/data")
df.cache()  # Бессмысленно, если вероятность повторного использования < 10%

4. Практические примеры

Пример 1: Кэширование для join операций

df_users = spark.read.parquet("/users")
df_orders = spark.read.parquet("/orders")

# Если df_users маленький относительно orders
df_users.cache()

# Join будет быстрым благодаря broadcast join
df_joined = df_users.join(df_orders, "user_id")

count = df_joined.count()      # Медленно, первый раз
df_filtered = df_joined.filter(df_joined.amount > 100).count()  # Быстро

Пример 2: Pipeline с несколькими действиями

from pyspark.sql import functions as F

df = spark.read.parquet("/events")

# Этап 1: Очистка
df_clean = df.filter(F.col("event_type").isNotNull()) \
             .filter(F.col("timestamp") > "2024-01-01")
df_clean.cache()

# Этап 2: Несколько анализов на чистых данных
metric_1 = df_clean.groupBy("user_id").count().collect()
metric_2 = df_clean.groupBy("event_type").agg(F.count("*")).collect()
metric_3 = df_clean.select(F.avg("duration")).collect()

# Все метрики используют кэшированный df_clean

Пример 3: Контроль памяти

# Следить за памятью
df = spark.read.parquet("/data")
print(f"Executor memory: {spark.sparkContext._jsc.sc().getConf().get('spark.executor.memory')}")

# Если памяти мало — использовать диск
if total_size_gb > available_memory_gb * 0.8:
    df.persist(StorageLevel.MEMORY_AND_DISK)
else:
    df.persist(StorageLevel.MEMORY_ONLY)

5. Мониторинг кэша

Просмотр информации о кэше

# Spark UI: http://localhost:4040
# Вкладка Storage показывает:
# - Какие RDD/DF кэшированы
# - Занимаемая память
# - Уровень хранения

# Программно
sc = spark.sparkContext
rdd_storage = sc.getRDDStorageInfo()
for info in rdd_storage:
    print(f"RDD {info.id}: {info.memSize} bytes, {info.diskSize} bytes")

6. Очистка кэша

Когда удалять кэш

df.cache()
result = df.count()  # Кэшировано

# Когда DF больше не нужен
df.unpersist()  # Удалить из памяти

# Или удалить всё сразу
spark.sparkContext.setLogLevel("ERROR")
spark.catalog.clearCache()

# Удалить с указанием блокировки
df.unpersist(blocking=True)  # Ждать завершения удаления

7. Оптимизация кэша

Сжатие данных в кэше

# Конфигурация Spark
spark = SparkSession.builder \
    .config("spark.rdd.compress", "true") \
    .config("spark.io.compression.codec", "snappy") \
    .getOrCreate()

# Снижает размер в кэше на 30-40% с небольшим оверхедом CPU

Сериализация

spark = SparkSession.builder \
    .config("spark.rdd.compress", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

# Kryo быстрее Java сериализации

Выводы

  • cache() = persist(MEMORY_ONLY)
  • Используй когда DataFrame переиспользуется > 1 раза
  • Выбирай уровень хранения: MEMORY_ONLY для мелких, MEMORY_AND_DISK для больших
  • Всегда вызывай unpersist() после работы
  • Мониторь память через Spark UI
  • Кэш — это инструмент оптимизации, но может замедлить, если памяти нет

Правильное кэширование даёт ускорение в 2-10 раз при обработке повторяющихся данных.

Как работает кэширование в Spark? Когда его применять? | PrepBro