← Назад к вопросам
Как оргранизуешь хранилище данных?
2.2 Middle🔥 121 комментариев
#Архитектура и проектирование#Хранилища данных
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Организация хранилища данных (Data Warehouse Architecture)
Хранилище данных — это целостная система для сбора, хранения и анализа данных. Вот как я её организую на практике.
Архитектура: Слои (Layers)
Data Sources → Ingestion → Raw Layer → Processing → Warehouse → BI
(ETL) (Bronze) (Silver) (Gold)
1. Raw Layer (Bronze)
Копия исходных данных в их первоначальном виде:
-- Структура: raw_[source]_[table]
CREATE TABLE raw_analytics_users (
id INT,
name VARCHAR,
email VARCHAR,
created_at TIMESTAMP,
raw_data JSONB, -- весь payload
ingested_at TIMESTAMP DEFAULT NOW(),
source_partition_date DATE
);
-- Партиционирование по дате для быстрого удаления старых данных
PARTITION BY RANGE (source_partition_date);
Принципы:
- Минимальная трансформация (только парсинг format)
- Добавляем metadata: ingestion_time, source, version
- Храним 30-90 дней (для recovery)
2. Processing Layer (Silver)
Очищенные и нормализованные данные:
-- Структура: stg_[domain]_[entity]
CREATE TABLE stg_users (
user_id UUID PRIMARY KEY,
user_name VARCHAR NOT NULL,
email VARCHAR NOT NULL UNIQUE,
country_code VARCHAR(2),
created_at TIMESTAMP WITH TIME ZONE,
updated_at TIMESTAMP WITH TIME ZONE,
is_active BOOLEAN,
dbt_updated_at TIMESTAMP,
source VARCHAR -- из какого источника
);
INDEX idx_created_at ON stg_users(created_at);
INDEX idx_country ON stg_users(country_code);
Трансформации:
- Валидация данных
- Устранение дубликатов
- Type casting и нормализация
- Handling NULL values
- Connecting с reference data
3. Warehouse Layer (Gold)
Даже более высокоуровневые витрины для бизнеса:
-- Фактовая таблица
CREATE TABLE fact_orders (
order_id UUID PRIMARY KEY,
user_id UUID,
product_id UUID,
order_date DATE,
amount DECIMAL(12, 2),
currency VARCHAR(3),
order_status VARCHAR,
FOREIGN KEY (user_id) REFERENCES dim_users(user_id),
FOREIGN KEY (product_id) REFERENCES dim_products(product_id)
);
-- Витрина для BI
CREATE TABLE v_daily_revenue (
date DATE,
country VARCHAR,
product_category VARCHAR,
revenue DECIMAL(15, 2),
orders_count INT,
avg_order_value DECIMAL(12, 2)
);
-- Materialized view для быстрого доступа
CREATE MATERIALIZED VIEW mv_user_metrics AS
SELECT
u.user_id,
COUNT(o.order_id) as total_orders,
SUM(o.amount) as ltv,
AVG(o.amount) as avg_order_value,
MAX(o.order_date) as last_purchase_date
FROM dim_users u
LEFT JOIN fact_orders o USING (user_id)
GROUP BY u.user_id;
Организация по доменам (Domain-Driven)
warehouse/
├── crm/
│ ├── stg_crm_customers.sql
│ ├── dim_customers.sql
│ └── fact_interactions.sql
├── finance/
│ ├── stg_finance_invoices.sql
│ ├── fact_transactions.sql
│ └── dim_accounts.sql
├── product/
│ ├── stg_product_catalog.sql
│ ├── dim_products.sql
│ └── fact_purchases.sql
└── marts/
├── v_customer_lifetime_value.sql
├── v_product_sales_trend.sql
└── v_marketing_metrics.sql
ETL Pipeline (dbt)
# dbt/models/staging/crm/stg_crm_users.sql
{{ config(
materialized='view',
schema='staging',
tags=['crm', 'daily']
) }}
WITH source_data AS (
SELECT
id as user_id,
name as user_name,
email,
country_code,
created_date,
updated_date
FROM {{ source('crm', 'users') }}
WHERE deleted_at IS NULL
),
validated_data AS (
SELECT
*,
CASE
WHEN email IS NULL THEN 0
WHEN NOT email LIKE '%@%' THEN 0
ELSE 1
END as is_valid_email
FROM source_data
)
SELECT * FROM validated_data
Партиционирование и индексы
-- Большие таблицы partitionируем по времени
CREATE TABLE fact_events (
event_id UUID,
user_id UUID,
event_type VARCHAR,
event_date DATE,
event_timestamp TIMESTAMP WITH TIME ZONE,
properties JSONB
)
PARTITION BY RANGE (event_date) (
PARTITION p_2026_03 VALUES FROM ('2026-03-01') TO ('2026-04-01'),
PARTITION p_2026_04 VALUES FROM ('2026-04-01') TO ('2026-05-01')
);
-- Индексы на часто используемые колонки
CREATE INDEX idx_events_user_date ON fact_events(user_id, event_date);
CREATE INDEX idx_events_type ON fact_events(event_type);
Обработка качества данных
# tests/data_quality.py
import pandas as pd
import sqlalchemy as sa
def test_no_duplicates():
query = "SELECT user_id, COUNT(*) FROM stg_users GROUP BY user_id HAVING COUNT(*) > 1"
result = pd.read_sql(query, engine)
assert len(result) == 0, "Found duplicate users"
def test_no_null_in_key():
query = "SELECT COUNT(*) FROM stg_users WHERE user_id IS NULL"
result = pd.read_sql(query, engine)
assert result.iloc[0, 0] == 0, "Null values in user_id"
def test_referential_integrity():
query = """SELECT COUNT(*) FROM fact_orders o
WHERE NOT EXISTS (SELECT 1 FROM dim_users u WHERE u.user_id = o.user_id)"""
result = pd.read_sql(query, engine)
assert result.iloc[0, 0] == 0, "Orphaned orders found"
Мониторинг и поддержка
-- Таблица для отслеживания SLA
CREATE TABLE dw_refresh_log (
table_name VARCHAR,
refresh_started_at TIMESTAMP,
refresh_completed_at TIMESTAMP,
rows_loaded INT,
refresh_status VARCHAR, -- SUCCESS, FAILED
error_message TEXT
);
-- Automated maintenance
ALTER TABLE fact_events SET (
autovacuum_vacuum_scale_factor = 0.05, -- более агрессивная очистка
autovacuum_analyze_scale_factor = 0.02
);
-- Периодическая переиндексация
REINDEX TABLE fact_orders;
Лучшие практики
- Immutability: Raw и Silver слои не редактируются, только перезаписываются
- Lineage tracking: каждая таблица знает, откуда её данные
- Versioning: при изменении схемы создаём v2, v3
- SLA мониторинг: таблицы должны обновляться в оговоренное время
- Documentation: каждая таблица описана (назначение, частота обновления, владелец)
На практике
На моих проектах:
- Raw Layer: хранит 60 дней, обновляется в real-time
- Silver Layer: очищенные данные, 3 года
- Gold Layer: витрины для BI, полная история
- Volume: 10-500 TB в зависимости от компании
- Update frequency: Real-time (Kafka), Hourly, или Daily
Вывод: хорошо организованное хранилище — это основа успешной аналитики. Слои, доменная организация, и качество данных — самое важное.