← Назад к вопросам

Что происходит под капотом при выполнении shuffle в Spark?

2.7 Senior🔥 241 комментариев
#Apache Spark#Hadoop и распределенные системы

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Shuffle в Apache Spark: детальный анализ

Shuffle — это одна из самых дорогих операций в Spark, которая перемещает данные между worker-нодами. Понимание того, как shuffle работает под капотом, критично для оптимизации производительности. Разберу этот механизм детально.

Что такое shuffle?

Определение: Shuffle — это процесс перераспределения данных между нодами кластера в зависимости от значения ключа. Это происходит при операциях вроде GROUP BY, JOIN, DISTINCT.

Операции, которые вызывают shuffle

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("shuffle_example").getOrCreate()
df = spark.read.parquet("s3://data/sales")

# Все эти операции ВЫЗЫВАЮТ shuffle:

# 1. GROUP BY
df.groupBy("category").sum("amount").collect()  # shuffle по category

# 2. JOIN
df1 = spark.read.parquet("s3://data/table1")
df2 = spark.read.parquet("s3://data/table2")
df1.join(df2, on="key", how="inner")  # shuffle по key

# 3. DISTINCT
df.select("region").distinct().collect()  # shuffle по region

# 4. SORT (в некоторых случаях)
df.sort("date").collect()

# 5. REPARTITION
df.repartition(100, "category")

# 6. Window functions (с OVER)
from pyspark.sql.window import Window
window = Window.partitionBy("user_id").orderBy("date")
df.withColumn("row_num", F.row_number().over(window))

Архитектура shuffle в Spark

Shuffle состоит из двух этапов: MAP и REDUCE.

Исходные данные (partition 0, 1, 2, ...)
    |
    v
MAP-этап (на каждой ноде)
    |
    ├─ Вычисление partition ID для каждого ключа
    ├─ Спилл (spillage) на диск (если не влезает в память)
    └─ Создание файлов map output
    |
    v
SHUFFLE-этап (сетевая передача данных)
    |
    ├─ Отправка данных между нодами
    ├─ Переупорядочение
    └─ Группировка по partition
    |
    v
REDUCE-этап (на целевых нодах)
    |
    ├─ Получение данных
    ├─ Слияние (merge) файлов
    └─ Выполнение агрегации/группировки

Детальное описание каждого этапа

Этап 1: Map Phase (на исходных нодах)

# Пример данных:
# partition 0: (user_1, 100), (user_2, 150)
# partition 1: (user_1, 200), (user_3, 120)

# После map-фазы:
# На каждой ноде данные организованы в буферы по целевым partitions
# Например, если у нас 4 целевых partition:

# Buffer для partition 0: (user_3, 120)
# Buffer для partition 1: (user_1, 100), (user_1, 200)
# Buffer для partition 2: (user_2, 150)
# Buffer для partition 3: empty

# Алгоритм: partition_id = hash(key) % num_partitions

Что происходит:

  1. Для каждой записи вычисляется partition_id = hash(key) % num_target_partitions
  2. Данные группируются в буферы памяти по partition_id
  3. Когда буфер переполняется, данные спилятся на диск
  4. Создаются файлы вида shuffle_0_0_0.data, shuffle_0_0_1.data и т.д.

Проблема: если данные не влезают в память (OOM), shuffle падает.

Этап 2: Shuffle-передача (Shuffle-phase)

Данные отправляются между нодами по сети.

Node 0 (на диске):
  ├─ shuffle_0_0_0.data  (для partition 0)
  ├─ shuffle_0_0_1.data  (для partition 1)
  ├─ shuffle_0_0_2.data  (для partition 2)
  └─ shuffle_0_0_3.data  (для partition 3)

Это данные ОТПРАВЛЯЮТСЯ:

Node 0 -> Node 0 (partition 0)
Node 0 -> Node 1 (partition 1)
Node 0 -> Node 2 (partition 2)
Node 0 -> Node 3 (partition 3)

Cпособ передачи:

  • Push-based shuffle (старый, до Spark 3.3): исходные ноды активно отправляют
  • Pull-based shuffle (новый, Spark 3.3+): целевые ноды активно получают

Этап 3: Reduce Phase (на целевых нодах)

# На целевой ноде получаем данные
# Partition 1 получает:
# - shuffle_0_0_1.data (от Node 0)
# - shuffle_0_1_1.data (от Node 1)
# - shuffle_0_2_1.data (от Node 2)

# Потом:
# 1. Сортируем и объединяем (merge)
# 2. Выполняем операцию (GROUP BY, JOIN и т.д.)
# 3. Возвращаем результат

Параметры shuffle, которые влияют на производительность

spark.conf.set("spark.shuffle.partitions", 200)  # По умолчанию 200
# Сколько партиций создать после shuffle
# Более высокое значение = больше параллелизма, но больше overhead
# Оптимально: num_cores * 4

spark.conf.set("spark.shuffle.file.buffer", "32k")  # Размер буфера
# Буфер в памяти перед спиллом на диск
# Больше буфер = меньше спилл операций

spark.conf.set("spark.io.compression.codec", "snappy")  # Компрессия shuffle
# Варианты: lz4, snappy, gzip
# Компрессия экономит сеть, но требует CPU

spark.conf.set("spark.shuffle.compress", "true")  # Сжимать shuffle блоки

spark.conf.set("spark.shuffle.spillCompressed", "false")  # Сжимать спилл
# При true: экономит дисковое место, но требует CPU

spark.conf.set("spark.io.encryption.enabled", "true")  # Шифровать shuffle

spark.conf.set("spark.sql.shuffle.partitions", "200")  # Для SQL операций

Пример: как реально работает GROUP BY

from pyspark.sql import functions as F

df = spark.createDataFrame([
    ("user_1", 100),
    ("user_2", 150),
    ("user_1", 200),
    ("user_3", 120),
], ["user_id", "amount"])

# Операция: сумма по пользователям
result = df.groupBy("user_id").sum("amount")

# ЧТО ПРОИСХОДИТ:
# 1. MAP PHASE:
#    - Вычисляем: partition_id = hash("user_1") % 200 = 42
#    - Вычисляем: partition_id = hash("user_2") % 200 = 157
#    - Вычисляем: partition_id = hash("user_3") % 200 = 88
#
#    Буферы:
#    partition[42]: ("user_1", 100), ("user_1", 200)
#    partition[157]: ("user_2", 150)
#    partition[88]: ("user_3", 120)
#    partition[0-41, 43-87, 89-156, 158-199]: empty

# 2. SHUFFLE PHASE:
#    Данные отправляются на целевые ноды

# 3. REDUCE PHASE:
#    На partition[42]:
#    - Получаем все ("user_1", *) данные
#    - Вычисляем sum: 100 + 200 = 300
#    - Возвращаем ("user_1", 300)

Спилл (Spillage) — критический момент

Если буфер памяти переполняется:

# MEMORY PRESSURE:
# Spark отслеживает использование памяти
# Когда память заканчивается:

# 1. Данные в буферах спиливаются на диск
spark.conf.set("spark.shuffle.memoryFraction", "0.2")  # 20% памяти на shuffle

# 2. Это ОЧЕНЬ МЕДЛЕННО:
#    - Дисковая запись 100x медленнее памяти
#    - Может привести к многократному замедлению

# 3. Если данные не влезают даже после спилла:
#    - OutOfMemory Exception
#    - Task падает и перезапускается

Как оптимизировать shuffle

1. Редуцируй объём данных ДО shuffle

# ❌ Неправильно: shuffle всех данных
df_all = spark.read.parquet("s3://huge-dataset/")
result = df_all.groupBy("category").sum("amount")

# ✅ Правильно: сначала отфильтруй
df_filtered = df_all.filter(df_all.date >= "2024-01-01")
result = df_filtered.groupBy("category").sum("amount")

2. Уменьши количество shuffle операций

# ❌ Неправильно: two shuffles
df.groupBy("category").sum().collect()
df.groupBy("region").count().collect()  # Второй shuffle!

# ✅ Правильно: один запрос, один shuffle
df_cat = df.groupBy("category").sum()
df_reg = df.groupBy("region").count()

3. Оптимизируй количество партиций

# Если слишком мало партиций: большие задачи, медленно
spark.conf.set("spark.shuffle.partitions", 100)

# Если слишком много партиций: overhead, медленно
spark.conf.set("spark.shuffle.partitions", 1000)

# Оптимально: (num_cores * 4) или 100-200 для среднего кластера

4. Используй Adaptive Query Execution (AQE)

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

AQE автоматически оптимизирует shuffle на основе статистики.

5. Увеличь память для shuffle

spark.conf.set("spark.executor.memory", "16g")
spark.conf.set("spark.shuffle.memoryFraction", "0.3")  # До 30%

Мониторинг shuffle

Чтобы увидеть статистику shuffle:

# В Spark UI (localhost:4040):
# - Stage view -> посмотреть shuffle read/write
# - Executor view -> использование памяти

# Программно:
result = df.groupBy("category").sum()
print(result.explain(extended=True))

Типичные проблемы

  1. Shuffle spill: данные не влезают в память

    • Решение: увеличить память или отфильтровать данные
  2. Shuffle skew: один partition получает намного больше данных

    • Решение: использовать Adaptive Query Execution или предобработать данные
  3. Network bottleneck: медленная передача данных между нодами

    • Решение: использовать компрессию
  4. Disk I/O bottleneck: спилл на диск очень медленный

    • Решение: увеличить буфер или память

Shuffle — это сердце Spark. Правильное понимание его механики — ключ к созданию эффективных pipeline-ов.

Что происходит под капотом при выполнении shuffle в Spark? | PrepBro