← Назад к вопросам

Как работает Catalyst Optimizer в Apache Spark?

2.0 Middle🔥 111 комментариев
#Apache Spark

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Catalyst Optimizer в Apache Spark

Catalyst Optimizer — это оптимизатор запросов в Spark SQL, который преобразует SQL запросы и DataFrame операции в оптимизированный план выполнения. Это один из ключевых компонентов, делающих Spark SQL быстрым и эффективным.

Как работает Catalyst

Catalyst проходит четыре основные стадии:

1. PARSING (парсинг)
   SQL строка → Logical Plan
   
2. ANALYSIS (анализ)
   Логический план → Валидация и привязка к таблицам
   
3. OPTIMIZATION (оптимизация)
   Логический план → Оптимизированный логический план
   
4. PHYSICAL PLANNING (физическое планирование)
   Оптимизированный логический план → Physical Plans (несколько вариантов)
   
5. EXECUTION (выполнение)
   Лучший Physical Plan → Машинный код → Результат

Пример преобразования

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# SQL запрос
query = """
SELECT 
    d.department_name,
    AVG(e.salary) as avg_salary
FROM employees e
JOIN departments d ON e.dept_id = d.dept_id
WHERE e.salary > 50000
GROUP BY d.department_name
HAVING AVG(e.salary) > 60000
"""

df = spark.sql(query)

# Смотрим логический план
df.explain(mode="simple")

# Output:
# == Logical Plan ==
# Aggregate [department_name#10], [department_name#10, avg(salary#5) AS avg_salary#20]
# Filter (avg(salary#5) > 60000)
#   Join Inner, (dept_id#2 = dept_id#9)
#     Filter (salary#5 > 50000)
#       Scan parquet [employees]
#     Scan parquet [departments]

# После оптимизации:
df.explain(mode="extended")

# Output:
# == Optimized Logical Plan ==
# Aggregate [department_name#10], [department_name#10, avg(salary#5) AS avg_salary#20]
# Filter (avg(salary#5) > 60000)
#   Join Inner, (dept_id#2 = dept_id#9)
#     Filter (salary#5 > 50000)  ← pushed down!
#       Scan parquet [employees]
#     Scan parquet [departments]

# Физический план:
df.explain(mode="physical")

# Output:
# == Physical Plan ==
# ... RDD операции и Spark executors ...

Основные оптимизации Catalyst

1. Predicate Pushdown (Продвижение фильтров вниз)

Фильтр применяется как можно раньше, чтобы сократить объём обрабатываемых данных.

# Неоптимизированный запрос
df = spark.read.parquet("employees")
df = df.join(spark.read.parquet("departments"), "dept_id")
df_filtered = df.filter(col("salary") > 50000)  # фильтр ПОСЛЕ JOIN

# Catalyst оптимизирует в:
df = spark.read.parquet("employees")
df_filtered = df.filter(col("salary") > 50000)  # фильтр ПЕРЕД JOIN
df = df_filtered.join(spark.read.parquet("departments"), "dept_id")

# Результат:
# Вместо обработки 1 млн + 10k строк → обработка 500k + 10k строк

2. Projection Pushdown (Отбор только нужных колонок)

Esли тебе нужны только определённые колонки, Spark читает только их из источника.

# Неоффективный запрос
df = spark.read.parquet("employees_with_100_columns")
df_result = df.select("name", "salary")

# Catalyst оптимизирует:
# Вместо чтения всех 100 колонок → читает только 2 колонки
# Ускорение: в 50+ раз на больших файлах

3. Constant Folding (Вычисление констант)

Выражения с константами вычисляются один раз при планировании, а не при выполнении.

# Неоптимизированный
df = df.filter((col("salary") * (1 + 0.1)) > 50000)

# Catalyst оптимизирует:
# (1 + 0.1) → 1.1 (вычислено один раз)
df = df.filter((col("salary") * 1.1) > 50000)

4. Boolean Simplification (Упрощение логики)

Упрощение сложных условий.

# Неоптимизированный
df.filter((col("status") == "active") | (col("status") == "active"))

# Catalyst оптимизирует:
df.filter(col("status") == "active")

# Другой пример
df.filter((col("age") > 0) & (col("age") < 120) & (col("age") != 0))
# → (col("age") > 0) & (col("age") < 120)

5. Operator Reordering (Переупорядочение операторов)

Catalyst меняет порядок JOIN для минимизации объёма обрабатываемых данных.

# Исходный запрос
Small_table.join(Large_table_1, "id").join(Large_table_2, "id")

# Catalyst оптимизирует порядок JOIN:
# Small_table → уменьшает результат
# → JOIN с Large_table_2 (меньше строк)
# → JOIN с Large_table_1

# Вместо: S × L1 × L2 = миллиарды операций
# Получаем: (S × L2) × L1 = миллионы операций

6. Column Pruning (Исключение лишних колонок)

df = spark.read.parquet("sales")
df = df.select("customer_id", "amount")
df_result = df.groupBy("customer_id").sum("amount")

# Catalyst понимает:
# Для результата нужны только customer_id и amount
# → читает только эти две колонки из Parquet
# Вместо чтения всех 50 колонок

Правила оптимизации (Rules)

Catalyst использует набор правил трансформации, которые применяются рекурсивно:

# Пример правила: EliminateSubqueryAliases
# Правило: если подзапрос используется только один раз, раскрыть его

SELECT * FROM (
    SELECT * FROM employees WHERE salary > 50000
) AS e  # ← подзапрос

# Catalyst применяет правило:
SELECT * FROM employees WHERE salary > 50000  # подзапрос раскрыт

Как использовать Catalyst оптимально

1. Используй SQL вместо DataFrame API

# SQL (Catalyst оптимизирует)
df = spark.sql("""
    SELECT customer_id, SUM(amount)
    FROM sales
    WHERE year = 2024
    GROUP BY customer_id
""")

# DataFrame API (Catalyst тоже оптимизирует, но реже)
df = spark.read.parquet("sales")
df = df.filter(col("year") == 2024)
df = df.groupBy("customer_id").agg(sum("amount"))

# Оба работают, но SQL часто быстрее

2. Фильтруй как можно раньше

# ❌ Плохо: читаем всё, потом фильтруем
df = spark.read.parquet("large_table")  # 1 млн строк
df = df.filter(col("year") == 2024)  # 100k строк

# ✅ Хорошо: Catalyst оптимизирует этот код!
df = spark.read.parquet("large_table")
df = df.filter(col("year") == 2024)

# ✅ Отличное решение: используй SQL с WHERE
df = spark.sql("""
    SELECT * FROM large_table WHERE year = 2024
""")

3. Избегай лишних операций

# ❌ Плохо: лишний select
df = df.select("*")
df = df.select("name", "salary")

# ✅ Хорошо: один select
df = df.select("name", "salary")

4. Используй Parquet и ORC (Catalyst читает только нужные колонки)

# ✅ Хорошо: Parquet поддерживает column pruning
df = spark.read.parquet("data.parquet")
df = df.select("id", "name")  # читает только 2 колонки

# ❌ Плохо: CSV нужно прочитать целиком
df = spark.read.csv("data.csv")
df = df.select("id", "name")  # всё равно прочитались все колонки

Мониторинг оптимизации

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Включи логирование Catalyst
spark.sparkContext.setLogLevel("DEBUG")

df = spark.sql("SELECT * FROM employees WHERE salary > 50000")

# В логах будет:
# == Parsed Logical Plan ==
# == Analyzed Logical Plan ==
# == Optimized Logical Plan ==  ← видимо оптимизацию
# == Physical Plan ==

# Или программно:
df.explain(extended=True)  # показывает все этапы

Когда Catalyst МОЖЕТ быть медленнее

# 1. Очень сложные вложенные запросы
df = spark.sql("""
    SELECT * FROM (
        SELECT * FROM (
            SELECT * FROM (
                SELECT * FROM table WHERE x > 5
            ) WHERE y > 10
        ) WHERE z > 15
    )
""")
# Catalyst потратит время на раскрытие скобок

# 2. Пользовательские функции (UDF)
# Catalyst не может оптимизировать внутри UDF

@udf(returnType=IntegerType())
def my_udf(x):
    return x * 2

df = df.withColumn("result", my_udf(col("value")))
# Catalyst не может упростить my_udf

# Решение: используй встроенные функции вместо UDF
df = df.withColumn("result", col("value") * 2)  # Catalyst оптимизирует

Best Practices

  1. Используй SQL вместо DataFrame API
  2. Фильтруй рано (WHERE перед JOIN)
  3. Выбирай нужные колонки (SELECT, не SELECT *)
  4. Используй Parquet/ORC (column pushdown)
  5. Избегай UDF (Catalyst не может их оптимизировать)
  6. Проверяй план (explain() часто)
  7. Обновляй статистику (ANALYZE TABLE)

Заключение

Catalyst Optimizer — это автоматический оптимизатор, который:

  • Преобразует запрос в наиболее эффективный физический план
  • Применяет десятки правил оптимизации
  • Работает лучше с SQL, чем с DataFrame API
  • Может быть понижен вручную (если ты знаешь лучше)

Понимание Catalyst — критично для написания быстрых Spark приложений.

Как работает Catalyst Optimizer в Apache Spark? | PrepBro