← Назад к вопросам

Как решал проблемы с памятью в Spark?

2.7 Senior🔥 171 комментариев
#Apache Spark#Архитектура и проектирование#Опыт и soft skills

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Решение проблем с памятью в Spark

Проблемы памяти — одни из самых частых в Spark приложениях. В этом ответе расскажу о практических подходах, которые применял в production.

Распознавание проблем памяти

Симптомы:

  • OutOfMemoryError в логах
  • GC overhead limit exceeded
  • Executor потери или "not running" статус
  • Чрезмерно долгое выполнение job's

Стратегия 1: Увеличение памяти Executor'ов

spark-submit \
  --executor-memory 8g \
  --driver-memory 4g \
  --executor-cores 4 \
  script.py

Практика: начинаем с 4g, увеличиваем до 8g или 16g при необходимости. На некоторых кластерах ограничение 32gb на executor.

Стратегия 2: Увеличение числа partition's

df = df.repartition(1000)  # Увеличить раздробление данных
df = df.coalesce(100)      # Уменьшить количество partitions

# Для shuffle операций
spark.conf.set("spark.sql.shuffle.partitions", 1000)  # По умолчанию 200

Практика: при read OOM увеличиваем partitions, при write OOM — уменьшаем.

Стратегия 3: Использование Broadcast Join

from pyspark.sql.functions import broadcast

# Плохо: join большие таблицы
result = df_large1.join(df_large2, "id")  # Выполняет shuffle

# Хорошо: broadcast маленькую таблицу
result = df_large.join(broadcast(df_small), "id")  # Нет shuffle

Практика: если одна таблица меньше 100mb, используем broadcast (пороговое значение: spark.sql.autoBroadcastJoinThreshold).

Стратегия 4: Оптимизация DataFrame операций

# Плохо: фильтр после shuffle
df = df.join(other, "key").filter(col("date") > "2024-01-01")

# Хорошо: фильтр ДО shuffle
df = df.filter(col("date") > "2024-01-01").join(other, "key")

# Избегать collect на больших данных
df_list = df.collect()  # ОПАСНО! Вся таблица в памяти driver'а

# Правильно: обработать по батчам
for row in df.toLocalIterator():
    process(row)

Стратегия 5: Кэширование selectively

# Кэшировать только если используется несколько раз
df_cached = df.filter(...).cache()
result1 = df_cached.count()  # Force materialization
result2 = df_cached.groupBy(...).count()

# Удалять кэш когда не нужен
df_cached.unpersist()

# Разные уровни кэширования
df.persist(StorageLevel.MEMORY_ONLY)           # Только память
df.persist(StorageLevel.MEMORY_AND_DISK)      # Memory + диск
df.persist(StorageLevel.DISK_ONLY)            # Только диск
df.persist(StorageLevel.MEMORY_ONLY_2)        # Память + репликация

Стратегия 6: Обработка по батчам больших файлов

# Плохо: читаем весь файл
df = spark.read.parquet("/huge/file.parquet")  # 100gb в памяти

# Хорошо: обрабатываем по батчам
batch_size = 1_000_000  # 1M rows per batch
df = spark.read.parquet("/huge/file.parquet")
for batch_df in df.randomSplit([1/100]*100):  # Раздельные батчи
    process_batch(batch_df)

# Или использовать itertools
from pyspark.sql.functions import row_number
from pyspark.window import Window

window_spec = Window.orderBy(rand())
df_with_batch = df.withColumn("batch", (row_number().over(window_spec) - 1) / batch_size)

for batch_id in range(num_batches):
    batch_df = df_with_batch.filter(col("batch") == batch_id)
    process_batch(batch_df)

Стратегия 7: Оптимизация Garbage Collection

spark-submit \
  --conf spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35" \
  script.py

Практика: для Spark обычно достаточно дефолтного GC. G1GC помогает при очень больших heap'ах (16gb+).

Стратегия 8: Сжатие данных

# Читать с более эффективным форматом
df = spark.read.parquet("/data.parquet")  # Уже сжато (snappy по умолчанию)

# Для CSV: читать меньше колонок
df = spark.read.csv("/data.csv").select(["id", "name"])  # Projection pushdown

# Сжатие при записи
df.write.parquet("/output", compression="snappy")  # или "gzip", "lz4"

Стратегия 9: Использование Adaptive Query Execution (AQE)

# Включить AQE (по умолчанию в Spark 3.2+)
spark.conf.set("spark.sql.adaptive.enabled", "true")

# AQE автоматически:
# - Перестраивает join order
# - Объединяет маленькие partitions
# - Использует broadcast когда подходит

Пример: Production-ready обработка больших данных

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.storagelevel import StorageLevel

spark = SparkSession.builder \
    .appName("MemoryOptimized") \
    .config("spark.executor.memory", "8g") \
    .config("spark.sql.shuffle.partitions", "500") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.autoBroadcastJoinThreshold", "104857600")  # 100mb \
    .getOrCreate()

try:
    # Читаем с предварительной фильтрацией
    df_raw = spark.read.parquet("/raw/events")
    
    # Фильтруем РАНО
    df_filtered = df_raw.filter(
        (col("date") >= "2024-01-01") & 
        (col("event_type").isin(["click", "purchase"]))
    ).select(["user_id", "timestamp", "amount"])
    
    # Читаем справочник (маленький)
    df_catalog = spark.read.csv("/catalogs/products.csv")
    
    # Используем broadcast для join
    df_joined = df_filtered.join(
        broadcast(df_catalog), 
        on="product_id",
        how="left"
    )
    
    # Кэшируем если будет использоваться несколько раз
    df_joined.persist(StorageLevel.MEMORY_AND_DISK)
    
    # Агрегация
    result = df_joined.groupBy("user_id").agg({
        "amount": "sum",
        "timestamp": "count"
    })
    
    # Результат
    result.write.parquet("/output/aggregated", compression="snappy")
    
finally:
    spark.stop()

Чеклист при memory issues

  • Проверить Executors tab в Spark UI
  • Увеличить executor memory (--executor-memory)
  • Увеличить количество partitions (repartition)
  • Использовать broadcast для маленьких таблиц
  • Фильтровать ДО shuffle операций
  • Проверить plan выполнения (explain())
  • Включить AQE (spark.sql.adaptive.enabled)
  • Убрать ненужные кэши (unpersist())
  • Удалить collect() на больших данных

Заключение

Проблемы памяти в Spark решаются комбинацией: увеличение ресурсов, оптимизация partitions, правильная выбор join стратегии и использование новых оптимизаций как AQE. Главное — регулярный мониторинг Spark UI и планов выполнения.