Какие слои данных были в хранилище?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Слои данных в хранилище (Data Warehouse Architecture)
Обзор многослойной архитектуры
Data Warehouse строится на основе многослойной архитектуры, где каждый слой имеет конкретное назначение. Это обеспечивает организацию данных, управляемость, качество и производительность системы.
Классическая архитектура: 3 слоя
┌──────────────────────────────────────────────────────────┐
│ External Data Sources (API, ERP, CRM) │
└───────────────────────┬──────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ 1. Raw / Staging Layer (Неочищенные данные) │
│ └── Копия источников данных как есть │
└───────────────────────┬──────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ 2. Processing / Integration Layer (Трансформированные) │
│ └── Чистые, денормализованные данные │
└───────────────────────┬──────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ 3. Mart / Presentation Layer (Витрины для аналитики) │
│ └── Агрегированные данные для конечных пользователей │
└───────────────────────┬──────────────────────────────────┘
│
┌──────────┴──────────┐
▼ ▼
BI Tools Python Analytics
Tableau Data Science
Power BI ML Models
Слой 1: Raw / Staging Layer
Назначение: Копировать данные из источников без изменений
Характеристики:
- Структура 1:1 с исходной БД
- Нет трансформаций
- Нет очистки
- Часто неполные или дублирующиеся данные
- Быстрая загрузка (bulk insert)
Структура в HDFS/S3:
/warehouse/staging/
├── customers/
│ ├── year=2023/month=01/
│ │ └── part-*.parquet
│ └── year=2024/month=01/
│ └── part-*.parquet
├── orders/
├── products/
└── transactions/
Пример SQL схемы:
CREATE SCHEMA staging;
-- Копия таблицы customers из источника
CREATE TABLE staging.customers (
customer_id INT,
customer_name VARCHAR(255),
email VARCHAR(255),
phone VARCHAR(20),
address VARCHAR(500),
country VARCHAR(50),
load_timestamp TIMESTAMP,
source_system VARCHAR(50)
)
PARTITIONED BY (year INT, month INT)
STORED AS PARQUET;
-- Копия таблицы orders
CREATE TABLE staging.orders (
order_id INT,
customer_id INT,
order_date DATE,
amount DECIMAL(10,2),
status VARCHAR(50),
load_timestamp TIMESTAMP
)
PARTITIONED BY (year INT, month INT)
STORED AS PARQUET;
ETL для staging:
from pyspark.sql import SparkSession
from datetime import datetime
spark = SparkSession.builder.appName('staging_load').getOrCreate()
# Чтение из источника (напрямую из БД)
df = spark.read \
.format('jdbc') \
.option('url', 'jdbc:postgresql://source_db:5432/prod') \
.option('dbtable', 'public.customers') \
.option('user', 'user') \
.option('password', 'pass') \
.load()
# Добавить load timestamp
from pyspark.sql.functions import current_timestamp, year, month
df_with_metadata = df.withColumn(
'load_timestamp', current_timestamp()
).withColumn(
'year', year(current_timestamp())
).withColumn(
'month', month(current_timestamp())
)
# Написать в staging (append mode)
df_with_metadata.write \
.mode('append') \
.partitionBy('year', 'month') \
.parquet('s3://data-lake/staging/customers')
Слой 2: Processing / Integration Layer
Назначение: Очистить, денормализовать и подготовить данные к аналитике
Что происходит:
- Удаление дубликатов
- Очистка (trim, null handling, type casting)
- Соединение таблиц (join)
- Расчёт производных полей
- Установка бизнес-правил
- Медленное измерение (slowly changing dimensions)
Структура в HDFS/S3:
/warehouse/processing/
├── dim_customers/ # Dimension таблица
│ └── part-*.parquet
├── dim_products/ # Dimension таблица
│ └── part-*.parquet
├── dim_dates/ # Date dimension
│ └── part-*.parquet
├── fct_orders/ # Fact таблица
│ └── part-*.parquet
└── fct_transactions/
└── part-*.parquet
Пример: Создание Dimension таблицы (SCD Type 2)
-- Processing слой: Dimension таблица customers (SCD Type 2)
CREATE TABLE processing.dim_customers (
customer_sk INT,
customer_id INT,
customer_name VARCHAR(255),
email VARCHAR(255),
phone VARCHAR(20),
address VARCHAR(500),
country VARCHAR(50),
segment VARCHAR(50), -- Вычисленное поле
effective_date DATE,
end_date DATE,
is_current BOOLEAN,
created_timestamp TIMESTAMP,
updated_timestamp TIMESTAMP
)
STORED AS PARQUET;
-- Пример заполнения (медленное измерение)
WITH new_data AS (
SELECT
ROW_NUMBER() OVER (ORDER BY customer_id) as customer_sk,
s.customer_id,
s.customer_name,
s.email,
s.phone,
s.address,
s.country,
CASE
WHEN amount_total > 100000 THEN 'VIP'
WHEN amount_total > 10000 THEN 'Premium'
ELSE 'Standard'
END as segment,
CURRENT_DATE as effective_date,
NULL::DATE as end_date,
TRUE as is_current
FROM staging.customers s
LEFT JOIN fct_orders o ON s.customer_id = o.customer_id
GROUP BY s.customer_id, s.customer_name
)
INSERT INTO processing.dim_customers
SELECT * FROM new_data;
Пример: Создание Fact таблицы
-- Fact таблица orders (детальные транзакции)
CREATE TABLE processing.fct_orders (
order_sk INT,
order_id INT,
customer_sk INT,
product_sk INT,
order_date DATE,
amount DECIMAL(10,2),
quantity INT,
discount DECIMAL(10,2),
net_amount DECIMAL(10,2), -- amount - discount
status VARCHAR(50),
created_timestamp TIMESTAMP
)
PARTITIONED BY (year INT, month INT)
STORED AS PARQUET;
ETL для processing (dbt пример):
-- models/processing/dim_customers.sql
{{ config(
materialized='table',
schema='processing',
tags=['critical']
) }}
WITH source_data AS (
SELECT
customer_id,
customer_name,
email,
phone,
address,
country
FROM {{ source('staging', 'customers') }}
),
agg_orders AS (
SELECT
customer_id,
COUNT(DISTINCT order_id) as total_orders,
SUM(amount) as total_spent
FROM {{ source('staging', 'orders') }}
GROUP BY customer_id
),
final AS (
SELECT
{{ dbt_utils.surrogate_key(['s.customer_id']) }} as customer_sk,
s.customer_id,
s.customer_name,
s.email,
s.phone,
s.address,
s.country,
CASE
WHEN COALESCE(o.total_spent, 0) > 100000 THEN 'VIP'
WHEN COALESCE(o.total_spent, 0) > 10000 THEN 'Premium'
ELSE 'Standard'
END as segment,
CURRENT_DATE as effective_date,
NULL as end_date,
TRUE as is_current,
CURRENT_TIMESTAMP as created_timestamp
FROM source_data s
LEFT JOIN agg_orders o USING (customer_id)
)
SELECT * FROM final
Слой 3: Mart / Presentation Layer
Назначение: Предоставить готовые данные для конечных пользователей и BI инструментов
Характеристики:
- Агрегированные данные
- Полностью денормализованные (широкие таблицы)
- Оптимизированы для específicных use cases
- Быстрые запросы (< 5 секунд)
- Простая навигация для бизнес-пользователей
Структура витрин:
/warehouse/marts/
├── sales/
│ ├── sales_daily_summary/ # Агрегированные продажи
│ ├── sales_by_region/ # По регионам
│ └── sales_by_product/ # По продуктам
├── customer/
│ ├── customer_lifetime_value/ # CLV для каждого клиента
│ ├── customer_segments/
│ └── customer_churn_risk/
├── financial/
│ ├── revenue_forecast/
│ └── margin_analysis/
└── operations/
├── inventory_levels/
└── shipment_tracking/
Пример: Витрина продаж по дням
-- Витрина: Ежедневные продажи по регионам и продукты
CREATE TABLE marts.sales_daily_summary (
sale_date DATE,
region VARCHAR(100),
product_category VARCHAR(100),
total_orders INT,
total_quantity INT,
total_revenue DECIMAL(15,2),
total_discount DECIMAL(15,2),
avg_order_value DECIMAL(10,2),
customer_count INT,
repeat_customer_count INT,
new_customer_count INT
)
PARTITIONED BY (year INT, month INT)
STORED AS PARQUET;
-- Заполнение витрины
INSERT INTO marts.sales_daily_summary
SELECT
o.order_date as sale_date,
c.country as region,
p.category as product_category,
COUNT(DISTINCT o.order_id) as total_orders,
SUM(o.quantity) as total_quantity,
SUM(o.net_amount) as total_revenue,
SUM(o.discount) as total_discount,
AVG(o.net_amount) as avg_order_value,
COUNT(DISTINCT c.customer_sk) as customer_count,
COUNT(DISTINCT CASE WHEN o.is_repeat=TRUE THEN c.customer_sk END) as repeat_customer_count,
COUNT(DISTINCT CASE WHEN o.is_new=TRUE THEN c.customer_sk END) as new_customer_count
FROM processing.fct_orders o
JOIN processing.dim_customers c USING (customer_sk)
JOIN processing.dim_products p USING (product_sk)
GROUP BY o.order_date, c.country, p.category;
Пример витрины для конечного пользователя (широкая таблица):
-- Витрина: Клиент + его статистика (для BI инструмента)
CREATE TABLE marts.customer_360 (
customer_id INT,
customer_name VARCHAR(255),
email VARCHAR(255),
country VARCHAR(100),
segment VARCHAR(50),
lifecycle_stage VARCHAR(50),
total_orders INT,
total_spent DECIMAL(15,2),
avg_order_value DECIMAL(10,2),
first_order_date DATE,
last_order_date DATE,
days_since_last_order INT,
churn_risk FLOAT,
ltv_predicted DECIMAL(15,2)
)
STORED AS PARQUET;
Современная архитектура: Lakehouse (Data Lake + Warehouse)
┌─────────────────────────────────────────────────────────┐
│ External Data Sources │
└───────────────────────┬─────────────────────────────────┘
│
┌───────────┴───────────┐
▼ ▼
┌──────────────────┐ ┌─────────────────┐
│ Raw Data Lake │ │ Real-time │
│ (HDFS/S3) │ │ Streaming │
│ (Format: Parquet)│ │ (Kafka/Pulsar) │
└───────┬──────────┘ └────────┬────────┘
│ │
└───────────┬───────────┘
│
┌───────────▼────────────┐
│ Bronze Layer │
│ (Raw, unprocessed) │
│ (Delta Lake / Iceberg) │
└───────────┬────────────┘
│
┌───────────▼────────────┐
│ Silver Layer │
│ (Cleaned, integrated) │
│ (Delta Lake / Iceberg) │
└───────────┬────────────┘
│
┌───────────▼────────────┐
│ Gold Layer │
│ (Aggregated, business) │
│ (Delta Lake / Iceberg) │
└───────────┬────────────┘
│
┌──────────────┼──────────────┐
▼ ▼ ▼
SQL Query BI Tools Python ML
Analytics Tableau Models
Современный подход: Bronze-Silver-Gold
# Bronze: Raw данные
df_bronze = spark.read.parquet('s3://datalake/bronze/customers')
# Silver: Очищенные данные
df_silver = df_bronze \
.filter(col('customer_id').isNotNull()) \
.withColumn('email', lower(col('email'))) \
.dropDuplicates(['customer_id'])
df_silver.write.format('delta').mode('overwrite').save('s3://datalake/silver/customers')
# Gold: Агрегированные витрины
df_gold = df_silver.groupBy('country').agg(
count('customer_id').alias('customer_count'),
avg('lifetime_value').alias('avg_ltv')
)
df_gold.write.format('delta').mode('overwrite').save('s3://datalake/gold/customer_summary')
Сравнение подходов
| Характеристика | 3-слойная | Lakehouse (Bronze-Silver-Gold) |
|---|---|---|
| Сложность | Средняя | Высокая |
| Гибкость | Средняя | Высокая (ACID, time travel) |
| Storage formant | Parquet | Delta Lake / Iceberg |
| Schema evolution | Ограниченная | Встроенная |
| Compliance | Есть процесс очистки | Есть процесс очистки |
| Performance | Good | Excellent (Z-order, pruning) |
| Cost | Средние | Низкие (columnar format) |
Управление слоями в практике
# Пример скрипта управления lifecycle слоев
from datetime import datetime, timedelta
import boto3
s3 = boto3.client('s3')
def cleanup_old_staging_data(days=7):
"""Удалить данные staging старше N дней"""
cutoff_date = datetime.now() - timedelta(days=days)
response = s3.list_objects_v2(
Bucket='data-lake',
Prefix='staging/'
)
for obj in response.get('Contents', []):
if datetime.fromtimestamp(obj['LastModified'].timestamp()) < cutoff_date:
s3.delete_object(Bucket='data-lake', Key=obj['Key'])
def archive_old_mart_data(days=90):
"""Переместить витрины старше N дней в архив"""
# Переместить из горячего хранилища в холодное
pass
# Расписание в Airflow
cleanup_task = PythonOperator(
task_id='cleanup_old_staging',
python_callable=cleanup_old_staging_data,
schedule_interval='0 2 * * *' # Каждый день в 2:00
)
Заключение
Слои данных в хранилище служат разным целям:
- Raw/Staging: Сохранение исходных данных
- Processing: Очистка и подготовка
- Mart: Готовые витрины для бизнеса
Правильная архитектура слоев обеспечивает:
- Управляемость (понятная структура)
- Производительность (оптимизация на каждом уровне)
- Качество (контроль на каждом переходе)
- Масштабируемость (добавление новых слоев по необходимости)
- Переиспользование (staging → processing → multiple marts)
Выбор между традиционной 3-слойной и современной Lakehouse архитектурой зависит от требований и бюджета.