Как решал проблемы с памятью в Spark?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Решение проблем с памятью в Spark
Проблемы памяти — одни из самых частых в Spark приложениях. В этом ответе расскажу о практических подходах, которые применял в production.
Распознавание проблем памяти
Симптомы:
- OutOfMemoryError в логах
- GC overhead limit exceeded
- Executor потери или "not running" статус
- Чрезмерно долгое выполнение job's
Стратегия 1: Увеличение памяти Executor'ов
spark-submit \
--executor-memory 8g \
--driver-memory 4g \
--executor-cores 4 \
script.py
Практика: начинаем с 4g, увеличиваем до 8g или 16g при необходимости. На некоторых кластерах ограничение 32gb на executor.
Стратегия 2: Увеличение числа partition's
df = df.repartition(1000) # Увеличить раздробление данных
df = df.coalesce(100) # Уменьшить количество partitions
# Для shuffle операций
spark.conf.set("spark.sql.shuffle.partitions", 1000) # По умолчанию 200
Практика: при read OOM увеличиваем partitions, при write OOM — уменьшаем.
Стратегия 3: Использование Broadcast Join
from pyspark.sql.functions import broadcast
# Плохо: join большие таблицы
result = df_large1.join(df_large2, "id") # Выполняет shuffle
# Хорошо: broadcast маленькую таблицу
result = df_large.join(broadcast(df_small), "id") # Нет shuffle
Практика: если одна таблица меньше 100mb, используем broadcast (пороговое значение: spark.sql.autoBroadcastJoinThreshold).
Стратегия 4: Оптимизация DataFrame операций
# Плохо: фильтр после shuffle
df = df.join(other, "key").filter(col("date") > "2024-01-01")
# Хорошо: фильтр ДО shuffle
df = df.filter(col("date") > "2024-01-01").join(other, "key")
# Избегать collect на больших данных
df_list = df.collect() # ОПАСНО! Вся таблица в памяти driver'а
# Правильно: обработать по батчам
for row in df.toLocalIterator():
process(row)
Стратегия 5: Кэширование selectively
# Кэшировать только если используется несколько раз
df_cached = df.filter(...).cache()
result1 = df_cached.count() # Force materialization
result2 = df_cached.groupBy(...).count()
# Удалять кэш когда не нужен
df_cached.unpersist()
# Разные уровни кэширования
df.persist(StorageLevel.MEMORY_ONLY) # Только память
df.persist(StorageLevel.MEMORY_AND_DISK) # Memory + диск
df.persist(StorageLevel.DISK_ONLY) # Только диск
df.persist(StorageLevel.MEMORY_ONLY_2) # Память + репликация
Стратегия 6: Обработка по батчам больших файлов
# Плохо: читаем весь файл
df = spark.read.parquet("/huge/file.parquet") # 100gb в памяти
# Хорошо: обрабатываем по батчам
batch_size = 1_000_000 # 1M rows per batch
df = spark.read.parquet("/huge/file.parquet")
for batch_df in df.randomSplit([1/100]*100): # Раздельные батчи
process_batch(batch_df)
# Или использовать itertools
from pyspark.sql.functions import row_number
from pyspark.window import Window
window_spec = Window.orderBy(rand())
df_with_batch = df.withColumn("batch", (row_number().over(window_spec) - 1) / batch_size)
for batch_id in range(num_batches):
batch_df = df_with_batch.filter(col("batch") == batch_id)
process_batch(batch_df)
Стратегия 7: Оптимизация Garbage Collection
spark-submit \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35" \
script.py
Практика: для Spark обычно достаточно дефолтного GC. G1GC помогает при очень больших heap'ах (16gb+).
Стратегия 8: Сжатие данных
# Читать с более эффективным форматом
df = spark.read.parquet("/data.parquet") # Уже сжато (snappy по умолчанию)
# Для CSV: читать меньше колонок
df = spark.read.csv("/data.csv").select(["id", "name"]) # Projection pushdown
# Сжатие при записи
df.write.parquet("/output", compression="snappy") # или "gzip", "lz4"
Стратегия 9: Использование Adaptive Query Execution (AQE)
# Включить AQE (по умолчанию в Spark 3.2+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
# AQE автоматически:
# - Перестраивает join order
# - Объединяет маленькие partitions
# - Использует broadcast когда подходит
Пример: Production-ready обработка больших данных
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.storagelevel import StorageLevel
spark = SparkSession.builder \
.appName("MemoryOptimized") \
.config("spark.executor.memory", "8g") \
.config("spark.sql.shuffle.partitions", "500") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.autoBroadcastJoinThreshold", "104857600") # 100mb \
.getOrCreate()
try:
# Читаем с предварительной фильтрацией
df_raw = spark.read.parquet("/raw/events")
# Фильтруем РАНО
df_filtered = df_raw.filter(
(col("date") >= "2024-01-01") &
(col("event_type").isin(["click", "purchase"]))
).select(["user_id", "timestamp", "amount"])
# Читаем справочник (маленький)
df_catalog = spark.read.csv("/catalogs/products.csv")
# Используем broadcast для join
df_joined = df_filtered.join(
broadcast(df_catalog),
on="product_id",
how="left"
)
# Кэшируем если будет использоваться несколько раз
df_joined.persist(StorageLevel.MEMORY_AND_DISK)
# Агрегация
result = df_joined.groupBy("user_id").agg({
"amount": "sum",
"timestamp": "count"
})
# Результат
result.write.parquet("/output/aggregated", compression="snappy")
finally:
spark.stop()
Чеклист при memory issues
- Проверить Executors tab в Spark UI
- Увеличить executor memory (--executor-memory)
- Увеличить количество partitions (repartition)
- Использовать broadcast для маленьких таблиц
- Фильтровать ДО shuffle операций
- Проверить plan выполнения (explain())
- Включить AQE (spark.sql.adaptive.enabled)
- Убрать ненужные кэши (unpersist())
- Удалить collect() на больших данных
Заключение
Проблемы памяти в Spark решаются комбинацией: увеличение ресурсов, оптимизация partitions, правильная выбор join стратегии и использование новых оптимизаций как AQE. Главное — регулярный мониторинг Spark UI и планов выполнения.