← Назад к вопросам

В чем разница между narrow (узкими) и wide (широкими) операциями в Apache Spark?

2.0 Middle🔥 221 комментариев
#Apache Spark

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Узкие и широкие операции в Apache Spark

Что это такое

Узкие операции (narrow transformations) — это трансформации, где каждая строка выходного RDD зависит только от одной строки входного RDD. Широкие операции (wide transformations) — это трансформации, где каждая строка выходного RDD может зависеть от множества строк входного RDD.

Эта классификация критична для понимания оптимизации Spark и управления памятью при обработке распределённых данных.

Узкие операции (Narrow Transformations)

Узкие операции выполняются в рамках одного stage без перетасовки данных между партициями.

Примеры узких операций:

  • map() — преобразование каждого элемента
  • filter() — фильтрация элементов
  • flatMap() — трансформация с раскрытием вложенных структур
  • select(), withColumn() — операции со столбцами в DataFrames
  • take() — получение первых N элементов

Пример кода:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("example").getOrCreate()

df = spark.createDataFrame(
    [(1, "Alice", 1000), (2, "Bob", 2000), (3, "Charlie", 1500)],
    ["id", "name", "salary"]
)

# Узкая операция: map
rdd = df.rdd.map(lambda row: (row.name, row.salary * 1.1))

# Узкая операция: filter
filtered_df = df.filter(df.salary > 1500)

# Узкая операция: withColumn
increased_df = df.withColumn("bonus", df.salary * 0.1)

Преимущества:

  • Не требуют перемещения данных между узлами
  • Можно выполнять параллельно на локальном узле
  • Минимальные сетевые затраты
  • Быстрое выполнение

Широкие операции (Wide Transformations)

Широкие операции требуют перетасовки (shuffle) данных между партициями — это дорогостоящая операция с сетевой передачей данных.

Примеры широких операций:

  • groupByKey() — группировка по ключам
  • reduceByKey() — агрегация по ключам
  • join() — объединение двух RDD/DataFrames
  • repartition() — переразделение данных
  • distinct() — удаление дубликатов
  • sortByKey() — сортировка по ключам
  • aggregateByKey() — сложная агрегация

Пример кода:

# Широкая операция: groupByKey
grouped = df.groupBy("name").agg({"salary": "sum"})

# Широкая операция: join
df1 = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
df2 = spark.createDataFrame([(1, 1000), (2, 2000)], ["id", "salary"])
joined = df1.join(df2, on="id")

# Широкая операция: repartition
repartitioned_df = df.repartition(10, "name")

# Широкая операция: distinct
distinct_names = df.select("name").distinct()

Недостатки:

  • Требуют shuffle — дорогостоящая операция
  • Данные передаются по сети между узлами
  • Потребление памяти растёт
  • Может привести к OutOfMemory если не настроить параметры

Когда Spark создаёт новые stages

Спарк создаёт новый stage для каждой широкой операции. Узкие операции объединяются в один stage для оптимизации.

DAG (Directed Acyclic Graph):

нарроу операции → stage 1 (без shuffle)
широкая операция (shuffle) → stage 2
нарроу операции → stage 2 (продолжение)
широкая операция → stage 3

Оптимизация и практические рекомендации

  1. Минимизируй широкие операции — они дороги. Используй узкие операции максимально.

  2. Порядок операций — фильтруй данные до shuffle:

# ✅ Хорошо: фильтр до группировки
df.filter(df.salary > 1000).groupBy("department").agg({"salary": "avg"})

# ❌ Плохо: группировка всех, потом фильтр
df.groupBy("department").agg({"salary": "avg"}).filter(F.col("avg(salary)") > 2000)
  1. Настройка параметров — для широких операций:
spark.conf.set("spark.shuffle.partitions", 200)  # Default: 200
spark.conf.set("spark.sql.shuffle.partitions", 200)
  1. Кэширование — кэшируй RDD перед широкими операциями:
df.cache()  # Кэшируем в памяти
df.groupBy("name").count()
df.groupBy("name").agg({"salary": "sum"})

Выводы

Узкие операции выполняются быстро в рамках одного stage без перемещения данных. Широкие операции требуют shuffle и новые stages, что замедляет обработку. Для оптимальной производительности Spark-приложения нужно понимать эти различия и минимизировать количество shuffle-операций.

В чем разница между narrow (узкими) и wide (широкими) операциями в Apache Spark? | PrepBro