← Назад к вопросам

Что происходит после отправки задания в Spark (spark-submit)?

2.3 Middle🔥 171 комментариев
#Apache Spark#Архитектура и проектирование

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Что происходит после spark-submit: полный lifecycle Spark job'а

Это один из самых важных вопросов для понимания Spark. Расскажу про каждый этап жизненного цикла job'а от submit'а до результата.

Общая архитектура Spark

Application (driver)
     ↓
SparkContext/SparkSession
     ↓
Cluster Manager (YARN, Kubernetes, Standalone)
     ↓
Executor nodes (where actual computation happens)

Фаза 1: Submission (spark-submit)

# Команда
spark-submit \
  --class com.example.MyApp \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 10 \
  --executor-memory 8g \
  --executor-cores 4 \
  my-app.jar arg1 arg2

Что происходит:

  1. spark-submit скрипт парсит аргументы
  2. Проверяет наличие JAR файла и зависимостей
  3. Определяет Cluster Manager (YARN, K8s, etc)
  4. Подготавливает конфигурацию (driver memory, executor count)
  5. Запускает Application (driver process) или подает заявку на кластер
# В Python (PySpark)
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApp") \
    .master("yarn") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", 4) \
    .config("spark.executor.instances", 10) \
    .getOrCreate()

# SparkContext инициализируется
# -> Приложение подключается к Cluster Manager

Фаза 2: Driver Initialization

Driver Process:

  • Запускается на одной машине (или локально)
  • Это основной процесс вашего приложения
  • Выполняет код (преобразования DAG'ов в задачи)
  • Общается с Cluster Manager
# Код выполняется на Driver
df = spark.read.parquet("/data/input")
df2 = df.filter(df.amount > 100)  # Lazy evaluation!
df2.write.parquet("/data/output")  # Action! Trigger execution

# Трансформации (filter) не выполняются сразу
# Они добавляются в DAG (Directed Acyclic Graph)

DAG Construction:

Логические трансформации:
read(parquet)
  ↓
filter(amount > 100)
  ↓
write(parquet)

Это описание того, ЧТО нужно сделать, но не КАК

Фаза 3: DAG Scheduling

Когда вызывается Action (write, collect, show, count):

1. DAGScheduler анализирует DAG:

Large DAG
  ↓
DAGScheduler разбивает на Stages (по Shuffle границам)
  ↓
Cаждый Stage = набор задач, которые могут выполняться параллельно

Пример:

df = spark.read.parquet("/data")
df2 = df.filter(df.amount > 100)        # Stage 1 (map side)
df3 = df2.groupBy("category").count()   # Stage 2 (shuffle) + Stage 3 (reduce)
df3.write.parquet("/output")             # Stage 4 (write)
Stage 1: 
  ├─ Task 1 (partition 1): read + filter
  ├─ Task 2 (partition 2): read + filter
  └─ Task 3 (partition 3): read + filter

Shuffle barrier (данные переходят между executors)

Stage 2:
  ├─ Task 4: shuffle data
  ├─ Task 5: shuffle data
  └─ Task 6: shuffle data

Stage 3:
  ├─ Task 7: groupBy + count
  ├─ Task 8: groupBy + count
  └─ Task 9: groupBy + count

Фаза 4: Task Scheduling & Distribution

TaskScheduler (на Driver):

  1. Определяет какие задачи на каких executors запустить
  2. Учитывает data locality (где находятся данные)
  3. Распределяет по executors
# Data Locality (очень важно для performance!)
#
# Идеально: Task выполняется на executor'е, где данные
# Хорошо: Task выполняется на одной машине с данными
# OK: Task выполняется на другой машине (network transfer)

# Spark пытается минимизировать network I/O

Фаза 5: Executor Allocation

Cluster Manager (YARN, Kubernetes) выделяет ресурсы:

Driver: "Мне нужно 10 executors по 8GB памяти и 4 ядра"
        ↓
YARN/K8s: "OK, ищу machines с достаточными ресурсами"
        ↓
Выбирает nodes (например, машины 2, 5, 7, 9, 11)
        ↓
Запускает JVM процессы на каждой machine
        ↓
Executors стартуют и регистрируются на Driver
        ↓
Driver знает, что есть executors 1-10 ready

Executor Process:

# Executor — это JVM процесс, который:
# 1. Слушает команды от Driver
# 2. Выполняет tasks
# 3. Хранит cached данные в памяти
# 4. Отправляет результаты обратно Driver'у

Фаза 6: Task Execution

Для каждого Task:

1. Executor получает task от Driver
   └─ Task = часть Spark job'а
   └─ Имеет partition ID и stage ID

2. Executor выполняет операции
   └─ Читает данные из partition
   └─ Применяет трансформации (filter, map, etc)
   └─ Если нужен Shuffle — отправляет промежуточные данные

3. Результат:
   └─ Если конец stage — кэширует в памяти (для следующих stages)
   └─ Если конец job — отправляет результат Driver'у

Пример выполнения одного Task'а:

# Task: Прочитать parquet partition и применить filter

# Шаг 1: Прочитать данные
data = read_parquet("s3://bucket/data/part-000")

# Шаг 2: Применить операции
filtered = data.filter(lambda row: row['amount'] > 100)

# Шаг 3: Если нужен Shuffle — отправить промежуточный результат
# Если это конец Stage — поместить в память для следующей Stage

# Шаг 4: Отправить результат Driver'у или следующему Stage'у

Фаза 7: Shuffle (если необходимо)

Shuffle happens между Stage'ами когда нужны операции типа:

  • groupBy
  • join
  • distinct
  • etc (любая широкая трансформация)
# Пример Shuffle операции
df.groupBy("category").count()  # Нужно переместить все records одной категории на один executor

# Процесс:
Map phase:    каждый executor читает свою partition
              и выбирает records (category="A" идет на executor 1, "B" на executor 2, etc)
              результат пишется на диск (shuffle files)
                  ↓
Shuffle transfer: данные переходят между executors через network
                  ↓
Reduce phase: каждый executor получает свой набор categories
              и выполняет COUNT

Это самая дорогая операция! Shuffle требует:

  • Disk I/O (shuffle write)
  • Network I/O (transfer)
  • Disk I/O (shuffle read)

Затем Spark оптимизирует это с помощью:

  • Bucketing
  • Partitioning
  • Sorting

Фаза 8: Memory Management

Executor память разделена на:

┌─────────────────────────────────────┐
│ Total Executor Memory (8GB)         │
├─────────────────────────────────────┤
│ Reserve Memory (300MB)              │  ← Spark internal structures
├─────────────────────────────────────┤
│ Execution Memory (50%) = 3.85GB     │  ← For shuffle, joins
│ Storage Memory (50%) = 3.85GB       │  ← For caching, broadcast
└─────────────────────────────────────┘

Memory Pressure:

  • Если Execution memory переполнена → Spill on disk (медленно)
  • Если Storage memory переполнена → Evict from cache
# Optimize memory
spark.conf.set("spark.executor.memory", "16g")  # Больше памяти
spark.conf.set("spark.sql.shuffle.partitions", 200)  # Больше partitions = меньше memory per executor

Фаза 9: Monitoring & Progress

Driver отслеживает progress:

│ Spark UI (http://driver:4040)
│ ├─ Stages: 3/10 completed
│ ├─ Tasks: 125/1000 completed (75 failed, 23 running)
│ ├─ Executors: 8/10 active
│ └─ Data: 50GB processed, 15GB shuffled

Добавить свой мониторинг:

from pyspark.sql.functions import col

df = spark.read.parquet("/data")
df.count()  # This triggers execution
# Spark UI показывает прогресс в реальном времени

Фаза 10: Result Aggregation

После выполнения всех Stage'ов:

# Пример 1: write action
df.write.parquet("/output")  # Driver ждет, пока все executors запишут результаты
                             # Затем завершает job

# Пример 2: collect action
result = df.collect()  # Executors отправляют результаты Driver'у
                       # Driver собирает все в памяти
                       # (Осторожно! Может OOM если результаты большие)

# Пример 3: show action
df.show(5)  # Driver собирает первые 5 rows и выводит

Фаза 11: Cleanup & Completion

После завершения job'а:

1. Executors удаляют промежуточные данные (shuffle files)
2. Driver выводит summary
3. Логирует результат
4. Если job успешен → приложение продолжает
5. Если job failed → exception на Driver'е

Пример логов:

2026-03-26 10:30:45 INFO DAGScheduler: Job 1 finished: write
2026-03-26 10:30:45 INFO SQLExecution: Completed successfully
2026-03-26 10:30:46 INFO DAGScheduler: Job 2 finished: collect

Полная диаграмма выполнения

┌─────────────────────────────────────────────────────────┐
│ Фаза 1: spark-submit                                  │
│ Инициализация конфигурации                             │
└────────────────┬────────────────────────────────────────┘
                 ↓
┌─────────────────────────────────────────────────────────┐
│ Фаза 2: Driver Initialization                          │
│ SparkContext создается, DAG формируется                │
└────────────────┬────────────────────────────────────────┘
                 ↓
┌─────────────────────────────────────────────────────────┐
│ Фаза 3: DAG Scheduling                                │
│ DAGScheduler разбивает DAG на Stage'и                  │
└────────────────┬────────────────────────────────────────┘
                 ↓
┌─────────────────────────────────────────────────────────┐
│ Фаза 4: Task Scheduling                               │
│ TaskScheduler создает Task'и для каждого Stage'а       │
└────────────────┬────────────────────────────────────────┘
                 ↓
┌─────────────────────────────────────────────────────────┐
│ Фаза 5: Executor Allocation                           │
│ Cluster Manager выделяет ресурсы и запускает executors │
└────────────────┬────────────────────────────────────────┘
                 ↓
┌─────────────────────────────────────────────────────────┐
│ Фаза 6-9: Task Execution                              │
│ Executors выполняют tasks, shuffle, monitoring         │
└────────────────┬────────────────────────────────────────┘
                 ↓
┌─────────────────────────────────────────────────────────┐
│ Фаза 10: Result Aggregation                           │
│ Результаты собираются/пишутся                          │
└────────────────┬────────────────────────────────────────┘
                 ↓
┌─────────────────────────────────────────────────────────┐
│ Фаза 11: Cleanup                                      │
│ Executors очищают промежуточные данные                 │
│ Driver выводит summary и завершает приложение          │
└─────────────────────────────────────────────────────────┘

Оптимизация выполнения

# 1. Кэширование (если используется несколько раз)
df_frequent = spark.read.parquet("/data")
df_frequent.cache()  # Держим в памяти

result1 = df_frequent.filter(...).count()
result2 = df_frequent.join(...).count()  # Использует cached версию (быстро)

df_frequent.unpersist()  # Освобождаем память

# 2. Partitioning (уменьшаем shuffle)
df_partitioned = df.repartition(200, "date")  # Партиционируем по дате
# Следующие groupBy по дате будут быстрее

# 3. Broadcastng (для small tables в join)
from pyspark.sql.functions import broadcast

large_df.join(broadcast(small_df), "id")  # small_df отправляется всем executors
# Избегаем shuffle для small_df

Отладка и мониторинг

# Посмотреть план выполнения
df.explain(extended=True)  # Shows full physical plan

# Пример вывода:
# *(1) Broadcast HashJoin [...]
# :- *(1) Project [...]
# :  +- *(1) Filter (amount > 100)
# :     +- *(1) FileScan parquet [id, amount]

# Посмотреть Stage'и
spark.sparkContext.setLogLevel("WARN")
df.groupBy("category").count().show()  # Покажет info о stages

# Про Spark UI
# http://localhost:4040 (доступна пока job выполняется)
# http://localhost:4040/jobs/ (история job'ов)
# http://localhost:4040/stages/ (информация по stages)
# http://localhost:4040/executors/ (информация по executors)

Итог

Упрощённо, путь Spark job'а:

spark-submit → DAG construction → DAG scheduling → 
Task allocation → Execution on executors → 
Shuffle (if needed) → Result aggregation → Cleanup

Ключевые points:

  1. DAG оптимизация происходит перед выполнением
  2. Lazy evaluation — операции не выполняются сразу, ждут Action
  3. Stage separation по shuffle границам критична для performance
  4. Data locality важна для минимизации network I/O
  5. Memory management требует внимания при больших данных
  6. Monitoring через Spark UI помогает найти bottleneck'и

Это одна из самых важных вещей для опытного Spark engineer'а — понять, как Spark выполняет job'ы.

Что происходит после отправки задания в Spark (spark-submit)? | PrepBro