Что происходит после отправки задания в Spark (spark-submit)?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Что происходит после spark-submit: полный lifecycle Spark job'а
Это один из самых важных вопросов для понимания Spark. Расскажу про каждый этап жизненного цикла job'а от submit'а до результата.
Общая архитектура Spark
Application (driver)
↓
SparkContext/SparkSession
↓
Cluster Manager (YARN, Kubernetes, Standalone)
↓
Executor nodes (where actual computation happens)
Фаза 1: Submission (spark-submit)
# Команда
spark-submit \
--class com.example.MyApp \
--master yarn \
--deploy-mode cluster \
--num-executors 10 \
--executor-memory 8g \
--executor-cores 4 \
my-app.jar arg1 arg2
Что происходит:
- spark-submit скрипт парсит аргументы
- Проверяет наличие JAR файла и зависимостей
- Определяет Cluster Manager (YARN, K8s, etc)
- Подготавливает конфигурацию (driver memory, executor count)
- Запускает Application (driver process) или подает заявку на кластер
# В Python (PySpark)
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("MyApp") \
.master("yarn") \
.config("spark.executor.memory", "8g") \
.config("spark.executor.cores", 4) \
.config("spark.executor.instances", 10) \
.getOrCreate()
# SparkContext инициализируется
# -> Приложение подключается к Cluster Manager
Фаза 2: Driver Initialization
Driver Process:
- Запускается на одной машине (или локально)
- Это основной процесс вашего приложения
- Выполняет код (преобразования DAG'ов в задачи)
- Общается с Cluster Manager
# Код выполняется на Driver
df = spark.read.parquet("/data/input")
df2 = df.filter(df.amount > 100) # Lazy evaluation!
df2.write.parquet("/data/output") # Action! Trigger execution
# Трансформации (filter) не выполняются сразу
# Они добавляются в DAG (Directed Acyclic Graph)
DAG Construction:
Логические трансформации:
read(parquet)
↓
filter(amount > 100)
↓
write(parquet)
Это описание того, ЧТО нужно сделать, но не КАК
Фаза 3: DAG Scheduling
Когда вызывается Action (write, collect, show, count):
1. DAGScheduler анализирует DAG:
Large DAG
↓
DAGScheduler разбивает на Stages (по Shuffle границам)
↓
Cаждый Stage = набор задач, которые могут выполняться параллельно
Пример:
df = spark.read.parquet("/data")
df2 = df.filter(df.amount > 100) # Stage 1 (map side)
df3 = df2.groupBy("category").count() # Stage 2 (shuffle) + Stage 3 (reduce)
df3.write.parquet("/output") # Stage 4 (write)
Stage 1:
├─ Task 1 (partition 1): read + filter
├─ Task 2 (partition 2): read + filter
└─ Task 3 (partition 3): read + filter
Shuffle barrier (данные переходят между executors)
Stage 2:
├─ Task 4: shuffle data
├─ Task 5: shuffle data
└─ Task 6: shuffle data
Stage 3:
├─ Task 7: groupBy + count
├─ Task 8: groupBy + count
└─ Task 9: groupBy + count
Фаза 4: Task Scheduling & Distribution
TaskScheduler (на Driver):
- Определяет какие задачи на каких executors запустить
- Учитывает data locality (где находятся данные)
- Распределяет по executors
# Data Locality (очень важно для performance!)
#
# Идеально: Task выполняется на executor'е, где данные
# Хорошо: Task выполняется на одной машине с данными
# OK: Task выполняется на другой машине (network transfer)
# Spark пытается минимизировать network I/O
Фаза 5: Executor Allocation
Cluster Manager (YARN, Kubernetes) выделяет ресурсы:
Driver: "Мне нужно 10 executors по 8GB памяти и 4 ядра"
↓
YARN/K8s: "OK, ищу machines с достаточными ресурсами"
↓
Выбирает nodes (например, машины 2, 5, 7, 9, 11)
↓
Запускает JVM процессы на каждой machine
↓
Executors стартуют и регистрируются на Driver
↓
Driver знает, что есть executors 1-10 ready
Executor Process:
# Executor — это JVM процесс, который:
# 1. Слушает команды от Driver
# 2. Выполняет tasks
# 3. Хранит cached данные в памяти
# 4. Отправляет результаты обратно Driver'у
Фаза 6: Task Execution
Для каждого Task:
1. Executor получает task от Driver
└─ Task = часть Spark job'а
└─ Имеет partition ID и stage ID
2. Executor выполняет операции
└─ Читает данные из partition
└─ Применяет трансформации (filter, map, etc)
└─ Если нужен Shuffle — отправляет промежуточные данные
3. Результат:
└─ Если конец stage — кэширует в памяти (для следующих stages)
└─ Если конец job — отправляет результат Driver'у
Пример выполнения одного Task'а:
# Task: Прочитать parquet partition и применить filter
# Шаг 1: Прочитать данные
data = read_parquet("s3://bucket/data/part-000")
# Шаг 2: Применить операции
filtered = data.filter(lambda row: row['amount'] > 100)
# Шаг 3: Если нужен Shuffle — отправить промежуточный результат
# Если это конец Stage — поместить в память для следующей Stage
# Шаг 4: Отправить результат Driver'у или следующему Stage'у
Фаза 7: Shuffle (если необходимо)
Shuffle happens между Stage'ами когда нужны операции типа:
- groupBy
- join
- distinct
- etc (любая широкая трансформация)
# Пример Shuffle операции
df.groupBy("category").count() # Нужно переместить все records одной категории на один executor
# Процесс:
Map phase: каждый executor читает свою partition
и выбирает records (category="A" идет на executor 1, "B" на executor 2, etc)
результат пишется на диск (shuffle files)
↓
Shuffle transfer: данные переходят между executors через network
↓
Reduce phase: каждый executor получает свой набор categories
и выполняет COUNT
Это самая дорогая операция! Shuffle требует:
- Disk I/O (shuffle write)
- Network I/O (transfer)
- Disk I/O (shuffle read)
Затем Spark оптимизирует это с помощью:
- Bucketing
- Partitioning
- Sorting
Фаза 8: Memory Management
Executor память разделена на:
┌─────────────────────────────────────┐
│ Total Executor Memory (8GB) │
├─────────────────────────────────────┤
│ Reserve Memory (300MB) │ ← Spark internal structures
├─────────────────────────────────────┤
│ Execution Memory (50%) = 3.85GB │ ← For shuffle, joins
│ Storage Memory (50%) = 3.85GB │ ← For caching, broadcast
└─────────────────────────────────────┘
Memory Pressure:
- Если Execution memory переполнена → Spill on disk (медленно)
- Если Storage memory переполнена → Evict from cache
# Optimize memory
spark.conf.set("spark.executor.memory", "16g") # Больше памяти
spark.conf.set("spark.sql.shuffle.partitions", 200) # Больше partitions = меньше memory per executor
Фаза 9: Monitoring & Progress
Driver отслеживает progress:
│ Spark UI (http://driver:4040)
│ ├─ Stages: 3/10 completed
│ ├─ Tasks: 125/1000 completed (75 failed, 23 running)
│ ├─ Executors: 8/10 active
│ └─ Data: 50GB processed, 15GB shuffled
Добавить свой мониторинг:
from pyspark.sql.functions import col
df = spark.read.parquet("/data")
df.count() # This triggers execution
# Spark UI показывает прогресс в реальном времени
Фаза 10: Result Aggregation
После выполнения всех Stage'ов:
# Пример 1: write action
df.write.parquet("/output") # Driver ждет, пока все executors запишут результаты
# Затем завершает job
# Пример 2: collect action
result = df.collect() # Executors отправляют результаты Driver'у
# Driver собирает все в памяти
# (Осторожно! Может OOM если результаты большие)
# Пример 3: show action
df.show(5) # Driver собирает первые 5 rows и выводит
Фаза 11: Cleanup & Completion
После завершения job'а:
1. Executors удаляют промежуточные данные (shuffle files)
2. Driver выводит summary
3. Логирует результат
4. Если job успешен → приложение продолжает
5. Если job failed → exception на Driver'е
Пример логов:
2026-03-26 10:30:45 INFO DAGScheduler: Job 1 finished: write
2026-03-26 10:30:45 INFO SQLExecution: Completed successfully
2026-03-26 10:30:46 INFO DAGScheduler: Job 2 finished: collect
Полная диаграмма выполнения
┌─────────────────────────────────────────────────────────┐
│ Фаза 1: spark-submit │
│ Инициализация конфигурации │
└────────────────┬────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Фаза 2: Driver Initialization │
│ SparkContext создается, DAG формируется │
└────────────────┬────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Фаза 3: DAG Scheduling │
│ DAGScheduler разбивает DAG на Stage'и │
└────────────────┬────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Фаза 4: Task Scheduling │
│ TaskScheduler создает Task'и для каждого Stage'а │
└────────────────┬────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Фаза 5: Executor Allocation │
│ Cluster Manager выделяет ресурсы и запускает executors │
└────────────────┬────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Фаза 6-9: Task Execution │
│ Executors выполняют tasks, shuffle, monitoring │
└────────────────┬────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Фаза 10: Result Aggregation │
│ Результаты собираются/пишутся │
└────────────────┬────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Фаза 11: Cleanup │
│ Executors очищают промежуточные данные │
│ Driver выводит summary и завершает приложение │
└─────────────────────────────────────────────────────────┘
Оптимизация выполнения
# 1. Кэширование (если используется несколько раз)
df_frequent = spark.read.parquet("/data")
df_frequent.cache() # Держим в памяти
result1 = df_frequent.filter(...).count()
result2 = df_frequent.join(...).count() # Использует cached версию (быстро)
df_frequent.unpersist() # Освобождаем память
# 2. Partitioning (уменьшаем shuffle)
df_partitioned = df.repartition(200, "date") # Партиционируем по дате
# Следующие groupBy по дате будут быстрее
# 3. Broadcastng (для small tables в join)
from pyspark.sql.functions import broadcast
large_df.join(broadcast(small_df), "id") # small_df отправляется всем executors
# Избегаем shuffle для small_df
Отладка и мониторинг
# Посмотреть план выполнения
df.explain(extended=True) # Shows full physical plan
# Пример вывода:
# *(1) Broadcast HashJoin [...]
# :- *(1) Project [...]
# : +- *(1) Filter (amount > 100)
# : +- *(1) FileScan parquet [id, amount]
# Посмотреть Stage'и
spark.sparkContext.setLogLevel("WARN")
df.groupBy("category").count().show() # Покажет info о stages
# Про Spark UI
# http://localhost:4040 (доступна пока job выполняется)
# http://localhost:4040/jobs/ (история job'ов)
# http://localhost:4040/stages/ (информация по stages)
# http://localhost:4040/executors/ (информация по executors)
Итог
Упрощённо, путь Spark job'а:
spark-submit → DAG construction → DAG scheduling →
Task allocation → Execution on executors →
Shuffle (if needed) → Result aggregation → Cleanup
Ключевые points:
- DAG оптимизация происходит перед выполнением
- Lazy evaluation — операции не выполняются сразу, ждут Action
- Stage separation по shuffle границам критична для performance
- Data locality важна для минимизации network I/O
- Memory management требует внимания при больших данных
- Monitoring через Spark UI помогает найти bottleneck'и
Это одна из самых важных вещей для опытного Spark engineer'а — понять, как Spark выполняет job'ы.