← Назад к вопросам

Как бороться с data skew в распределенных системах?

2.0 Middle🔥 171 комментариев
#Apache Spark

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Data Skew в распределённых системах

Data Skew (перекос данных) — это ситуация, когда данные неравномерно распределены между узлами кластера, что приводит к дисбалансу нагрузки. Одни узлы работают перегруженными, другие недоиспользуются, что снижает производительность всей системы.

Типы Data Skew

1. Skew по KEY (ключевой перекос)

При группировке или JOIN операции некоторые значения ключей встречаются намного чаще, чем другие.

-- Пример перекоса: популярный продукт
SELECT product_id, COUNT(*) as cnt
FROM sales
GROUP BY product_id
ORDER BY cnt DESC;

-- Результат: product_id=123 имеет 10M записей, остальные по 1K
-- Все записи с product_id=123 обрабатываются одним executor'ом

2. Skew по размеру файла

Некоторые партиции данных намного больше других.

# HDFS файлы неравного размера
# File 1: 10GB
# File 2: 100MB
# File 3: 200MB
# Обработчик File 1 работает дольше

3. Skew по времени обработки

Некоторые записи требуют больше времени на обработку.

# Процедура обработки строк
for row in data:
    if row['type'] == 'complex':
        process_complex(row)  # 5 секунд
    else:
        process_simple(row)   # 0.1 секунды
# Если большинство complex, один executor работает долго

Проблемы, вызванные Data Skew

  • Медленные запросы — самый медленный узел определяет скорость
  • Out of Memory — перегруженный узел может нехватить памяти
  • Неэффективность ресурсов — некоторые узлы простаивают
  • Таймауты — долгие операции могут превысить timeout

Решения для борьбы с Data Skew

1. Салтинг (Salting) — добавление случайного префикса

# Без салтинга: все user_id=999 → один partition
# С салтингом: user_id=999 → 999_0, 999_1, 999_2, ...

import random

def salted_key(user_id, num_salt_buckets=10):
    salt = random.randint(0, num_salt_buckets - 1)
    return f"{user_id}_{salt}"

# В PySpark
from pyspark.sql.functions import rand, concat, lit

df = df.withColumn("salted_key", concat(col("user_id"), lit("_"), 
                                        (rand() * 10).cast("int")))

SQL пример с салтингом:

-- Салтинг при GROUP BY
SELECT 
    CONCAT(product_id, '_', FLOOR(RAND() * 10)) as salted_key,
    product_id,
    COUNT(*) as cnt
FROM sales
GROUP BY CONCAT(product_id, '_', FLOOR(RAND() * 10)), product_id;

2. Repartitioning (переразбиение)

Явно контролировать количество и распределение партиций:

# PySpark
from pyspark.sql import functions as F

# Переразбиение по нескольким колонкам
df = df.repartition(200, "user_id", "date")

# Или использовать partitionBy для лучшего контроля
df = df.repartition(F.spark_partition_id())

3. Избавление от NULL значений

-- Проблема: NULL значения часто попадают в один partition
SELECT category, COUNT(*)
FROM products
GROUP BY category;

-- Solution: обработать NULL отдельно
SELECT COALESCE(category, 'UNKNOWN') as category, COUNT(*)
FROM products
GROUP BY COALESCE(category, 'UNKNOWN');

В PySpark:

df = df.fillna({'category': 'UNKNOWN'})
# Теперь NULL не скопится в одном partition

4. Использование Adaptive Query Execution (AQE)

Spark автоматически адаптирует распределение на основе статистики:

# Включить AQE в Spark 3.0+
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewFactor", 5.0)

# AQE автоматически разделит перекошенные partition'ы
df_result = df1.join(df2, df1.id == df2.id)

5. Двух-фазный GROUP BY (Two-phase aggregation)

При сильном перекосе использовать промежуточное салтирование:

# Фаза 1: Частичная агрегация с салтингом
phase1 = df.groupBy(F.concat(col("key"), lit("_"), 
                             (F.rand() * 10).cast("int")))
              .agg(F.sum("value").alias("partial_sum"))

# Фаза 2: Финальная агрегация
phase2 = phase1.groupBy(F.regexp_replace("salted_key", "_\\d+$", ""))
               .agg(F.sum("partial_sum").alias("total_sum"))

6. Предварительное фильтрование

# Избежать обработки ненужных данных
df = df.filter((df.date >= '2024-01-01') & (df.date <= '2024-01-31'))
df = df.filter(df.status != 'deleted')

# Меньше данных = меньше перекос

7. Использование broadcast для малых таблиц

# Если таблица small достаточно мала для памяти
from pyspark.sql.functions import broadcast

result = large_df.join(broadcast(small_df), "id")
# Этот JOIN не страдает от skew

Мониторинг и диагностика Data Skew

# Проверить распределение данных
df.groupBy(col("key")).count().show(20)

# Рассчитать коэффициент skew
from pyspark.sql.functions import stddev, avg

stats = df.groupBy(col("key")).count()\
    .agg(avg("count").alias("avg_count"),
         stddev("count").alias("std_count"))

# Skew Factor = max_count / avg_count
# Если > 5, есть проблема

SQL для диагностики:

-- Найти самые перекошенные ключи
SELECT key, COUNT(*) as cnt,
       COUNT(*) * 100.0 / SUM(COUNT(*)) OVER () as percentage
FROM table_name
GROUP BY key
ORDER BY cnt DESC
LIMIT 20;

Пример комплексного решения

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("SkewFix").getOrCreate()
spark.conf.set("spark.sql.adaptive.enabled", "true")

# 1. Загрузить данные
df = spark.read.csv("large_dataset.csv", header=True)

# 2. Удалить NULL в ключе
df = df.filter(col("product_id").isNotNull())

# 3. Добавить salt для горячих ключей
hot_products = ["123", "456"]  # Known skew
df = df.withColumn(
    "salted_key",
    when(col("product_id").isin(hot_products),
         concat(col("product_id"), lit("_"), (rand() * 100).cast("int")))
    .otherwise(concat(col("product_id"), lit("_0")))
)

# 4. Выполнить операцию
result = df.groupBy("salted_key", "product_id")\
    .agg(sum("amount").alias("total"))\
    .groupBy("product_id")\
    .agg(sum("total").alias("grand_total"))

result.show()

Лучшие практики

  • Анализировать распределение перед операциями (GROUP BY, JOIN)
  • Использовать несколько ключей для разбиения при возможности
  • Включить AQE в современных версиях Spark
  • Мониторить задачи в Spark UI для выявления перекоса
  • Предварительная фильтрация данных для уменьшения объёма
  • Тестировать решения на репрезентативных наборах данных

Data Skew — это частая проблема в больших данных, требующая постоянного внимания и оптимизации.