← Назад к вопросам

Как диагностировать падение Spark-приложения после запуска?

2.4 Senior🔥 181 комментариев
#Apache Spark#Инструменты разработки

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Диагностика падения Spark-приложения

Падение Spark-приложения может быть вызвано множеством причин: нехватка памяти, ошибки в коде, проблемы с сетью, масштабирование. Систематический подход к диагностике — ключ к быстрому решению.

Этап 1: Анализ Spark UI

Первый шаг — открыть Spark UI (по умолчанию http://localhost:4040):

  • Jobs tab — показывает статус выполняемых jobs. Красный флаг = упавший task
  • Stages tab — детали по стадиям обработки данных
  • Executors tab — состояние executors (в памяти, ошибки)
  • SQL tab — для SQL запросов, план выполнения
  • Logs tab — логи driver и executors

Этап 2: Проверка логов Driver

Логи содержат stack trace ошибки:

# Для локального запуска - логи в консоли

# Для YARN кластера
yarn logs -applicationId application_1234567890_0001

# Для Standalone кластера
cat /spark/logs/worker-logs/*

Этап 3: Типичные ошибки

OutOfMemoryError (OOM)

Нехватка памяти на executor или driver.

Диагностика:

spark-submit --executor-memory 4g --driver-memory 2g script.py

Решение:

# Увеличить количество партиций
df = df.repartition(200)

# Использовать broadcast для маленьких данных
from pyspark.sql.functions import broadcast
df_joined = df_large.join(broadcast(df_small), "key")

Ошибки сериализации

"Task not serializable" — невозможно отправить task на executor.

# ❌ Неправильно
obj = NonSerializable()
rdd = sc.parallelize([1,2,3]).map(lambda x: obj.process(x))

# ✅ Правильно
def process_function(x):
    return x * 2
rdd = sc.parallelize([1,2,3]).map(process_function)

Deadlock при Shuffle

# Увеличить shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", 1000)

# Включить Adaptive Query Execution (AQE)
spark.conf.set("spark.sql.adaptive.enabled", "true")

Этап 4: Команды мониторинга

# Проверить статус приложения
yarn application -status application_1234567890_0001

# Смотреть историю
# http://hostname:18080

Этап 5: Анализ плана выполнения

# Включить детальные логи
spark.sparkContext.setLogLevel("DEBUG")

# Объяснить план выполнения
df.explain(extended=True)

# Проверить время выполнения
df.count()

Систематический чеклист

Проблемы памяти:

  • Увеличить executor memory
  • Уменьшить размер партиций
  • Использовать broadcast join
  • Проверить garbage collection

Проблемы вычисления:

  • Проверить сложность алгоритма
  • Убедиться, filter выполняется до shuffle
  • Использовать AQE

Проблемы данных:

  • Проверить формат (Parquet vs CSV)
  • Убедиться в корректности schema
  • Использовать sampling при разработке

Проблемы сети:

  • Проверить доступ между driver и executors
  • Убедиться в bandwidth для shuffle
  • Проверить конфигурацию портов

Пример диагностики

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count

spark = SparkSession.builder \
    .appName("DiagnosticApp") \
    .config("spark.sql.shuffle.partitions", "500") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

try:
    df = spark.read.parquet("/data/input")
    print(f"Loaded {df.count()} rows")
    
    df.groupBy("user_id").agg(count("*")).explain(extended=True)
    result = df.groupBy("user_id").agg(count("*")).collect()
    
except Exception as e:
    print(f"Error: {str(e)}")
    import traceback
    traceback.print_exc()
finally:
    spark.stop()

Заключение

Систематическая диагностика требует проверки Spark UI, логов, конфигурации памяти и плана выполнения. Большинство проблем решаются оптимизацией памяти, партиционирования и использованием правильных стратегий JOIN.