Какие слои данных есть в хранилище?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Слои данных в хранилище (Data Warehouse Layers)
Данные в аналитическом хранилище организуются в слои для обеспечения качества, производительности и удобства анализа. Это одна из ключевых архитектурных концепций в аналитике данных. Разберу стандартные слои и их назначение.
Архитектура Data Warehouse (3-4 слойная архитектура)
┌─────────────────────────────────┐
│ Presentation Layer (Витрины) │ <- Дашборды, отчёты, BI инструменты
├─────────────────────────────────┤
│ Mart Layer (Витрины данных) │ <- Готовые таблицы для аналитики
├─────────────────────────────────┤
│ Core / Processing Layer │ <- Преобразование и агрегация
├─────────────────────────────────┤
│ Staging Layer (Промежуточный) │ <- Очистка и трансформация
├─────────────────────────────────┤
│ Raw / Source Layer │ <- Исходные данные без обработки
├─────────────────────────────────┤
│ Source Systems (ERP, CRM, etc) │ <- Системы-источники
└─────────────────────────────────┘
1. Raw Layer (Слой сырых данных)
Назначение: Сохранить исходные данные "как есть" без трансформаций.
Характеристики:
- Копия данных прямо из источников
- Без валидации и очистки
- Полная история изменений (важно для аудита)
- Хранятся в неизменяемом виде
Структура:
-- Пример таблиц в Raw Layer
CREATE TABLE raw.events (
id UUID,
user_id VARCHAR,
event_name VARCHAR,
event_data JSONB,
created_at TIMESTAMP,
_loaded_at TIMESTAMP,
_source_system VARCHAR
);
CREATE TABLE raw.users (
user_id VARCHAR,
email VARCHAR,
name VARCHAR,
registration_date DATE,
_extracted_at TIMESTAMP
);
Практический пример ETL скрипта:
import pandas as pd
from sqlalchemy import create_engine
# Читаем из источника (например, API)
response = requests.get('https://api.example.com/events')
raw_data = response.json()
# Сохраняем в raw слой БЕЗ трансформаций
engine = create_engine('postgresql://....')
df_raw = pd.DataFrame(raw_data)
df_raw.to_sql('raw_events',
con=engine,
schema='raw',
if_exists='append',
index=False)
Преимущества:
- ✅ Полная история
- ✅ Возможность переделать обработку
- ✅ Аудит и отчетность
Недостатки:
- ❌ Требует много места
- ❌ Медленнее для аналитики (много шума)
2. Staging Layer (Промежуточный слой)
Назначение: Очистить и подготовить данные для дальнейшей обработки.
Процессы:
- Валидация (проверка формата, диапазонов)
- Деденьу плицирование (удаление дубликатов)
- Обработка пропусков (NULL значения)
- Нормализация (единица измерения, кодировка)
- Фильтрация (удаление явно неправильных данных)
-- Пример трансформации в Staging
CREATE TABLE staging.events AS
SELECT
id,
user_id,
event_name,
-- Валидация
CASE
WHEN event_name IS NULL THEN 'unknown'
ELSE LOWER(event_name)
END as event_name_clean,
event_data,
-- Стандартизация даты
DATE(created_at) as event_date,
EXTRACT(HOUR FROM created_at) as event_hour,
_loaded_at,
_source_system
FROM raw.events
WHERE
-- Фильтруем явный мусор
user_id IS NOT NULL
AND created_at > '2020-01-01'
AND created_at < CURRENT_TIMESTAMP
DISTINCT ON (id, user_id, created_at); -- Дедубликация
Python пример:
import pandas as pd
import numpy as np
def clean_events(df_raw):
"""Очистка событий в staging слой"""
df = df_raw.copy()
# Удаляем явные дубликаты
df = df.drop_duplicates(subset=['id'])
# Валидация user_id
df = df[df['user_id'].notna()]
# Нормализуем event_name
df['event_name'] = df['event_name'].str.lower().str.strip()
# Обработка дат
df['created_at'] = pd.to_datetime(df['created_at'])
df = df[df['created_at'] > '2020-01-01']
# Добавляем вспомогательные столбцы
df['event_date'] = df['created_at'].dt.date
df['event_hour'] = df['created_at'].dt.hour
# Логирование качества
print(f"Осталось строк: {len(df)} из {len(df_raw)}")
print(f"Пропущенных user_id: {df_raw['user_id'].isna().sum()}")
return df
3. Core / Processing Layer (Основной слой обработки)
Назначение: Создать бизнес-логику, измерения и факты.
Это часто использует подход Data Vault или Star Schema.
-- Пример: Таблица фактов (Facts Table)
CREATE TABLE core.fact_events (
event_id UUID PRIMARY KEY,
user_key INT, -- Ссылка на справочник
event_type_key INT, -- Ссылка на справочник
product_key INT, -- Ссылка на справочник
event_date DATE,
event_time TIME,
amount NUMERIC,
quantity INT,
revenue NUMERIC,
_loaded_at TIMESTAMP
);
-- Примеры таблиц измерений (Dimension Tables)
CREATE TABLE core.dim_users (
user_key INT PRIMARY KEY,
user_id VARCHAR, -- Естественный ключ
user_name VARCHAR,
registration_date DATE,
user_status VARCHAR,
_effective_from DATE,
_effective_to DATE,
_is_current BOOLEAN
);
CREATE TABLE core.dim_event_types (
event_type_key INT PRIMARY KEY,
event_type_name VARCHAR,
event_category VARCHAR,
_loaded_at TIMESTAMP
);
Логика в Core слое:
-- Пример: Вычисление метрик по пользователям
CREATE TABLE core.user_metrics AS
SELECT
du.user_key,
du.user_id,
COUNT(DISTINCT fe.event_id) as total_events,
COUNT(DISTINCT CASE WHEN fe.event_type_key = 5 THEN fe.event_id END) as purchase_count,
SUM(fe.revenue) as total_revenue,
AVG(fe.amount) as avg_order_value,
MAX(fe.event_date) as last_event_date,
DATEDIFF(DAY, MIN(fe.event_date), MAX(fe.event_date)) as customer_lifetime_days
FROM core.fact_events fe
JOIN core.dim_users du ON fe.user_key = du.user_key
WHERE du._is_current = TRUE
GROUP BY du.user_key, du.user_id;
4. Mart / Data Mart Layer (Витрины данных)
Назначение: Подготовить данные для конкретных случаев использования (для разных типов аналитиков).
Есть витрины для разных направлений:
- Sales Mart (для продаж)
- Marketing Mart (для маркетинга)
- Finance Mart (для финансов)
- Customer Analytics Mart
-- Пример: Sales Mart
CREATE TABLE mart.sales_daily (
date DATE,
product_id VARCHAR,
product_name VARCHAR,
sales_qty INT,
sales_amount NUMERIC,
revenue NUMERIC,
profit NUMERIC,
customer_count INT,
avg_order_value NUMERIC,
region VARCHAR,
_loaded_at TIMESTAMP
);
-- Пример: Marketing Mart
CREATE TABLE mart.marketing_funnel (
date DATE,
campaign_id VARCHAR,
campaign_name VARCHAR,
impressions INT,
clicks INT,
click_through_rate NUMERIC,
conversions INT,
conversion_rate NUMERIC,
cost NUMERIC,
revenue NUMERIC,
roi NUMERIC,
_loaded_at TIMESTAMP
);
Обслуживание витрин:
# Ежедневное обновление Sales Mart
def update_sales_mart():
query = """
INSERT INTO mart.sales_daily
SELECT
CURRENT_DATE,
p.product_id,
p.product_name,
SUM(fe.quantity),
SUM(fe.amount),
SUM(fe.revenue),
SUM(fe.revenue - fe.cost),
COUNT(DISTINCT fe.user_key),
AVG(fe.amount),
dc.region,
CURRENT_TIMESTAMP
FROM core.fact_events fe
JOIN core.dim_products p ON fe.product_key = p.product_key
JOIN core.dim_customers dc ON fe.user_key = dc.user_key
WHERE DATE(fe.event_date) = CURRENT_DATE
GROUP BY 2, 3, 9, 10
"""
execute_query(query)
5. Presentation Layer (Слой представления)
Назначение: Прямой доступ для бизнес-пользователей (через BI инструменты).
-- Пример: Представления для дашбордов
CREATE VIEW presentation.dashboard_sales_overview AS
SELECT
sd.date,
sd.product_name,
sd.sales_qty,
sd.revenue,
sd.customer_count,
sd.revenue / NULLIF(sd.customer_count, 0) as revenue_per_customer,
LAG(sd.revenue) OVER (PARTITION BY sd.product_id ORDER BY sd.date) as prev_day_revenue,
(sd.revenue - LAG(sd.revenue) OVER (PARTITION BY sd.product_id ORDER BY sd.date)) /
LAG(sd.revenue) OVER (PARTITION BY sd.product_id ORDER BY sd.date) as day_over_day_growth
FROM mart.sales_daily sd
WHERE sd.date >= CURRENT_DATE - INTERVAL '90 days';
Таблица слоёв
| Слой | Назначение | Примеры таблиц | Обновл. | Пользователи |
|---|---|---|---|---|
| Raw | Сохранить исходные данные | raw_events, raw_users | Постоянно | None (только инженеры) |
| Staging | Очистка и валидация | staging_clean_events | Ежедневно | Инженеры данных |
| Core | Бизнес-логика | fact_events, dim_users | Ежедневно | Аналитики, разработчики ML |
| Mart | Готовые витрины | sales_daily, marketing_funnel | Ежедневно | Бизнес-аналитики |
| Presentation | BI инструменты | views для дашбордов | Ежедневно | Бизнес-пользователи |
Практический пример: От Raw до Presentation
# 1. Raw: Данные из API
def load_raw_orders():
api_data = requests.get('https://api/orders').json()
df = pd.DataFrame(api_data)
df.to_sql('raw_orders', schema='raw', ...)
# 2. Staging: Очистка
def clean_to_staging():
df = pd.read_sql('SELECT * FROM raw.orders', con)
df = df.drop_duplicates()
df['order_date'] = pd.to_datetime(df['order_date'])
df.to_sql('staging_orders', schema='staging', ...)
# 3. Core: Трансформация в факты
def build_core_facts():
query = """
INSERT INTO core.fact_orders
SELECT
so.order_id,
du.user_key,
so.product_id,
so.order_date,
so.amount,
so.cost,
so.amount - so.cost as profit
FROM staging.orders so
JOIN core.dim_users du ON so.user_id = du.user_id
"""
execute_query(query)
# 4. Mart: Витрины для аналитики
def build_sales_mart():
query = """
INSERT INTO mart.sales_daily
SELECT
DATE(fo.order_date),
COUNT(*) as orders,
SUM(fo.amount) as revenue,
SUM(fo.profit) as profit
FROM core.fact_orders fo
GROUP BY 1
"""
execute_query(query)
# 5. Presentation: Для дашбордов
def create_presentation():
query = """
CREATE OR REPLACE VIEW presentation.sales_summary AS
SELECT
sd.date,
sd.orders,
sd.revenue,
SUM(sd.revenue) OVER (ORDER BY sd.date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) as revenue_7day_ma
FROM mart.sales_daily sd
"""
execute_query(query)
Ключевые принципы
✅ Разделение ответственности: Каждый слой имеет одну задачу
✅ Data Lineage: Можно отследить данные от источника до дашборда
✅ Масштабируемость: Легко добавлять новые витрины без переделки ядра
✅ Quality Gates: Качество улучшается на каждом слое
✅ Версионирование: Можно держать несколько версий бизнес-логики
Инструменты для реализации
- dbt (Data Build Tool): управление трансформацией
- Apache Airflow: оркестровка ETL
- Dataflow / Spark: обработка больших объёмов
- PostgreSQL / Snowflake: хранение
- Tableau / Looker: BI инструменты