Подсчёт уникальных значений в Spark с объяснением выполнения
Условие
Дан большой датасет с полем user_id типа Long (8 байт). Необходимо:
Задание:
- Написать SQL-запрос для подсчёта уникальных значений user_id
- Детально объяснить, как Spark выполнит этот запрос:
- Какие stage-ы будут созданы?
- Где произойдёт shuffle?
- Как будет происходить агрегация?
- Какой объём данных будет передан по сети при shuffle для 10 миллиардов записей?
- Предложите оптимизации для ускорения вычислений на основе практического опыта
- Когда имеет смысл использовать approx_count_distinct вместо точного подсчёта?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Решение
SQL-запрос для подсчёта уникальных значений
SELECT COUNT(DISTINCT user_id) AS unique_users
FROM large_dataset;
Альтернативная форма:
SELECT COUNT(*)
FROM (
SELECT DISTINCT user_id
FROM large_dataset
) t;
Детальное объяснение выполнения в Spark
Этап 1: Logical Plan (что нужно сделать)
SparkSQL создаёт логический план:
Aggregate [COUNT(DISTINCT user_id)]
└── Scan parquet large_dataset
Этап 2: Physical Plan (как это делать)
HashAggregate (final)
├── Exchange hashpartitioning(user_id, 200) ← Shuffle происходит здесь
└── HashAggregate (partial)
└── Scan parquet large_dataset
Что происходит:
- Scan — каждый executor читает свою часть данных (например, один HDFS блок на executor'е)
- Partial Aggregation — на каждом executor'е локально считаются частичные distinct значения (HyperLogLog sketch или hash table)
- Shuffle — результаты частичной агрегации отправляются по сети, перемешиваются по user_id
- Final Aggregation — на stage'е 2 объединяются все partial результаты в финальный ответ
Детальное объяснение shuffle'а
Stage 1: Map phase (на каждом executor'е)
Executor 1 читает 1 млрд записей
├── user_id: 1, 5, 3, 1, 2, 5, ...
├── На лету считает partial distinct: {1, 5, 3, 2} (4 уникальных)
└── Выписывает в shuffle файл: (1, 1), (5, 1), (3, 1), (2, 1)
Executor 2 читает 1 млрд записей
├── user_id: 3, 7, 9, 3, ...
├── Partial distinct: {3, 7, 9}
└── Shuffle файл: (3, 1), (7, 1), (9, 1)
И так для всех 10 executor'ов
Stage 2: Reduce phase (переразделение данных)
Данные переразделяются по partition'ам based on hash(user_id):
Partition 0 (hash(user_id) % 200 == 0):
├── (1, 1), (1, 1), (1, 1), ... ← всё воедино
├── (201, 1), (201, 1), ...
└── (1001, 1), ...
Partition 1 (hash(user_id) % 200 == 1):
├── (2, 1), (2, 1), ...
└── (202, 1), ...
Stage 3: Final Aggregation
Partition 0:
├── Вычитаем уникальные: 1, 201, 1001, ... → N значений
Partition 1:
├── Вычитаем уникальные: 2, 202, ... → M значений
Главный Driver собирает результаты: N + M + ... = Итого уникальных
Расчёт объёма данных при shuffle
Исходные данные
- 10 миллиардов записей
- Поле user_id = Long (8 байт)
- Уникальных user_id ≈ 100 миллионов (0,01% от записей)
Объём shuffle'а
Вход (исходный датасет):
10,000,000,000 записей × (8 байт user_id + overhead)
≈ 80 ГБ + metadata
Shuffle (передача по сети в stage 1):
Лучший случай — Spark использует Partial Aggregation:
Каждый executor отправляет только distinct значения + счётчики
100 млн уникальных × (8 байт key + 8 байт value) = 1.6 ГБ
В сетевой передаче (если без оптимизации):
Без partial aggregation пришлось бы отправить все 80 ГБ
С partial aggregation: ~1.6 ГБ
Экономия: 50x
Правильный расчёт для Spark:
Shuffle Write: ~1.6 ГБ (из memory + disk spills)
Shuffle Read: ~1.6 ГБ (из сети)
Финальный merge: O(100 млн) хеш-таблица ≈ 800 МБ памяти
Оптимизации на основе практики
1. Использование более эффективного алгоритма COUNT(DISTINCT)
По умолчанию Spark использует: HashAggregate (хеш-таблица)
df.agg(
F.countDistinct("user_id")
).show()
Это хорошо для распределённых данных, но требует shuffle.
2. Включи сжатие для shuffle
spark.conf.set("spark.shuffle.compress", "true")
spark.conf.set("spark.shuffle.spill.compress", "true")
spark.conf.set("spark.io.compression.codec", "snappy") # или lz4
Это снижает сетевой трафик на 30-50%.
3. Оптимальное число partition'ов
# Слишком мало partition'ов → данные не балансируются
# Слишком много → большой overhead
df_repartitioned = df.repartition(
num_partitions=spark.sparkContext.defaultParallelism * 2 # Usually 200-400
)
df_repartitioned.agg(F.countDistinct("user_id")).show()
4. Используй broadcast для малых результатов
# Если нужно использовать distinct значения в join'е
distinct_users = df.select("user_id").distinct()
# Broadcast явно
df_other.join(broadcast(distinct_users), "user_id")
5. Фильтруй данные перед агрегацией
df.filter(F.col("status") == "active") # Уменьши датасет перед COUNT(DISTINCT)
.agg(F.countDistinct("user_id")).show()
Approx_count_distinct vs COUNT(DISTINCT)
COUNT(DISTINCT) — точный результат
SELECT COUNT(DISTINCT user_id) FROM dataset;
-- Результат: 99,999,999 (точно)
Плюсы:
- Точный результат
- Когда user_id < 1 млн, практически бесплатно
Минусы:
- Требует shuffle'а
- На 100+ млн уникальных значениях медленно (несколько минут)
- Много памяти для хеш-таблицы
APPROX_COUNT_DISTINCT (HyperLogLog)
SELECT APPROX_COUNT_DISTINCT(user_id) FROM dataset;
-- Результат: 99,876,543 (примерно, ошибка ±2%)
Плюсы:
- Очень быстро (100+ млн за 1-2 сек)
- Малое использование памяти (константное O(log N))
- Встроена в Spark и другие OLAP системы
- Результаты часто достаточны для аналитики
Минусы:
- Примерный результат (по умолчанию ошибка ±2-3%)
- Для очень точных отчётов не подходит
Когда использовать approx_count_distinct
-
Разведочный анализ данных (EDA)
# "Сколько примерно юзеров в датасете?" → 5 сек df.agg(F.approx_count_distinct("user_id")).show() -
Мониторинг в дашбордах (real-time)
# Dashboards обновляются каждую минуту, точность ±2% приемлема -
Сравнение объёмов
# "Какой сегмент больше: A или B?" df.filter(col("segment") == "A").agg(approx_count_distinct("user_id")) df.filter(col("segment") == "B").agg(approx_count_distinct("user_id")) -
Когда результат используется в дальнейших вычислениях
# Если потом будет умножение на среднее значение → небольшая ошибка приемлема unique_count = df.agg(approx_count_distinct("user_id")) avg_transactions = df.agg(avg("amount")) total_value = unique_count * avg_transactions # ошибка ±2%
Когда использовать COUNT(DISTINCT)
-
Финальные отчёты и метрики
- SLA, KPI, биллинг
- Акционерам нужны точные числа
-
Юридические требования
- Отчёты для регуляторов
- Финансовая отчётность
-
Когда уникальных значений мало (< 10 млн)
- Выполнится быстро (< 1 сек)
- Нет смысла ждать approx
Итоговый чеклист
- Для аналитики → approx_count_distinct (быстро)
- Для отчётов → COUNT(DISTINCT) (точно)
- Включи shuffle compression (snappy/lz4)
- Оптимальное число partition'ов = cores × 2-3
- Фильтруй данные ПЕРЕД COUNT(DISTINCT)
- На 100+ млн уникальных значений approx даёт выигрыш в 100x