← Назад к вопросам

Расскажите о стратегиях оптимизации Spark-задач.

2.0 Middle🔥 111 комментариев
#Apache Spark

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Стратегии оптимизации Spark-задач

Apache Spark — это распределённый фреймворк для обработки больших данных. Оптимизация задач критична для скорости выполнения и затрат на вычисления.

1. Партиционирование и Shuffle

Parallelize работы правильное число партиций:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("OptimizedJob").getOrCreate()

# Слишком мало партиций — медленно
df = spark.read.parquet("s3://bucket/data").coalesce(1)

# Правильное число партиций (обычно 200)
df = spark.read.parquet("s3://bucket/data").repartition(200)

# Узнать оптимальное число
num_partitions = spark.sparkContext.defaultParallelism
print(f"Optimal partitions: {num_partitions}")

2. Broadcast переменные

Для JOIN маленьких таблиц используй broadcast вместо shuffle:

from pyspark.sql.functions import broadcast

# Плохо — Shuffle Join
result = large_df.join(small_df, "key")

# Хорошо — Broadcast Join
result = large_df.join(
    broadcast(small_df),
    "key"
)

3. Кеширование (Caching)

Кеши DataFrame если используешь его несколько раз:

df = spark.read.parquet("s3://bucket/data")

# Кешировать в памяти
df.cache()  # или df.persist()

# Использовать несколько раз
count1 = df.count()
filtered = df.filter(df.value > 100)
count2 = filtered.count()

# Освободить память
df.unpersist()

4. Выборочная загрузка колонок (Projection Pushdown)

# Плохо — загружаются все колонки
df = spark.read.parquet("s3://bucket/data")
result = df.select("name", "age")

# Хорошо — Parquet читает только нужные колонки
df = spark.read.parquet("s3://bucket/data").select("name", "age")

5. Фильтрация ранняя (Predicate Pushdown)

# Плохо — фильтруется после загрузки
df = spark.read.parquet("s3://bucket/data")
result = df.filter(df.country == "RU")

# Хорошо — фильтр применяется при чтении
df = spark.read.parquet(
    "s3://bucket/data",
    # Parquet использует partition info для pruning
).filter(df.country == "RU")

6. Оптимизация Join операций

# Сортировка перед JOIN для лучшего performance
df1 = spark.read.parquet("s3://bucket/table1").repartition("key").sortWithinPartitions("key")
df2 = spark.read.parquet("s3://bucket/table2").repartition("key").sortWithinPartitions("key")

result = df1.join(df2, "key")

7. Оптимизатор Catalyst

Spark автоматически оптимизирует логические планы, но помогай ему подсказками:

# Используй SQL API вместо DataFrame API для лучшей оптимизации
spark.sql("""
    SELECT d.user_id, COUNT(*) as events
    FROM large_events d
    INNER JOIN broadcast(small_users u) ON d.user_id = u.id
    WHERE d.event_date >= '2024-01-01'
    GROUP BY d.user_id
""")

8. Настройка параллелизма

spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.default.parallelism", "200")
spark.conf.set("spark.sql.adaptive.enabled", "true")  # AQE

9. Используй Adaptive Query Execution (AQE)

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

AQE автоматически оптимизирует количество партиций и JOIN стратегии.

10. Избегай дорогих операций

# Плохо — collect() переносит все в driver
data = df.collect()

# Хорошо — обработай в workers
df.write.parquet("s3://bucket/output")

# Плохо — UDF (User Defined Functions) медленные
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def slow_func(x):
    return x.upper()

df = df.withColumn("upper_col", slow_func(df.col))

# Хорошо — встроенные функции
from pyspark.sql.functions import upper
df = df.withColumn("upper_col", upper(df.col))

Практический пример оптимизированной задачи

from pyspark.sql.functions import broadcast, col, count
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("OptimizedPipeline") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# 1. Читать только нужные колонки
events = spark.read.parquet("s3://bucket/events") \
    .select("user_id", "event_date", "event_type") \
    .filter(col("event_date") >= "2024-01-01")  # Predicate pushdown

# 2. Загрузить маленькую таблицу
users = spark.read.parquet("s3://bucket/users")

# 3. Кешировать если используется несколько раз
events.cache()

# 4. Broadcast JOIN
result = events.join(
    broadcast(users),
    "user_id"
)

# 5. Результат
result.groupBy("user_id").agg(count("*")).write.parquet("output")
events.unpersist()

Мониторинг и анализ

# Посмотреть план выполнения
df.explain(extended=True)

# Проверить число партиций
print(f"Partitions: {df.rdd.getNumPartitions()}")

# Размер в памяти
print(df.cache().count())  # Загрузит в памяти

Чеклист оптимизации

  1. ✓ Используй только нужные колонки (Projection Pushdown)
  2. ✓ Фильтруй данные как можно раньше (Predicate Pushdown)
  3. ✓ Используй Broadcast для маленьких таблиц
  4. ✓ Правильное число партиций (200-400)
  5. ✓ Кешируй часто используемые DataFrame
  6. ✓ Включи Adaptive Query Execution (AQE)
  7. ✓ Избегай collect() на driver'е
  8. ✓ Предпочитай встроенные функции UDF
  9. ✓ Монитори Spark UI для bottleneck'ов
Расскажите о стратегиях оптимизации Spark-задач. | PrepBro