← Назад к вопросам

Как оргранизуешь хранилище данных?

2.2 Middle🔥 121 комментариев
#Архитектура и проектирование#Хранилища данных

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Организация хранилища данных (Data Warehouse Architecture)

Хранилище данных — это целостная система для сбора, хранения и анализа данных. Вот как я её организую на практике.

Архитектура: Слои (Layers)

Data Sources → Ingestion → Raw Layer → Processing → Warehouse → BI
              (ETL)      (Bronze)      (Silver)     (Gold)

1. Raw Layer (Bronze)

Копия исходных данных в их первоначальном виде:

-- Структура: raw_[source]_[table]
CREATE TABLE raw_analytics_users (
    id INT,
    name VARCHAR,
    email VARCHAR,
    created_at TIMESTAMP,
    raw_data JSONB,  -- весь payload
    ingested_at TIMESTAMP DEFAULT NOW(),
    source_partition_date DATE
);

-- Партиционирование по дате для быстрого удаления старых данных
PARTITION BY RANGE (source_partition_date);

Принципы:

  • Минимальная трансформация (только парсинг format)
  • Добавляем metadata: ingestion_time, source, version
  • Храним 30-90 дней (для recovery)

2. Processing Layer (Silver)

Очищенные и нормализованные данные:

-- Структура: stg_[domain]_[entity]
CREATE TABLE stg_users (
    user_id UUID PRIMARY KEY,
    user_name VARCHAR NOT NULL,
    email VARCHAR NOT NULL UNIQUE,
    country_code VARCHAR(2),
    created_at TIMESTAMP WITH TIME ZONE,
    updated_at TIMESTAMP WITH TIME ZONE,
    is_active BOOLEAN,
    dbt_updated_at TIMESTAMP,
    source VARCHAR  -- из какого источника
);

INDEX idx_created_at ON stg_users(created_at);
INDEX idx_country ON stg_users(country_code);

Трансформации:

  • Валидация данных
  • Устранение дубликатов
  • Type casting и нормализация
  • Handling NULL values
  • Connecting с reference data

3. Warehouse Layer (Gold)

Даже более высокоуровневые витрины для бизнеса:

-- Фактовая таблица
CREATE TABLE fact_orders (
    order_id UUID PRIMARY KEY,
    user_id UUID,
    product_id UUID,
    order_date DATE,
    amount DECIMAL(12, 2),
    currency VARCHAR(3),
    order_status VARCHAR,
    FOREIGN KEY (user_id) REFERENCES dim_users(user_id),
    FOREIGN KEY (product_id) REFERENCES dim_products(product_id)
);

-- Витрина для BI
CREATE TABLE v_daily_revenue (
    date DATE,
    country VARCHAR,
    product_category VARCHAR,
    revenue DECIMAL(15, 2),
    orders_count INT,
    avg_order_value DECIMAL(12, 2)
);

-- Materialized view для быстрого доступа
CREATE MATERIALIZED VIEW mv_user_metrics AS
SELECT 
    u.user_id,
    COUNT(o.order_id) as total_orders,
    SUM(o.amount) as ltv,
    AVG(o.amount) as avg_order_value,
    MAX(o.order_date) as last_purchase_date
FROM dim_users u
LEFT JOIN fact_orders o USING (user_id)
GROUP BY u.user_id;

Организация по доменам (Domain-Driven)

warehouse/
├── crm/
│   ├── stg_crm_customers.sql
│   ├── dim_customers.sql
│   └── fact_interactions.sql
├── finance/
│   ├── stg_finance_invoices.sql
│   ├── fact_transactions.sql
│   └── dim_accounts.sql
├── product/
│   ├── stg_product_catalog.sql
│   ├── dim_products.sql
│   └── fact_purchases.sql
└── marts/
    ├── v_customer_lifetime_value.sql
    ├── v_product_sales_trend.sql
    └── v_marketing_metrics.sql

ETL Pipeline (dbt)

# dbt/models/staging/crm/stg_crm_users.sql
{{ config(
    materialized='view',
    schema='staging',
    tags=['crm', 'daily']
) }}

WITH source_data AS (
    SELECT 
        id as user_id,
        name as user_name,
        email,
        country_code,
        created_date,
        updated_date
    FROM {{ source('crm', 'users') }}
    WHERE deleted_at IS NULL
),

validated_data AS (
    SELECT 
        *,
        CASE 
            WHEN email IS NULL THEN 0
            WHEN NOT email LIKE '%@%' THEN 0
            ELSE 1
        END as is_valid_email
    FROM source_data
)

SELECT * FROM validated_data

Партиционирование и индексы

-- Большие таблицы partitionируем по времени
CREATE TABLE fact_events (
    event_id UUID,
    user_id UUID,
    event_type VARCHAR,
    event_date DATE,
    event_timestamp TIMESTAMP WITH TIME ZONE,
    properties JSONB
)
PARTITION BY RANGE (event_date) (
    PARTITION p_2026_03 VALUES FROM ('2026-03-01') TO ('2026-04-01'),
    PARTITION p_2026_04 VALUES FROM ('2026-04-01') TO ('2026-05-01')
);

-- Индексы на часто используемые колонки
CREATE INDEX idx_events_user_date ON fact_events(user_id, event_date);
CREATE INDEX idx_events_type ON fact_events(event_type);

Обработка качества данных

# tests/data_quality.py
import pandas as pd
import sqlalchemy as sa

def test_no_duplicates():
    query = "SELECT user_id, COUNT(*) FROM stg_users GROUP BY user_id HAVING COUNT(*) > 1"
    result = pd.read_sql(query, engine)
    assert len(result) == 0, "Found duplicate users"

def test_no_null_in_key():
    query = "SELECT COUNT(*) FROM stg_users WHERE user_id IS NULL"
    result = pd.read_sql(query, engine)
    assert result.iloc[0, 0] == 0, "Null values in user_id"

def test_referential_integrity():
    query = """SELECT COUNT(*) FROM fact_orders o 
              WHERE NOT EXISTS (SELECT 1 FROM dim_users u WHERE u.user_id = o.user_id)"""
    result = pd.read_sql(query, engine)
    assert result.iloc[0, 0] == 0, "Orphaned orders found"

Мониторинг и поддержка

-- Таблица для отслеживания SLA
CREATE TABLE dw_refresh_log (
    table_name VARCHAR,
    refresh_started_at TIMESTAMP,
    refresh_completed_at TIMESTAMP,
    rows_loaded INT,
    refresh_status VARCHAR,  -- SUCCESS, FAILED
    error_message TEXT
);

-- Automated maintenance
ALTER TABLE fact_events SET (
    autovacuum_vacuum_scale_factor = 0.05,  -- более агрессивная очистка
    autovacuum_analyze_scale_factor = 0.02
);

-- Периодическая переиндексация
REINDEX TABLE fact_orders;

Лучшие практики

  1. Immutability: Raw и Silver слои не редактируются, только перезаписываются
  2. Lineage tracking: каждая таблица знает, откуда её данные
  3. Versioning: при изменении схемы создаём v2, v3
  4. SLA мониторинг: таблицы должны обновляться в оговоренное время
  5. Documentation: каждая таблица описана (назначение, частота обновления, владелец)

На практике

На моих проектах:

  • Raw Layer: хранит 60 дней, обновляется в real-time
  • Silver Layer: очищенные данные, 3 года
  • Gold Layer: витрины для BI, полная история
  • Volume: 10-500 TB в зависимости от компании
  • Update frequency: Real-time (Kafka), Hourly, или Daily

Вывод: хорошо организованное хранилище — это основа успешной аналитики. Слои, доменная организация, и качество данных — самое важное.