← Назад к вопросам

Какие настройки влияют на производительность Spark?

2.0 Middle🔥 121 комментариев
#Apache Spark#Инструменты разработки

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Настройки производительности Spark

Производительность Apache Spark зависит от множества параметров конфигурации. Правильная настройка этих параметров критична для эффективной обработки больших данных.

1. Параметры выделения ресурсов

spark.executor.instances — количество executors (рабочих процессов)

spark = SparkSession.builder \
    .config("spark.executor.instances", "50") \
    .getOrCreate()
# Больше executors = параллельно обрабатывается больше партиций
# Но нужно достаточно памяти и CPU на кластере

spark.executor.cores — количество ядер на executor

spark = SparkSession.builder \
    .config("spark.executor.cores", "4") \
    .getOrCreate()
# Обычно: 4-8 ядер на executor
# Слишком много ядер ведёт к конкуренции за память

spark.executor.memory — оперативная память на executor

# Минимум 4GB, обычно 8-16GB
spark = SparkSession.builder \
    .config("spark.executor.memory", "8g") \
    .getOrCreate()

spark.driver.memory — память для driver процесса

# Обычно 4-8GB, для крупных broadcast больше
spark = SparkSession.builder \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

2. Параметры памяти

spark.memory.fraction — доля памяти executor для Spark (остаток для системы)

.config("spark.memory.fraction", "0.6")  # 60% памяти для Spark
# По умолчанию 0.6 (60%), увеличение экономит память но замедляет GC

spark.memory.storageFraction — доля памяти Spark для кэша

.config("spark.memory.storageFraction", "0.5")  # 50% для cache/broadcast
# Оставляем место для shuffle операций

spark.shuffle.memoryFraction (устаревший, используй memory.fraction)

Память для shuffle операций — очень важна! Недостаток памяти → spill on disk → медленно

3. Параметры shuffle

spark.shuffle.partitions — количество партиций после shuffle

.config("spark.sql.shuffle.partitions", "200")  # По умолчанию 200

# Если:
# - Мало данных: уменьши до 50-100
# - Много данных: увеличь до 500-1000
# - Много OOM ошибок: увеличь (больше партиций = меньше на каждую)

spark.shuffle.compress — сжатие shuffle данных

.config("spark.shuffle.compress", "true")  # Включить сжатие
# Экономит дисковое пространство и I/O, но требует CPU

spark.shuffle.spill.compress — сжатие spilled данных

.config("spark.shuffle.spill.compress", "true")  # По умолчанию true

4. Параметры сериализации

spark.serializer — алгоритм сериализации

.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
# Kryo намного быстрее Java serialization
# Но требует настройки классов для регистрации

spark.kryoserializer.buffer.max — максимальный размер буфера Kryo

.config("spark.kryoserializer.buffer.max", "256m")
# Если буфер переполнится — ошибка, увеличь

5. Параметры сетевого взаимодействия

spark.network.timeout — таймаут сетевых операций

.config("spark.network.timeout", "120s")  # По умолчанию 120s
# Увели, если работаешь с медленной сетью

spark.rpc.askTimeout — таймаут RPC запросов

.config("spark.rpc.askTimeout", "120s")

6. Параметры брокерджа (Broadcast)

spark.broadcast.blockSize — размер блока broadcast

.config("spark.broadcast.blockSize", "4m")  # По умолчанию 4m
# Больше блок = меньше overhead но больше задержка

spark.sql.broadcastTimeout — таймаут broadcast

.config("spark.sql.broadcastTimeout", "300s")

7. Параметры SQL оптимизации

spark.sql.adaptive.enabled — адаптивное исполнение SQL

.config("spark.sql.adaptive.enabled", "true")  # Spark 3.0+
# Оптимизирует shuffle размеры на лету

spark.sql.adaptive.coalescePartitions.enabled — объединение маленьких партиций

.config("spark.sql.adaptive.coalescePartitions.enabled", "true")
# Уменьшает overhead от маленьких задач

spark.sql.adaptive.skewJoin.enabled — обработка skewed данных в join

.config("spark.sql.adaptive.skewJoin.enabled", "true")
# Если одна партиция намного больше других — разбивает её

8. Параметры кэширования

spark.storage.memoryMapThreshold — порог для memory mapping

.config("spark.storage.memoryMapThreshold", "2m")

9. Параметры заданий

spark.task.cpus — ядер на задачу (по умолчанию 1)

.config("spark.task.cpus", "1")  # Обычно оставляй 1

spark.default.parallelism — параллелизм по умолчанию

.config("spark.default.parallelism", "200")
# Рекомендуется: количество ядер * 2-3

10. Параметры спекулятивного выполнения

spark.speculation — запуск медленных задач на других executors

.config("spark.speculation", "true")  # По умолчанию false
# Помогает при неровном распределении работы

spark.speculation.interval — интервал проверки медленных задач

.config("spark.speculation.interval", "100ms")

Типичные проблемы и решения

Out of Memory (OOM) ошибки:

# 1. Увеличь memoria spark.executor.memory
spark = SparkSession.builder \
    .config("spark.executor.memory", "16g") \
    .getOrCreate()

# 2. Уменьши размер broadcast
df.broadcast(broadcast_df)  # Явный контроль

# 3. Увеличь partition count для меньших партиций
.config("spark.sql.shuffle.partitions", "500")

# 4. Используй Kryo для сериализации
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

Медленное выполнение:

# 1. Включи adaptive query execution
.config("spark.sql.adaptive.enabled", "true")

# 2. Включи skew join handling
.config("spark.sql.adaptive.skewJoin.enabled", "true")

# 3. Увеличь executor instances
.config("spark.executor.instances", "100")

# 4. Используй кэширование часто используемых датафреймов
df.cache()

Проблемы с сетью:

# Увеличь таймауты
.config("spark.network.timeout", "300s") \
.config("spark.rpc.askTimeout", "300s")

Пример оптимальной конфигурации для больших данных

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DataPipeline") \
    .config("spark.executor.instances", "100") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.memory.fraction", "0.7") \
    .config("spark.sql.shuffle.partitions", "400") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.shuffle.compress", "true") \
    .config("spark.default.parallelism", "400") \
    .getOrCreate()

Помни: нет универсальной конфигурации. Всегда профилируй и экспериментируй с параметрами для твоих конкретных данных и задач.

Какие настройки влияют на производительность Spark? | PrepBro