← Назад к вопросам

Какие слои данных были в хранилище?

2.0 Middle🔥 181 комментариев
#Архитектура и проектирование#Хранилища данных

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Слои данных в хранилище (Data Warehouse Architecture)

Обзор многослойной архитектуры

Data Warehouse строится на основе многослойной архитектуры, где каждый слой имеет конкретное назначение. Это обеспечивает организацию данных, управляемость, качество и производительность системы.

Классическая архитектура: 3 слоя

┌──────────────────────────────────────────────────────────┐
│         External Data Sources (API, ERP, CRM)            │
└───────────────────────┬──────────────────────────────────┘
                        │
                        ▼
┌──────────────────────────────────────────────────────────┐
│ 1. Raw / Staging Layer (Неочищенные данные)              │
│    └── Копия источников данных как есть                  │
└───────────────────────┬──────────────────────────────────┘
                        │
                        ▼
┌──────────────────────────────────────────────────────────┐
│ 2. Processing / Integration Layer (Трансформированные)   │
│    └── Чистые, денормализованные данные                  │
└───────────────────────┬──────────────────────────────────┘
                        │
                        ▼
┌──────────────────────────────────────────────────────────┐
│ 3. Mart / Presentation Layer (Витрины для аналитики)     │
│    └── Агрегированные данные для конечных пользователей  │
└───────────────────────┬──────────────────────────────────┘
                        │
             ┌──────────┴──────────┐
             ▼                     ▼
        BI Tools              Python Analytics
        Tableau              Data Science
        Power BI             ML Models

Слой 1: Raw / Staging Layer

Назначение: Копировать данные из источников без изменений

Характеристики:

  • Структура 1:1 с исходной БД
  • Нет трансформаций
  • Нет очистки
  • Часто неполные или дублирующиеся данные
  • Быстрая загрузка (bulk insert)

Структура в HDFS/S3:

/warehouse/staging/
├── customers/
│   ├── year=2023/month=01/
│   │   └── part-*.parquet
│   └── year=2024/month=01/
│       └── part-*.parquet
├── orders/
├── products/
└── transactions/

Пример SQL схемы:

CREATE SCHEMA staging;

-- Копия таблицы customers из источника
CREATE TABLE staging.customers (
    customer_id INT,
    customer_name VARCHAR(255),
    email VARCHAR(255),
    phone VARCHAR(20),
    address VARCHAR(500),
    country VARCHAR(50),
    load_timestamp TIMESTAMP,
    source_system VARCHAR(50)
)
PARTITIONED BY (year INT, month INT)
STORED AS PARQUET;

-- Копия таблицы orders
CREATE TABLE staging.orders (
    order_id INT,
    customer_id INT,
    order_date DATE,
    amount DECIMAL(10,2),
    status VARCHAR(50),
    load_timestamp TIMESTAMP
)
PARTITIONED BY (year INT, month INT)
STORED AS PARQUET;

ETL для staging:

from pyspark.sql import SparkSession
from datetime import datetime

spark = SparkSession.builder.appName('staging_load').getOrCreate()

# Чтение из источника (напрямую из БД)
df = spark.read \
    .format('jdbc') \
    .option('url', 'jdbc:postgresql://source_db:5432/prod') \
    .option('dbtable', 'public.customers') \
    .option('user', 'user') \
    .option('password', 'pass') \
    .load()

# Добавить load timestamp
from pyspark.sql.functions import current_timestamp, year, month

df_with_metadata = df.withColumn(
    'load_timestamp', current_timestamp()
).withColumn(
    'year', year(current_timestamp())
).withColumn(
    'month', month(current_timestamp())
)

# Написать в staging (append mode)
df_with_metadata.write \
    .mode('append') \
    .partitionBy('year', 'month') \
    .parquet('s3://data-lake/staging/customers')

Слой 2: Processing / Integration Layer

Назначение: Очистить, денормализовать и подготовить данные к аналитике

Что происходит:

  • Удаление дубликатов
  • Очистка (trim, null handling, type casting)
  • Соединение таблиц (join)
  • Расчёт производных полей
  • Установка бизнес-правил
  • Медленное измерение (slowly changing dimensions)

Структура в HDFS/S3:

/warehouse/processing/
├── dim_customers/  # Dimension таблица
│   └── part-*.parquet
├── dim_products/   # Dimension таблица
│   └── part-*.parquet
├── dim_dates/      # Date dimension
│   └── part-*.parquet
├── fct_orders/     # Fact таблица
│   └── part-*.parquet
└── fct_transactions/
    └── part-*.parquet

Пример: Создание Dimension таблицы (SCD Type 2)

-- Processing слой: Dimension таблица customers (SCD Type 2)
CREATE TABLE processing.dim_customers (
    customer_sk INT,
    customer_id INT,
    customer_name VARCHAR(255),
    email VARCHAR(255),
    phone VARCHAR(20),
    address VARCHAR(500),
    country VARCHAR(50),
    segment VARCHAR(50),  -- Вычисленное поле
    effective_date DATE,
    end_date DATE,
    is_current BOOLEAN,
    created_timestamp TIMESTAMP,
    updated_timestamp TIMESTAMP
)
STORED AS PARQUET;

-- Пример заполнения (медленное измерение)
WITH new_data AS (
    SELECT 
        ROW_NUMBER() OVER (ORDER BY customer_id) as customer_sk,
        s.customer_id,
        s.customer_name,
        s.email,
        s.phone,
        s.address,
        s.country,
        CASE 
            WHEN amount_total > 100000 THEN 'VIP'
            WHEN amount_total > 10000 THEN 'Premium'
            ELSE 'Standard'
        END as segment,
        CURRENT_DATE as effective_date,
        NULL::DATE as end_date,
        TRUE as is_current
    FROM staging.customers s
    LEFT JOIN fct_orders o ON s.customer_id = o.customer_id
    GROUP BY s.customer_id, s.customer_name
)
INSERT INTO processing.dim_customers
SELECT * FROM new_data;

Пример: Создание Fact таблицы

-- Fact таблица orders (детальные транзакции)
CREATE TABLE processing.fct_orders (
    order_sk INT,
    order_id INT,
    customer_sk INT,
    product_sk INT,
    order_date DATE,
    amount DECIMAL(10,2),
    quantity INT,
    discount DECIMAL(10,2),
    net_amount DECIMAL(10,2),  -- amount - discount
    status VARCHAR(50),
    created_timestamp TIMESTAMP
)
PARTITIONED BY (year INT, month INT)
STORED AS PARQUET;

ETL для processing (dbt пример):

-- models/processing/dim_customers.sql
{{ config(
    materialized='table',
    schema='processing',
    tags=['critical']
) }}

WITH source_data AS (
    SELECT 
        customer_id,
        customer_name,
        email,
        phone,
        address,
        country
    FROM {{ source('staging', 'customers') }}
),

agg_orders AS (
    SELECT
        customer_id,
        COUNT(DISTINCT order_id) as total_orders,
        SUM(amount) as total_spent
    FROM {{ source('staging', 'orders') }}
    GROUP BY customer_id
),

final AS (
    SELECT
        {{ dbt_utils.surrogate_key(['s.customer_id']) }} as customer_sk,
        s.customer_id,
        s.customer_name,
        s.email,
        s.phone,
        s.address,
        s.country,
        CASE 
            WHEN COALESCE(o.total_spent, 0) > 100000 THEN 'VIP'
            WHEN COALESCE(o.total_spent, 0) > 10000 THEN 'Premium'
            ELSE 'Standard'
        END as segment,
        CURRENT_DATE as effective_date,
        NULL as end_date,
        TRUE as is_current,
        CURRENT_TIMESTAMP as created_timestamp
    FROM source_data s
    LEFT JOIN agg_orders o USING (customer_id)
)

SELECT * FROM final

Слой 3: Mart / Presentation Layer

Назначение: Предоставить готовые данные для конечных пользователей и BI инструментов

Характеристики:

  • Агрегированные данные
  • Полностью денормализованные (широкие таблицы)
  • Оптимизированы для específicных use cases
  • Быстрые запросы (< 5 секунд)
  • Простая навигация для бизнес-пользователей

Структура витрин:

/warehouse/marts/
├── sales/
│   ├── sales_daily_summary/    # Агрегированные продажи
│   ├── sales_by_region/        # По регионам
│   └── sales_by_product/       # По продуктам
├── customer/
│   ├── customer_lifetime_value/ # CLV для каждого клиента
│   ├── customer_segments/
│   └── customer_churn_risk/
├── financial/
│   ├── revenue_forecast/
│   └── margin_analysis/
└── operations/
    ├── inventory_levels/
    └── shipment_tracking/

Пример: Витрина продаж по дням

-- Витрина: Ежедневные продажи по регионам и продукты
CREATE TABLE marts.sales_daily_summary (
    sale_date DATE,
    region VARCHAR(100),
    product_category VARCHAR(100),
    total_orders INT,
    total_quantity INT,
    total_revenue DECIMAL(15,2),
    total_discount DECIMAL(15,2),
    avg_order_value DECIMAL(10,2),
    customer_count INT,
    repeat_customer_count INT,
    new_customer_count INT
)
PARTITIONED BY (year INT, month INT)
STORED AS PARQUET;

-- Заполнение витрины
INSERT INTO marts.sales_daily_summary
SELECT
    o.order_date as sale_date,
    c.country as region,
    p.category as product_category,
    COUNT(DISTINCT o.order_id) as total_orders,
    SUM(o.quantity) as total_quantity,
    SUM(o.net_amount) as total_revenue,
    SUM(o.discount) as total_discount,
    AVG(o.net_amount) as avg_order_value,
    COUNT(DISTINCT c.customer_sk) as customer_count,
    COUNT(DISTINCT CASE WHEN o.is_repeat=TRUE THEN c.customer_sk END) as repeat_customer_count,
    COUNT(DISTINCT CASE WHEN o.is_new=TRUE THEN c.customer_sk END) as new_customer_count
FROM processing.fct_orders o
JOIN processing.dim_customers c USING (customer_sk)
JOIN processing.dim_products p USING (product_sk)
GROUP BY o.order_date, c.country, p.category;

Пример витрины для конечного пользователя (широкая таблица):

-- Витрина: Клиент + его статистика (для BI инструмента)
CREATE TABLE marts.customer_360 (
    customer_id INT,
    customer_name VARCHAR(255),
    email VARCHAR(255),
    country VARCHAR(100),
    segment VARCHAR(50),
    lifecycle_stage VARCHAR(50),
    total_orders INT,
    total_spent DECIMAL(15,2),
    avg_order_value DECIMAL(10,2),
    first_order_date DATE,
    last_order_date DATE,
    days_since_last_order INT,
    churn_risk FLOAT,
    ltv_predicted DECIMAL(15,2)
)
STORED AS PARQUET;

Современная архитектура: Lakehouse (Data Lake + Warehouse)

┌─────────────────────────────────────────────────────────┐
│ External Data Sources                                   │
└───────────────────────┬─────────────────────────────────┘
                        │
            ┌───────────┴───────────┐
            ▼                       ▼
    ┌──────────────────┐   ┌─────────────────┐
    │ Raw Data Lake    │   │ Real-time       │
    │ (HDFS/S3)        │   │ Streaming       │
    │ (Format: Parquet)│   │ (Kafka/Pulsar)  │
    └───────┬──────────┘   └────────┬────────┘
            │                       │
            └───────────┬───────────┘
                        │
            ┌───────────▼────────────┐
            │ Bronze Layer           │
            │ (Raw, unprocessed)     │
            │ (Delta Lake / Iceberg) │
            └───────────┬────────────┘
                        │
            ┌───────────▼────────────┐
            │ Silver Layer           │
            │ (Cleaned, integrated)  │
            │ (Delta Lake / Iceberg) │
            └───────────┬────────────┘
                        │
            ┌───────────▼────────────┐
            │ Gold Layer             │
            │ (Aggregated, business) │
            │ (Delta Lake / Iceberg) │
            └───────────┬────────────┘
                        │
         ┌──────────────┼──────────────┐
         ▼              ▼              ▼
     SQL Query      BI Tools       Python ML
     Analytics      Tableau        Models

Современный подход: Bronze-Silver-Gold

# Bronze: Raw данные
df_bronze = spark.read.parquet('s3://datalake/bronze/customers')

# Silver: Очищенные данные
df_silver = df_bronze \
    .filter(col('customer_id').isNotNull()) \
    .withColumn('email', lower(col('email'))) \
    .dropDuplicates(['customer_id'])

df_silver.write.format('delta').mode('overwrite').save('s3://datalake/silver/customers')

# Gold: Агрегированные витрины
df_gold = df_silver.groupBy('country').agg(
    count('customer_id').alias('customer_count'),
    avg('lifetime_value').alias('avg_ltv')
)

df_gold.write.format('delta').mode('overwrite').save('s3://datalake/gold/customer_summary')

Сравнение подходов

Характеристика3-слойнаяLakehouse (Bronze-Silver-Gold)
СложностьСредняяВысокая
ГибкостьСредняяВысокая (ACID, time travel)
Storage formantParquetDelta Lake / Iceberg
Schema evolutionОграниченнаяВстроенная
ComplianceЕсть процесс очисткиЕсть процесс очистки
PerformanceGoodExcellent (Z-order, pruning)
CostСредниеНизкие (columnar format)

Управление слоями в практике

# Пример скрипта управления lifecycle слоев
from datetime import datetime, timedelta
import boto3

s3 = boto3.client('s3')

def cleanup_old_staging_data(days=7):
    """Удалить данные staging старше N дней"""
    cutoff_date = datetime.now() - timedelta(days=days)
    
    response = s3.list_objects_v2(
        Bucket='data-lake',
        Prefix='staging/'
    )
    
    for obj in response.get('Contents', []):
        if datetime.fromtimestamp(obj['LastModified'].timestamp()) < cutoff_date:
            s3.delete_object(Bucket='data-lake', Key=obj['Key'])

def archive_old_mart_data(days=90):
    """Переместить витрины старше N дней в архив"""
    # Переместить из горячего хранилища в холодное
    pass

# Расписание в Airflow
cleanup_task = PythonOperator(
    task_id='cleanup_old_staging',
    python_callable=cleanup_old_staging_data,
    schedule_interval='0 2 * * *'  # Каждый день в 2:00
)

Заключение

Слои данных в хранилище служат разным целям:

  1. Raw/Staging: Сохранение исходных данных
  2. Processing: Очистка и подготовка
  3. Mart: Готовые витрины для бизнеса

Правильная архитектура слоев обеспечивает:

  • Управляемость (понятная структура)
  • Производительность (оптимизация на каждом уровне)
  • Качество (контроль на каждом переходе)
  • Масштабируемость (добавление новых слоев по необходимости)
  • Переиспользование (staging → processing → multiple marts)

Выбор между традиционной 3-слойной и современной Lakehouse архитектурой зависит от требований и бюджета.