← Назад к вопросам

Подсчёт уникальных значений в Spark с объяснением выполнения

3.0 Senior🔥 121 комментариев
#Apache Spark#SQL и базы данных#Архитектура и проектирование

Условие

Дан большой датасет с полем user_id типа Long (8 байт). Необходимо:

Задание:

  1. Написать SQL-запрос для подсчёта уникальных значений user_id
  2. Детально объяснить, как Spark выполнит этот запрос:
    • Какие stage-ы будут созданы?
    • Где произойдёт shuffle?
    • Как будет происходить агрегация?
  3. Какой объём данных будет передан по сети при shuffle для 10 миллиардов записей?
  4. Предложите оптимизации для ускорения вычислений на основе практического опыта
  5. Когда имеет смысл использовать approx_count_distinct вместо точного подсчёта?

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Решение

SQL-запрос для подсчёта уникальных значений

SELECT COUNT(DISTINCT user_id) AS unique_users
FROM large_dataset;

Альтернативная форма:

SELECT COUNT(*)
FROM (
    SELECT DISTINCT user_id
    FROM large_dataset
) t;

Детальное объяснение выполнения в Spark

Этап 1: Logical Plan (что нужно сделать)

SparkSQL создаёт логический план:

Aggregate [COUNT(DISTINCT user_id)]
└── Scan parquet large_dataset

Этап 2: Physical Plan (как это делать)

HashAggregate (final)
├── Exchange hashpartitioning(user_id, 200)  ← Shuffle происходит здесь
└── HashAggregate (partial)
    └── Scan parquet large_dataset

Что происходит:

  1. Scan — каждый executor читает свою часть данных (например, один HDFS блок на executor'е)
  2. Partial Aggregation — на каждом executor'е локально считаются частичные distinct значения (HyperLogLog sketch или hash table)
  3. Shuffle — результаты частичной агрегации отправляются по сети, перемешиваются по user_id
  4. Final Aggregation — на stage'е 2 объединяются все partial результаты в финальный ответ

Детальное объяснение shuffle'а

Stage 1: Map phase (на каждом executor'е)

Executor 1 читает 1 млрд записей
├── user_id: 1, 5, 3, 1, 2, 5, ...
├── На лету считает partial distinct: {1, 5, 3, 2} (4 уникальных)
└── Выписывает в shuffle файл: (1, 1), (5, 1), (3, 1), (2, 1)

Executor 2 читает 1 млрд записей
├── user_id: 3, 7, 9, 3, ...
├── Partial distinct: {3, 7, 9}
└── Shuffle файл: (3, 1), (7, 1), (9, 1)

И так для всех 10 executor'ов

Stage 2: Reduce phase (переразделение данных)

Данные переразделяются по partition'ам based on hash(user_id):

Partition 0 (hash(user_id) % 200 == 0):
├── (1, 1), (1, 1), (1, 1), ...  ← всё воедино
├── (201, 1), (201, 1), ...
└── (1001, 1), ...

Partition 1 (hash(user_id) % 200 == 1):
├── (2, 1), (2, 1), ...
└── (202, 1), ...

Stage 3: Final Aggregation

Partition 0:
├── Вычитаем уникальные: 1, 201, 1001, ... → N значений

Partition 1:
├── Вычитаем уникальные: 2, 202, ... → M значений

Главный Driver собирает результаты: N + M + ... = Итого уникальных

Расчёт объёма данных при shuffle

Исходные данные

  • 10 миллиардов записей
  • Поле user_id = Long (8 байт)
  • Уникальных user_id ≈ 100 миллионов (0,01% от записей)

Объём shuffle'а

Вход (исходный датасет):

10,000,000,000 записей × (8 байт user_id + overhead)
≈ 80 ГБ + metadata

Shuffle (передача по сети в stage 1):

Лучший случай — Spark использует Partial Aggregation:

Каждый executor отправляет только distinct значения + счётчики
100 млн уникальных × (8 байт key + 8 байт value) = 1.6 ГБ

В сетевой передаче (если без оптимизации):

Без partial aggregation пришлось бы отправить все 80 ГБ
С partial aggregation: ~1.6 ГБ
Экономия: 50x

Правильный расчёт для Spark:

Shuffle Write: ~1.6 ГБ (из memory + disk spills)
Shuffle Read: ~1.6 ГБ (из сети)
Финальный merge: O(100 млн) хеш-таблица ≈ 800 МБ памяти

Оптимизации на основе практики

1. Использование более эффективного алгоритма COUNT(DISTINCT)

По умолчанию Spark использует: HashAggregate (хеш-таблица)

df.agg(
    F.countDistinct("user_id")
).show()

Это хорошо для распределённых данных, но требует shuffle.

2. Включи сжатие для shuffle

spark.conf.set("spark.shuffle.compress", "true")
spark.conf.set("spark.shuffle.spill.compress", "true")
spark.conf.set("spark.io.compression.codec", "snappy")  # или lz4

Это снижает сетевой трафик на 30-50%.

3. Оптимальное число partition'ов

# Слишком мало partition'ов → данные не балансируются
# Слишком много → большой overhead

df_repartitioned = df.repartition(
    num_partitions=spark.sparkContext.defaultParallelism * 2  # Usually 200-400
)
df_repartitioned.agg(F.countDistinct("user_id")).show()

4. Используй broadcast для малых результатов

# Если нужно использовать distinct значения в join'е
distinct_users = df.select("user_id").distinct()
# Broadcast явно
df_other.join(broadcast(distinct_users), "user_id")

5. Фильтруй данные перед агрегацией

df.filter(F.col("status") == "active")  # Уменьши датасет перед COUNT(DISTINCT)
  .agg(F.countDistinct("user_id")).show()

Approx_count_distinct vs COUNT(DISTINCT)

COUNT(DISTINCT) — точный результат

SELECT COUNT(DISTINCT user_id) FROM dataset;
-- Результат: 99,999,999 (точно)

Плюсы:

  • Точный результат
  • Когда user_id < 1 млн, практически бесплатно

Минусы:

  • Требует shuffle'а
  • На 100+ млн уникальных значениях медленно (несколько минут)
  • Много памяти для хеш-таблицы

APPROX_COUNT_DISTINCT (HyperLogLog)

SELECT APPROX_COUNT_DISTINCT(user_id) FROM dataset;
-- Результат: 99,876,543 (примерно, ошибка ±2%)

Плюсы:

  • Очень быстро (100+ млн за 1-2 сек)
  • Малое использование памяти (константное O(log N))
  • Встроена в Spark и другие OLAP системы
  • Результаты часто достаточны для аналитики

Минусы:

  • Примерный результат (по умолчанию ошибка ±2-3%)
  • Для очень точных отчётов не подходит

Когда использовать approx_count_distinct

  1. Разведочный анализ данных (EDA)

    # "Сколько примерно юзеров в датасете?" → 5 сек
    df.agg(F.approx_count_distinct("user_id")).show()
    
  2. Мониторинг в дашбордах (real-time)

    # Dashboards обновляются каждую минуту, точность ±2% приемлема
    
  3. Сравнение объёмов

    # "Какой сегмент больше: A или B?"
    df.filter(col("segment") == "A").agg(approx_count_distinct("user_id"))
    df.filter(col("segment") == "B").agg(approx_count_distinct("user_id"))
    
  4. Когда результат используется в дальнейших вычислениях

    # Если потом будет умножение на среднее значение → небольшая ошибка приемлема
    unique_count = df.agg(approx_count_distinct("user_id"))
    avg_transactions = df.agg(avg("amount"))
    total_value = unique_count * avg_transactions  # ошибка ±2%
    

Когда использовать COUNT(DISTINCT)

  1. Финальные отчёты и метрики

    • SLA, KPI, биллинг
    • Акционерам нужны точные числа
  2. Юридические требования

    • Отчёты для регуляторов
    • Финансовая отчётность
  3. Когда уникальных значений мало (< 10 млн)

    • Выполнится быстро (< 1 сек)
    • Нет смысла ждать approx

Итоговый чеклист

  • Для аналитики → approx_count_distinct (быстро)
  • Для отчётов → COUNT(DISTINCT) (точно)
  • Включи shuffle compression (snappy/lz4)
  • Оптимальное число partition'ов = cores × 2-3
  • Фильтруй данные ПЕРЕД COUNT(DISTINCT)
  • На 100+ млн уникальных значений approx даёт выигрыш в 100x
Подсчёт уникальных значений в Spark с объяснением выполнения | PrepBro