← Назад к вопросам

Что такое перекос данных в Spark?

2.0 Middle🔥 161 комментариев
#Apache Spark

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Перекос данных в Spark (Data Skew)

Перекос данных (skew) — это ситуация, когда данные распределяются по партициям неравномерно. Одна или несколько партиций получают значительно больше данных, чем остальные, что приводит к неэффективной параллельной обработке.

Типы перекоса в Spark

1. Перекос по ключам (Key Skew)

Один или несколько ключей группировки содержат аномально много записей.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count

spark = SparkSession.builder.appName("SkewDemo").getOrCreate()

# Пример: у пользователя с id=1 миллион событий, остальные имеют по 100
df_events = spark.read.parquet("events.parquet")

# Анализ перекоса
skew_analysis = df_events.groupBy("user_id").count()
skew_analysis.filter(col("count") > 10000).show()

# Результат:
# user_id | count
# 1       | 1000000  <- ПЕРЕКОС!
# 2       | 100
# 3       | 150

2. Перекос по разделам (Partition Skew)

Данные неравномерно распределены по партициям из-за способа создания RDD/DataFrame.

# Плохо: partition_id содержит много нулей
df_raw = spark.read.text("large_file.txt")
df_partitioned = df_raw.repartition("partition_id")

# Проверка распределения
df_partitioned.rdd.getNumPartitions()  # 200 партиций
df_partitioned.rdd.glom().map(len).collect()  # [5000, 4900, 15000, 100, ...]

3. Перекос при JOIN операциях

Одна таблица имеет ключ, который присутствует массово в другой таблице.

# users таблица: user_id 999 (системный юзер) в миллионе строк
df_users = spark.read.parquet("users.parquet")
df_activity = spark.read.parquet("activity.parquet")

# JOIN приводит к перекосу
result = df_activity.join(df_users, "user_id")
# Партиция с user_id=999 обработает миллион строк, остальные по 1000

Симптомы перекоса

# Во время выполнения:
# - Одна task работает значительно дольше других
# - Прогресс зависает на последней задаче
# - Spark UI показывает огромную разницу в размере данных по партициям

# Программное обнаружение
df.rdd.mapPartitions(lambda part: [len(list(part))]).collect()
# [1000000, 50, 40, 30]  <- огромная разница! Перекос!

Стратегии борьбы с перекосом

Стратегия 1: Salting (Добавление соли)

Добавляем случайный суффикс к ключу перед группировкой, затем объединяем.

from pyspark.sql.functions import col, rand, concat, lit

# Проблемный код (без соли)
# df.groupBy("user_id").agg(sum("amount")).collect()  # Медленно!

# С солью
num_buckets = 10
df_salted = df.withColumn(
    "salted_key",
    concat(col("user_id"), lit("_"), (rand() * num_buckets).cast("int"))
)

# Группировка по соленому ключу
df_partial = df_salted.groupBy("salted_key").agg(
    sum("amount").alias("partial_sum")
)

# Финальная группировка (small aggregate)
df_final = df_partial.groupBy(
    col("salted_key").substr(1, col("salted_key").rindex("_") - 1).alias("user_id")
).agg(
    sum("partial_sum").alias("total_amount")
)

Стратегия 2: Repartition по другому ключу

Используем альтернативный ключ для партиционирования.

# Вместо группировки по user_id напрямую
# df.groupBy("user_id").count()  # Перекос

# Рepartitionируем по более равномерному ключу
df_repartitioned = df.repartition("region", "user_id")
result = df_repartitioned.groupBy("user_id").count()

Стратегия 3: Separate Hot Keys

Обрабатываем "горячие" ключи отдельно.

from pyspark.sql.functions import col, when

# Найти горячие ключи
hot_keys = df.groupBy("user_id").count() \
    .filter(col("count") > df.count() / 100) \
    .select("user_id") \
    .rdd.flatMap(lambda x: x).collect()

print(f"Hot keys: {hot_keys}")

# Разделить на два потока обработки
df_hot = df.filter(col("user_id").isin(hot_keys))
df_cold = df.filter(~col("user_id").isin(hot_keys))

# Обработать горячие с большей параллелизацией
result_hot = df_hot.repartition(50, "user_id").groupBy("user_id").count()
result_cold = df_cold.groupBy("user_id").count()

result = result_hot.union(result_cold)

Стратегия 4: Broadcast JOIN

Для JOIN операций используем broadcast вместо regular join.

from pyspark.sql.functions import broadcast

# Плохо: regular join
# result = df_big.join(df_small, "user_id")

# Хорошо: broadcast join (если df_small маленькая)
result = df_big.join(broadcast(df_small), "user_id")
# Избегаем shuffle на горячих ключах

Стратегия 5: Adaptive Query Execution (AQE)

Spark 3.0+ автоматически оптимизирует перекос.

# spark-submit --conf spark.sql.adaptive.enabled=true
# spark-submit --conf spark.sql.adaptive.skewJoin.enabled=true

df_result = df_activity.join(df_users, "user_id")
# AQE автоматически разбивает большие партиции

Пример комплексного решения

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .appName("SkewHandling").getOrCreate()

# 1. Загрузка и анализ
df = spark.read.parquet("data.parquet")

# 2. Обнаружение перекоса
skew_check = df.groupBy("user_id").count() \
    .agg(max("count").alias("max_count"), 
         avg("count").alias("avg_count"))
skew_ratio = skew_check.select(
    col("max_count") / col("avg_count")
).collect()[0][0]

if skew_ratio > 10:
    print(f"SKEW DETECTED: {skew_ratio}x")
    
    # 3. Применение соли
    df_processed = df.withColumn(
        "bucket",
        (rand() * 20).cast("int")
    ).repartition(200, "user_id", "bucket")
else:
    df_processed = df.repartition(100, "user_id")

# 4. Агрегация
result = df_processed.groupBy("user_id").agg(
    count("*").alias("total_events"),
    sum("amount").alias("total_amount")
)

Мониторинг в Spark UI

  • Stage Duration: обратить внимание на резкий скачок в одной task
  • Shuffle Read/Write: неравномерное распределение данных
  • Task Duration: последняя задача работает в 100+ раз дольше

Перекос vs Production Impact

В большинстве production pipeline перекос — это узкое место производительности. Опытный Data Engineer всегда анализирует распределение данных перед запуском массивных агрегаций.

Что такое перекос данных в Spark? | PrepBro