← Назад к вопросам
Как диагностировать падение Spark-приложения после запуска?
2.4 Senior🔥 181 комментариев
#Apache Spark#Инструменты разработки
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Диагностика падения Spark-приложения
Падение Spark-приложения может быть вызвано множеством причин: нехватка памяти, ошибки в коде, проблемы с сетью, масштабирование. Систематический подход к диагностике — ключ к быстрому решению.
Этап 1: Анализ Spark UI
Первый шаг — открыть Spark UI (по умолчанию http://localhost:4040):
- Jobs tab — показывает статус выполняемых jobs. Красный флаг = упавший task
- Stages tab — детали по стадиям обработки данных
- Executors tab — состояние executors (в памяти, ошибки)
- SQL tab — для SQL запросов, план выполнения
- Logs tab — логи driver и executors
Этап 2: Проверка логов Driver
Логи содержат stack trace ошибки:
# Для локального запуска - логи в консоли
# Для YARN кластера
yarn logs -applicationId application_1234567890_0001
# Для Standalone кластера
cat /spark/logs/worker-logs/*
Этап 3: Типичные ошибки
OutOfMemoryError (OOM)
Нехватка памяти на executor или driver.
Диагностика:
spark-submit --executor-memory 4g --driver-memory 2g script.py
Решение:
# Увеличить количество партиций
df = df.repartition(200)
# Использовать broadcast для маленьких данных
from pyspark.sql.functions import broadcast
df_joined = df_large.join(broadcast(df_small), "key")
Ошибки сериализации
"Task not serializable" — невозможно отправить task на executor.
# ❌ Неправильно
obj = NonSerializable()
rdd = sc.parallelize([1,2,3]).map(lambda x: obj.process(x))
# ✅ Правильно
def process_function(x):
return x * 2
rdd = sc.parallelize([1,2,3]).map(process_function)
Deadlock при Shuffle
# Увеличить shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", 1000)
# Включить Adaptive Query Execution (AQE)
spark.conf.set("spark.sql.adaptive.enabled", "true")
Этап 4: Команды мониторинга
# Проверить статус приложения
yarn application -status application_1234567890_0001
# Смотреть историю
# http://hostname:18080
Этап 5: Анализ плана выполнения
# Включить детальные логи
spark.sparkContext.setLogLevel("DEBUG")
# Объяснить план выполнения
df.explain(extended=True)
# Проверить время выполнения
df.count()
Систематический чеклист
Проблемы памяти:
- Увеличить executor memory
- Уменьшить размер партиций
- Использовать broadcast join
- Проверить garbage collection
Проблемы вычисления:
- Проверить сложность алгоритма
- Убедиться, filter выполняется до shuffle
- Использовать AQE
Проблемы данных:
- Проверить формат (Parquet vs CSV)
- Убедиться в корректности schema
- Использовать sampling при разработке
Проблемы сети:
- Проверить доступ между driver и executors
- Убедиться в bandwidth для shuffle
- Проверить конфигурацию портов
Пример диагностики
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count
spark = SparkSession.builder \
.appName("DiagnosticApp") \
.config("spark.sql.shuffle.partitions", "500") \
.config("spark.executor.memory", "4g") \
.getOrCreate()
try:
df = spark.read.parquet("/data/input")
print(f"Loaded {df.count()} rows")
df.groupBy("user_id").agg(count("*")).explain(extended=True)
result = df.groupBy("user_id").agg(count("*")).collect()
except Exception as e:
print(f"Error: {str(e)}")
import traceback
traceback.print_exc()
finally:
spark.stop()
Заключение
Систематическая диагностика требует проверки Spark UI, логов, конфигурации памяти и плана выполнения. Большинство проблем решаются оптимизацией памяти, партиционирования и использованием правильных стратегий JOIN.