Как смотрел план составленный в Spark?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Анализ плана выполнения в Apache Spark
План выполнения (Execution Plan) в Spark — это критически важный инструмент для оптимизации больших данных. Он показывает, как Spark будет обрабатывать ваш запрос, и позволяет выявить узкие места в производительности.
Типы планов в Spark
Logical Plan
Логический план описывает что должно быть сделано:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()
df = spark.read.csv("/path/to/data.csv", header=True, inferSchema=True)
df2 = df.filter(df.age > 25).groupBy("city").count()
# Просмотр логического плана
df2.explain(mode="simple") # Простой вывод
df2.explain(mode="extended") # Расширенный с оптимизациями
df2.explain(mode="codegen") # С информацией о генерировании кода
Вывод покажет:
== Parsed Logical Plan ==
Aggregate [city#12], [city#12, count(1) AS count#18L]
+- Filter (age#11 > 25)
+- Relation[name#10,age#11,city#12] csv
Physical Plan
Физический план показывает как именно будут выполняться операции:
df2.explain(mode="extended")
Вывод содержит:
== Physical Plan ==
AdaptiveSparkPlan isFinal=false
+- HashAggregate (final stage)
+- HashAggregate (partial)
+- Filter (age > 25)
+- FileScan csv [name,age,city]
Как читать план выполнения
1. Структура дерева
Читается снизу вверх:
┌─ CollectLimit ← Вывод результата
├─ Sort ← Сортировка
├─ Exchange ← Перемешивание по сети (SHUFFLE)
├─ HashAggregate ← Агрегирование
└─ Filter ← Фильтрация
└─ Scan ← Чтение данных
2. Ключевые операции
# Пример с подробным анализом плана
df = spark.read.csv("/path/to/large_data.csv", header=True, inferSchema=True)
# Проблемный запрос
result = (df
.filter(df.salary > 50000)
.groupBy("department")
.agg({"salary": "avg"})
.sort("avg(salary)", ascending=False)
)
result.explain(mode="extended")
Оптимизация на основе плана
1. Выявление SHUFFLE операций
SHUFFLE — самая дорогая операция (обмен данными по сети):
# Плохой запрос — много shuffle
df1 = spark.read.csv("file1.csv", header=True)
df2 = spark.read.csv("file2.csv", header=True)
result = (df1
.groupBy("id").agg({"value": "sum"})
.join(df2, on="id")
.groupBy("category").agg({"value": "avg"})
)
result.explain() # Будет видно несколько Exchange операций
2. Проверка Predicate Pushdown
Оптимизация должна пушить фильтры вниз (к чтению данных):
# Хороший запрос
df = spark.read.csv("/path/to/data.csv", header=True)
# Фильтр будет применен ДО group by
result = (df
.filter(df.age > 25) # Pushdown — применится при чтении
.groupBy("city").count()
)
result.explain()
# Плохой запрос
result = (df
.groupBy("city").agg({"age": "avg"})
.filter(F.col("avg(age)") > 25) # Фильтр на агрегированные данные
)
result.explain() # Фильтр не может быть pushed down
Практический пример анализа
from pyspark.sql import functions as F
# Загрузка данных
users = spark.read.csv("users.csv", header=True, inferSchema=True)
orders = spark.read.csv("orders.csv", header=True, inferSchema=True)
# Проблемный запрос
query = (users
.join(orders, on="user_id") # Join может быть дорогим
.filter(orders.amount > 1000)
.groupBy(users.city)
.agg(F.avg(orders.amount).alias("avg_order"))
.sort("avg_order", ascending=False)
.limit(10)
)
print("\n=== LOGICAL PLAN ===")
query.explain(mode="simple")
print("\n=== PHYSICAL PLAN ===")
query.explain(mode="extended")
Вывод может показать:
BroadcastHashJoin [user_id] ← Если таблица orders маленькая
SortMergeJoin [user_id] ← Если обе таблицы большие
Оптимизация join операций
# Если одна таблица маленькая, использовать broadcast join
users = spark.read.csv("users.csv", header=True, inferSchema=True)
cities = spark.read.csv("cities.csv", header=True, inferSchema=True) # Маленькая
result = users.join(
F.broadcast(cities), # Явно указываем broadcast
on="city_id"
)
result.explain() # Покажет BroadcastHashJoin вместо SortMergeJoin
Инструменты для анализа
Spark UI
Доступен по адресу http://localhost:4040 (или другой порт):
# Запуск Spark приложения
spark.sparkContext.setLogLevel("INFO")
df.write.parquet("output.parquet")
# Откройте Spark UI и посмотрите вкладки:
# - Jobs: общая статистика
# - Stages: разбор по этапам
# - Storage: кэширование данных
# - Environment: конфигурация кластера
DAG (Directed Acyclic Graph) Visualization
Визуализация зависимостей операций:
rdd = spark.sparkContext.parallelize(range(1000))
rdd2 = rdd.filter(lambda x: x % 2 == 0).map(lambda x: x * 2)
rdd2.collect()
# В Spark UI есть раздел "DAG Visualization"
Кэширование для оптимизации
df = spark.read.csv("large_file.csv", header=True, inferSchema=True)
# Если df используется несколько раз, кэшируем
df.cache() # MEMORY storage level
df.persist() # То же самое
# После кэширования запросы быстрее
df.filter(df.age > 25).count() # Первый запрос медленнее
df.filter(df.age > 30).count() # Второй запрос быстрее (из кэша)
# Удаление из кэша
df.unpersist()
Хорошие практики
- Всегда смотрите план выполнения перед запуском тяжелых операций
- Избегайте скрытых shuffle операций — join, groupBy, repartition
- Используйте broadcast для маленьких таблиц (< 100 MB обычно)
- Пушьте фильтры как можно раньше — это уменьшает объем обрабатываемых данных
- Кэшируйте промежуточные результаты, если они используются несколько раз
- Мониторьте Spark UI — видно где тратится время
- Партиционируйте данные правильно — меньше shuffle операций
Пример оптимизированного запроса
# Неоптимизированный
result = (df
.join(df2, on="id")
.groupBy("city").count()
.filter(F.col("count") > 100)
)
# Оптимизированный
result = (df
.filter(df.valid == True) # Pushdown
.join(F.broadcast(df2), on="id") # Broadcast join
.groupBy("city").count()
.filter(F.col("count") > 100) # На агрегированных данных
)
Понимание плана выполнения позволяет писать эффективные Spark приложения, которые обрабатывают петабайты данных за минуты вместо часов.