← Назад к вопросам

Какие слои данных есть в хранилище?

2.0 Middle🔥 221 комментариев
#SQL и базы данных#Хранилища данных и ETL

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Слои данных в хранилище (Data Warehouse Layers)

Данные в аналитическом хранилище организуются в слои для обеспечения качества, производительности и удобства анализа. Это одна из ключевых архитектурных концепций в аналитике данных. Разберу стандартные слои и их назначение.

Архитектура Data Warehouse (3-4 слойная архитектура)

┌─────────────────────────────────┐
│   Presentation Layer (Витрины)  │  <- Дашборды, отчёты, BI инструменты
├─────────────────────────────────┤
│  Mart Layer (Витрины данных)    │  <- Готовые таблицы для аналитики
├─────────────────────────────────┤
│  Core / Processing Layer        │  <- Преобразование и агрегация
├─────────────────────────────────┤
│  Staging Layer (Промежуточный)  │  <- Очистка и трансформация
├─────────────────────────────────┤
│  Raw / Source Layer             │  <- Исходные данные без обработки
├─────────────────────────────────┤
│  Source Systems (ERP, CRM, etc) │  <- Системы-источники
└─────────────────────────────────┘

1. Raw Layer (Слой сырых данных)

Назначение: Сохранить исходные данные "как есть" без трансформаций.

Характеристики:

  • Копия данных прямо из источников
  • Без валидации и очистки
  • Полная история изменений (важно для аудита)
  • Хранятся в неизменяемом виде

Структура:

-- Пример таблиц в Raw Layer
CREATE TABLE raw.events (
    id UUID,
    user_id VARCHAR,
    event_name VARCHAR,
    event_data JSONB,
    created_at TIMESTAMP,
    _loaded_at TIMESTAMP,
    _source_system VARCHAR
);

CREATE TABLE raw.users (
    user_id VARCHAR,
    email VARCHAR,
    name VARCHAR,
    registration_date DATE,
    _extracted_at TIMESTAMP
);

Практический пример ETL скрипта:

import pandas as pd
from sqlalchemy import create_engine

# Читаем из источника (например, API)
response = requests.get('https://api.example.com/events')
raw_data = response.json()

# Сохраняем в raw слой БЕЗ трансформаций
engine = create_engine('postgresql://....')
df_raw = pd.DataFrame(raw_data)

df_raw.to_sql('raw_events', 
              con=engine, 
              schema='raw',
              if_exists='append',
              index=False)

Преимущества:

  • ✅ Полная история
  • ✅ Возможность переделать обработку
  • ✅ Аудит и отчетность

Недостатки:

  • ❌ Требует много места
  • ❌ Медленнее для аналитики (много шума)

2. Staging Layer (Промежуточный слой)

Назначение: Очистить и подготовить данные для дальнейшей обработки.

Процессы:

  • Валидация (проверка формата, диапазонов)
  • Деденьу плицирование (удаление дубликатов)
  • Обработка пропусков (NULL значения)
  • Нормализация (единица измерения, кодировка)
  • Фильтрация (удаление явно неправильных данных)
-- Пример трансформации в Staging
CREATE TABLE staging.events AS
SELECT 
    id,
    user_id,
    event_name,
    -- Валидация
    CASE 
        WHEN event_name IS NULL THEN 'unknown'
        ELSE LOWER(event_name)
    END as event_name_clean,
    event_data,
    -- Стандартизация даты
    DATE(created_at) as event_date,
    EXTRACT(HOUR FROM created_at) as event_hour,
    _loaded_at,
    _source_system
FROM raw.events
WHERE 
    -- Фильтруем явный мусор
    user_id IS NOT NULL
    AND created_at > '2020-01-01'
    AND created_at < CURRENT_TIMESTAMP
DISTINCT ON (id, user_id, created_at);  -- Дедубликация

Python пример:

import pandas as pd
import numpy as np

def clean_events(df_raw):
    """Очистка событий в staging слой"""
    df = df_raw.copy()
    
    # Удаляем явные дубликаты
    df = df.drop_duplicates(subset=['id'])
    
    # Валидация user_id
    df = df[df['user_id'].notna()]
    
    # Нормализуем event_name
    df['event_name'] = df['event_name'].str.lower().str.strip()
    
    # Обработка дат
    df['created_at'] = pd.to_datetime(df['created_at'])
    df = df[df['created_at'] > '2020-01-01']
    
    # Добавляем вспомогательные столбцы
    df['event_date'] = df['created_at'].dt.date
    df['event_hour'] = df['created_at'].dt.hour
    
    # Логирование качества
    print(f"Осталось строк: {len(df)} из {len(df_raw)}")
    print(f"Пропущенных user_id: {df_raw['user_id'].isna().sum()}")
    
    return df

3. Core / Processing Layer (Основной слой обработки)

Назначение: Создать бизнес-логику, измерения и факты.

Это часто использует подход Data Vault или Star Schema.

-- Пример: Таблица фактов (Facts Table)
CREATE TABLE core.fact_events (
    event_id UUID PRIMARY KEY,
    user_key INT,           -- Ссылка на справочник
    event_type_key INT,     -- Ссылка на справочник
    product_key INT,        -- Ссылка на справочник
    event_date DATE,
    event_time TIME,
    amount NUMERIC,
    quantity INT,
    revenue NUMERIC,
    _loaded_at TIMESTAMP
);

-- Примеры таблиц измерений (Dimension Tables)
CREATE TABLE core.dim_users (
    user_key INT PRIMARY KEY,
    user_id VARCHAR,        -- Естественный ключ
    user_name VARCHAR,
    registration_date DATE,
    user_status VARCHAR,
    _effective_from DATE,
    _effective_to DATE,
    _is_current BOOLEAN
);

CREATE TABLE core.dim_event_types (
    event_type_key INT PRIMARY KEY,
    event_type_name VARCHAR,
    event_category VARCHAR,
    _loaded_at TIMESTAMP
);

Логика в Core слое:

-- Пример: Вычисление метрик по пользователям
CREATE TABLE core.user_metrics AS
SELECT 
    du.user_key,
    du.user_id,
    COUNT(DISTINCT fe.event_id) as total_events,
    COUNT(DISTINCT CASE WHEN fe.event_type_key = 5 THEN fe.event_id END) as purchase_count,
    SUM(fe.revenue) as total_revenue,
    AVG(fe.amount) as avg_order_value,
    MAX(fe.event_date) as last_event_date,
    DATEDIFF(DAY, MIN(fe.event_date), MAX(fe.event_date)) as customer_lifetime_days
FROM core.fact_events fe
JOIN core.dim_users du ON fe.user_key = du.user_key
WHERE du._is_current = TRUE
GROUP BY du.user_key, du.user_id;

4. Mart / Data Mart Layer (Витрины данных)

Назначение: Подготовить данные для конкретных случаев использования (для разных типов аналитиков).

Есть витрины для разных направлений:

  • Sales Mart (для продаж)
  • Marketing Mart (для маркетинга)
  • Finance Mart (для финансов)
  • Customer Analytics Mart
-- Пример: Sales Mart
CREATE TABLE mart.sales_daily (
    date DATE,
    product_id VARCHAR,
    product_name VARCHAR,
    sales_qty INT,
    sales_amount NUMERIC,
    revenue NUMERIC,
    profit NUMERIC,
    customer_count INT,
    avg_order_value NUMERIC,
    region VARCHAR,
    _loaded_at TIMESTAMP
);

-- Пример: Marketing Mart
CREATE TABLE mart.marketing_funnel (
    date DATE,
    campaign_id VARCHAR,
    campaign_name VARCHAR,
    impressions INT,
    clicks INT,
    click_through_rate NUMERIC,
    conversions INT,
    conversion_rate NUMERIC,
    cost NUMERIC,
    revenue NUMERIC,
    roi NUMERIC,
    _loaded_at TIMESTAMP
);

Обслуживание витрин:

# Ежедневное обновление Sales Mart
def update_sales_mart():
    query = """
    INSERT INTO mart.sales_daily
    SELECT 
        CURRENT_DATE,
        p.product_id,
        p.product_name,
        SUM(fe.quantity),
        SUM(fe.amount),
        SUM(fe.revenue),
        SUM(fe.revenue - fe.cost),
        COUNT(DISTINCT fe.user_key),
        AVG(fe.amount),
        dc.region,
        CURRENT_TIMESTAMP
    FROM core.fact_events fe
    JOIN core.dim_products p ON fe.product_key = p.product_key
    JOIN core.dim_customers dc ON fe.user_key = dc.user_key
    WHERE DATE(fe.event_date) = CURRENT_DATE
    GROUP BY 2, 3, 9, 10
    """
    execute_query(query)

5. Presentation Layer (Слой представления)

Назначение: Прямой доступ для бизнес-пользователей (через BI инструменты).

-- Пример: Представления для дашбордов
CREATE VIEW presentation.dashboard_sales_overview AS
SELECT 
    sd.date,
    sd.product_name,
    sd.sales_qty,
    sd.revenue,
    sd.customer_count,
    sd.revenue / NULLIF(sd.customer_count, 0) as revenue_per_customer,
    LAG(sd.revenue) OVER (PARTITION BY sd.product_id ORDER BY sd.date) as prev_day_revenue,
    (sd.revenue - LAG(sd.revenue) OVER (PARTITION BY sd.product_id ORDER BY sd.date)) / 
        LAG(sd.revenue) OVER (PARTITION BY sd.product_id ORDER BY sd.date) as day_over_day_growth
FROM mart.sales_daily sd
WHERE sd.date >= CURRENT_DATE - INTERVAL '90 days';

Таблица слоёв

СлойНазначениеПримеры таблицОбновл.Пользователи
RawСохранить исходные данныеraw_events, raw_usersПостоянноNone (только инженеры)
StagingОчистка и валидацияstaging_clean_eventsЕжедневноИнженеры данных
CoreБизнес-логикаfact_events, dim_usersЕжедневноАналитики, разработчики ML
MartГотовые витриныsales_daily, marketing_funnelЕжедневноБизнес-аналитики
PresentationBI инструментыviews для дашбордовЕжедневноБизнес-пользователи

Практический пример: От Raw до Presentation

# 1. Raw: Данные из API
def load_raw_orders():
    api_data = requests.get('https://api/orders').json()
    df = pd.DataFrame(api_data)
    df.to_sql('raw_orders', schema='raw', ...)

# 2. Staging: Очистка
def clean_to_staging():
    df = pd.read_sql('SELECT * FROM raw.orders', con)
    df = df.drop_duplicates()
    df['order_date'] = pd.to_datetime(df['order_date'])
    df.to_sql('staging_orders', schema='staging', ...)

# 3. Core: Трансформация в факты
def build_core_facts():
    query = """
    INSERT INTO core.fact_orders
    SELECT 
        so.order_id,
        du.user_key,
        so.product_id,
        so.order_date,
        so.amount,
        so.cost,
        so.amount - so.cost as profit
    FROM staging.orders so
    JOIN core.dim_users du ON so.user_id = du.user_id
    """
    execute_query(query)

# 4. Mart: Витрины для аналитики
def build_sales_mart():
    query = """
    INSERT INTO mart.sales_daily
    SELECT 
        DATE(fo.order_date),
        COUNT(*) as orders,
        SUM(fo.amount) as revenue,
        SUM(fo.profit) as profit
    FROM core.fact_orders fo
    GROUP BY 1
    """
    execute_query(query)

# 5. Presentation: Для дашбордов
def create_presentation():
    query = """
    CREATE OR REPLACE VIEW presentation.sales_summary AS
    SELECT 
        sd.date,
        sd.orders,
        sd.revenue,
        SUM(sd.revenue) OVER (ORDER BY sd.date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) as revenue_7day_ma
    FROM mart.sales_daily sd
    """
    execute_query(query)

Ключевые принципы

Разделение ответственности: Каждый слой имеет одну задачу

Data Lineage: Можно отследить данные от источника до дашборда

Масштабируемость: Легко добавлять новые витрины без переделки ядра

Quality Gates: Качество улучшается на каждом слое

Версионирование: Можно держать несколько версий бизнес-логики

Инструменты для реализации

  • dbt (Data Build Tool): управление трансформацией
  • Apache Airflow: оркестровка ETL
  • Dataflow / Spark: обработка больших объёмов
  • PostgreSQL / Snowflake: хранение
  • Tableau / Looker: BI инструменты
Какие слои данных есть в хранилище? | PrepBro