Проектирование схемы данных для дашборда РТО
Условие
Необходимо спроектировать схему данных и написать запросы для дашборда по розничному товарообороту (РТО) и маржинальности магазинов по месяцам в разрезе городов и категорий товаров.
Исходные данные:
- 100K товаров (SKU)
- 20K магазинов
- 300 категорий товаров
- 2+ миллиарда продаж в месяц
- Данные хранятся несколько лет
- Обновление раз в месяц
Задание:
-
Спроектируйте двухуровневую архитектуру:
- Слой 1 (Hive): хранение сырой истории продаж
- Слой 2 (ClickHouse): предаггрегированные данные для быстрой аналитики
-
Опишите схемы таблиц для обоих слоёв
-
Напишите SQL-запросы:
- Первичная загрузка в ClickHouse
- Инкрементальная загрузка новых данных
- Запрос для дашборда: сумма РТО и маржинальности по городам за текущий месяц
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Решение
Архитектурный подход
Двухслойная архитектура:
- Слой 1 (Hive) — холодное хранилище для истории, источник истины
- Слой 2 (ClickHouse) — горячий слой, предаггрегированные данные для мгновенных запросов
Этот подход позволяет:
- Хранить полную историю в Hive (долгосрочное хранение)
- Быстро отвечать на аналитические запросы в ClickHouse
- Переиспользовать данные для разных дашбордов и отчётов
Слой 1: Hive (Raw Data)
Таблица sales_raw — логи всех продаж
CREATE TABLE IF NOT EXISTS sales_raw (
sale_id BIGINT,
sale_date DATE,
sale_timestamp TIMESTAMP,
store_id INT,
sku_id INT,
category_id INT,
city_name STRING,
quantity INT,
unit_price DECIMAL(10,2),
total_amount DECIMAL(15,2),
cost_amount DECIMAL(15,2),
margin_amount DECIMAL(15,2),
load_date DATE
)
PARTITIONED BY (year INT, month INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';
Ключи:
sale_id— уникальный идентификатор каждой продажиsale_date— дата продажи (для фильтрации)store_id, sku_id, category_id— связи для join'а с справочникамиcity_name— денормализация для ускорения (можно join'ить со справочником store, но здесь проще)quantity, unit_price— атомарные данныеmargin_amount = total_amount - cost_amountload_date— дата загрузки в HDFS (для отслеживания свежести)
Партиционирование: YEAR, MONTH для эффективного удаления старых данных и быстрых query'ев за конкретный период.
Справочная таблица stores_dim
CREATE TABLE IF NOT EXISTS stores_dim (
store_id INT,
store_name STRING,
city_id INT,
city_name STRING,
region_name STRING,
store_type STRING,
opening_date DATE
)
STORED AS PARQUET;
Слой 2: ClickHouse (Aggregated Data)
ClickHouse оптимален для аналитики благодаря:
- Column-oriented хранению (сжатие 10-100x)
- MergeTree движку (партиционирование, индексы)
- Мгновенным агрегациям по любым измерениям
Таблица sales_by_city_category_month
CREATE TABLE sales_by_city_category_month (
report_month Date,
city_name String,
category_id UInt32,
category_name String,
total_quantity UInt64,
total_rto Decimal128(2), -- Розничный товарооборот (выручка)
total_cost Decimal128(2), -- Себестоимость
total_margin Decimal128(2), -- Маржинальность (выручка - себестоимость)
margin_percent Float32, -- Процент маржинальности
unique_stores UInt32, -- Количество магазинов в городе, которые продавали эту категорию
load_timestamp DateTime
) ENGINE = MergeTree()
ORDER BY (report_month, city_name, category_id)
PARTITION BY toYYYYMM(report_month);
Ключи проектирования:
ORDER BY (report_month, city_name, category_id)— главные фильтры дашборда идут в ORDER BYPARTITION BY month— позволяет удалять или обновлять данные за целый месяцtotal_rto— сумма всехtotal_amount(выручка)margin_percent— процент для быстрых сравненийunique_stores— для контекста (сколько магазинов участвовало)
Таблица sales_by_city_month (более обобщённая)
CREATE TABLE sales_by_city_month (
report_month Date,
city_name String,
total_rto Decimal128(2),
total_cost Decimal128(2),
total_margin Decimal128(2),
margin_percent Float32,
unique_stores UInt32,
unique_skus UInt32,
transaction_count UInt64,
load_timestamp DateTime
) ENGINE = MergeTree()
ORDER BY (report_month, city_name)
PARTITION BY toYYYYMM(report_month);
SQL-запросы
1. Первичная загрузка в ClickHouse (Spark → ClickHouse)
-- Вычисляем аггрегацию в Spark, потом INSERT в ClickHouse
INSERT INTO sales_by_city_category_month
SELECT
toDate(concat(toString(year), '-', toString(month), '-01')) AS report_month,
city_name,
category_id,
category_name,
SUM(quantity) AS total_quantity,
SUM(total_amount) AS total_rto,
SUM(cost_amount) AS total_cost,
SUM(margin_amount) AS total_margin,
round(100.0 * SUM(margin_amount) / SUM(total_amount), 2) AS margin_percent,
uniq(store_id) AS unique_stores,
now() AS load_timestamp
FROM sales_raw
WHERE year = 2024 AND month = 3 -- Конкретный месяц
GROUP BY
report_month,
city_name,
category_id,
category_name;
Примечание: Эта загрузка выполняется как Spark job:
# PySpark код для загрузки
df_sales = spark.sql("""
SELECT
year, month,
city_name, category_id, category_name,
SUM(quantity) as total_quantity,
SUM(total_amount) as total_rto,
SUM(cost_amount) as total_cost,
SUM(margin_amount) as total_margin
FROM sales_raw
WHERE year = 2024 AND month = 3
GROUP BY year, month, city_name, category_id, category_name
""")
df_sales.write \
.format("jdbc") \
.option("url", "jdbc:clickhouse://clickhouse-host:8123/default") \
.option("dbtable", "sales_by_city_category_month") \
.option("user", "default") \
.option("password", "") \
.mode("append") \
.save()
2. Инкрементальная загрузка новых данных
Стратегия: Каждый месяц загружаем только новые данные (за прошлый месяц).
-- Удаляем старые данные за этот месяц (на случай переобработки)
ALTER TABLE sales_by_city_category_month
DELETE WHERE report_month = toDate('2024-04-01');
-- Вставляем новые данные
INSERT INTO sales_by_city_category_month
SELECT
toDate('2024-04-01') AS report_month,
city_name,
category_id,
category_name,
SUM(quantity) AS total_quantity,
SUM(total_amount) AS total_rto,
SUM(cost_amount) AS total_cost,
SUM(margin_amount) AS total_margin,
round(100.0 * SUM(margin_amount) / SUM(total_amount), 2) AS margin_percent,
uniq(store_id) AS unique_stores,
now() AS load_timestamp
FROM sales_raw
WHERE year = 2024 AND month = 4
GROUP BY
city_name,
category_id,
category_name;
3. Запрос для дашборда: РТО и маржинальность по городам за текущий месяц
SELECT
city_name,
SUM(total_rto) AS total_rto,
SUM(total_margin) AS total_margin,
ROUND(
100.0 * SUM(total_margin) / SUM(total_rto),
2
) AS margin_percent,
COUNT(DISTINCT category_id) AS categories_count,
SUM(unique_stores) / COUNT(DISTINCT category_id) AS avg_stores_per_category
FROM sales_by_city_category_month
WHERE report_month >= toDate('2024-03-01')
GROUP BY city_name
ORDER BY total_rto DESC;
Что происходит:
- ClickHouse считывает за доли секунды благодаря column-oriented хранению
- Фильтр
WHERE report_month >= '2024-03-01'использует индекс (partition pruning) - Группировка по городу работает быстро (несколько млн строк, а не миллиарды)
4. Альтернативный запрос: сравнение месяцев
SELECT
city_name,
report_month,
SUM(total_rto) AS monthly_rto,
SUM(total_margin) AS monthly_margin,
ROUND(100.0 * SUM(total_margin) / SUM(total_rto), 2) AS margin_percent
FROM sales_by_city_category_month
WHERE report_month >= toDate('2024-01-01')
GROUP BY city_name, report_month
ORDER BY city_name, report_month DESC;
5. Top категории в городе
SELECT
city_name,
category_name,
total_rto,
total_margin,
margin_percent,
ROW_NUMBER() OVER (PARTITION BY city_name ORDER BY total_rto DESC) AS rank
FROM sales_by_city_category_month
WHERE report_month = toDate('2024-03-01')
QUALIFY rank <= 10;
Обоснование архитектуры
Почему Hive (для сырых данных)?
-
Дешевое хранилище — 2+ млрд записей в месяц × 5 лет = 120 млрд записей
- На HDFS: ~2 ТБ/год (с Snappy compression)
- Стоимость: дешево
-
Простой для обновления — если нужно переобработать месяц, просто переписываем partition
-
Интеграция с Spark — обрабатываем Spark job'ами
Почему ClickHouse (для аналитики)?
- Скорость — 120 млн строк запрос выполняется за 100-500 мс (вместо 10+ сек в Hive)
- Сжатие — Column-oriented: 1 ТБ сырых данных → 50-100 ГБ в ClickHouse
- Встроенные функции — percentile, groupArray, и другие для аналитики
- Масштабирование — встроенная репликация и шардирование
Параметры для production
ClickHouse конфиг
<clickhouse>
<!-- Сжатие для экономии места -->
<compression>
<case>
<min_part_size>10000000</min_part_size>
<min_part_size_ratio>0.01</min_part_size_ratio>
<method>lz4</method>
</case>
</compression>
<!-- Merge параметры -->
<merge_tree>
<max_parts_to_merge_at_once>100</max_parts_to_merge_at_once>
<max_bytes_to_merge_at_once>5368709120</max_bytes_to_merge_at_once>
</merge_tree>
</clickhouse>
Мониторинг
-- Размер таблицы
SELECT
table,
formatReadableSize(sum(data_compressed_bytes)) as compressed_size,
formatReadableSize(sum(data_uncompressed_bytes)) as uncompressed_size,
sum(rows) as rows_count
FROM system.parts
WHERE database = 'default' AND table LIKE 'sales_by_%'
GROUP BY table;
-- Скорость запросов
SELECT
query_duration_ms,
read_rows,
read_bytes
FROM system.query_log
WHERE database = 'default'
AND query LIKE 'SELECT%'
LIMIT 10;
Итоговый чеклист
- Hive для полной истории (cold storage)
- ClickHouse для аналитики (hot layer)
- Партиционирование по месяцам в обеих системах
- ORDER BY в ClickHouse = главные dimension'ы дашборда
- Инкрементальная загрузка (DELETE + INSERT за месяц)
- Мониторь размер и скорость ClickHouse
- Backup Hive на S3/HDFS
- Репликация ClickHouse для HA