← Назад к вопросам

Как смотрел план составленный в Spark?

2.2 Middle🔥 61 комментариев
#Big Data и распределенные вычисления

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI29 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Анализ плана выполнения в Apache Spark

План выполнения (Execution Plan) в Spark — это критически важный инструмент для оптимизации больших данных. Он показывает, как Spark будет обрабатывать ваш запрос, и позволяет выявить узкие места в производительности.

Типы планов в Spark

Logical Plan

Логический план описывает что должно быть сделано:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("example").getOrCreate()

df = spark.read.csv("/path/to/data.csv", header=True, inferSchema=True)
df2 = df.filter(df.age > 25).groupBy("city").count()

# Просмотр логического плана
df2.explain(mode="simple")  # Простой вывод
df2.explain(mode="extended")  # Расширенный с оптимизациями
df2.explain(mode="codegen")  # С информацией о генерировании кода

Вывод покажет:

== Parsed Logical Plan ==
Aggregate [city#12], [city#12, count(1) AS count#18L]
+- Filter (age#11 > 25)
   +- Relation[name#10,age#11,city#12] csv

Physical Plan

Физический план показывает как именно будут выполняться операции:

df2.explain(mode="extended")

Вывод содержит:

== Physical Plan ==
AdaptiveSparkPlan isFinal=false
+- HashAggregate (final stage)
   +- HashAggregate (partial)
      +- Filter (age > 25)
         +- FileScan csv [name,age,city]

Как читать план выполнения

1. Структура дерева

Читается снизу вверх:

┌─ CollectLimit          ← Вывод результата
├─ Sort                  ← Сортировка
├─ Exchange              ← Перемешивание по сети (SHUFFLE)
├─ HashAggregate        ← Агрегирование
└─ Filter               ← Фильтрация
   └─ Scan              ← Чтение данных

2. Ключевые операции

# Пример с подробным анализом плана
df = spark.read.csv("/path/to/large_data.csv", header=True, inferSchema=True)

# Проблемный запрос
result = (df
    .filter(df.salary > 50000)
    .groupBy("department")
    .agg({"salary": "avg"})
    .sort("avg(salary)", ascending=False)
)

result.explain(mode="extended")

Оптимизация на основе плана

1. Выявление SHUFFLE операций

SHUFFLE — самая дорогая операция (обмен данными по сети):

# Плохой запрос — много shuffle
df1 = spark.read.csv("file1.csv", header=True)
df2 = spark.read.csv("file2.csv", header=True)

result = (df1
    .groupBy("id").agg({"value": "sum"})
    .join(df2, on="id")
    .groupBy("category").agg({"value": "avg"})
)
result.explain()  # Будет видно несколько Exchange операций

2. Проверка Predicate Pushdown

Оптимизация должна пушить фильтры вниз (к чтению данных):

# Хороший запрос
df = spark.read.csv("/path/to/data.csv", header=True)

# Фильтр будет применен ДО group by
result = (df
    .filter(df.age > 25)      # Pushdown — применится при чтении
    .groupBy("city").count()
)
result.explain()

# Плохой запрос
result = (df
    .groupBy("city").agg({"age": "avg"})
    .filter(F.col("avg(age)") > 25)  # Фильтр на агрегированные данные
)
result.explain()  # Фильтр не может быть pushed down

Практический пример анализа

from pyspark.sql import functions as F

# Загрузка данных
users = spark.read.csv("users.csv", header=True, inferSchema=True)
orders = spark.read.csv("orders.csv", header=True, inferSchema=True)

# Проблемный запрос
query = (users
    .join(orders, on="user_id")  # Join может быть дорогим
    .filter(orders.amount > 1000)
    .groupBy(users.city)
    .agg(F.avg(orders.amount).alias("avg_order"))
    .sort("avg_order", ascending=False)
    .limit(10)
)

print("\n=== LOGICAL PLAN ===")
query.explain(mode="simple")

print("\n=== PHYSICAL PLAN ===")
query.explain(mode="extended")

Вывод может показать:

BroadcastHashJoin [user_id]  ← Если таблица orders маленькая
SortMergeJoin [user_id]      ← Если обе таблицы большие

Оптимизация join операций

# Если одна таблица маленькая, использовать broadcast join
users = spark.read.csv("users.csv", header=True, inferSchema=True)
cities = spark.read.csv("cities.csv", header=True, inferSchema=True)  # Маленькая

result = users.join(
    F.broadcast(cities),  # Явно указываем broadcast
    on="city_id"
)
result.explain()  # Покажет BroadcastHashJoin вместо SortMergeJoin

Инструменты для анализа

Spark UI

Доступен по адресу http://localhost:4040 (или другой порт):

# Запуск Spark приложения
spark.sparkContext.setLogLevel("INFO")
df.write.parquet("output.parquet")

# Откройте Spark UI и посмотрите вкладки:
# - Jobs: общая статистика
# - Stages: разбор по этапам
# - Storage: кэширование данных
# - Environment: конфигурация кластера

DAG (Directed Acyclic Graph) Visualization

Визуализация зависимостей операций:

rdd = spark.sparkContext.parallelize(range(1000))
rdd2 = rdd.filter(lambda x: x % 2 == 0).map(lambda x: x * 2)
rdd2.collect()

# В Spark UI есть раздел "DAG Visualization"

Кэширование для оптимизации

df = spark.read.csv("large_file.csv", header=True, inferSchema=True)

# Если df используется несколько раз, кэшируем
df.cache()  # MEMORY storage level
df.persist()  # То же самое

# После кэширования запросы быстрее
df.filter(df.age > 25).count()     # Первый запрос медленнее
df.filter(df.age > 30).count()     # Второй запрос быстрее (из кэша)

# Удаление из кэша
df.unpersist()

Хорошие практики

  1. Всегда смотрите план выполнения перед запуском тяжелых операций
  2. Избегайте скрытых shuffle операций — join, groupBy, repartition
  3. Используйте broadcast для маленьких таблиц (< 100 MB обычно)
  4. Пушьте фильтры как можно раньше — это уменьшает объем обрабатываемых данных
  5. Кэшируйте промежуточные результаты, если они используются несколько раз
  6. Мониторьте Spark UI — видно где тратится время
  7. Партиционируйте данные правильно — меньше shuffle операций

Пример оптимизированного запроса

# Неоптимизированный
result = (df
    .join(df2, on="id")
    .groupBy("city").count()
    .filter(F.col("count") > 100)
)

# Оптимизированный
result = (df
    .filter(df.valid == True)          # Pushdown
    .join(F.broadcast(df2), on="id")   # Broadcast join
    .groupBy("city").count()
    .filter(F.col("count") > 100)      # На агрегированных данных
)

Понимание плана выполнения позволяет писать эффективные Spark приложения, которые обрабатывают петабайты данных за минуты вместо часов.