Расскажи про Data lineage
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Расскажи про Data lineage
Что такое Data Lineage?
Data Lineage — это документация пути данных от источника до конечного результата.
Это ответы на вопросы:
- Откуда взялись эти данные?
- Какие трансформации прошли?
- Кто их создал и изменил?
- Что зависит от этих данных?
Зачем Data Lineage нужен?
1. Дебаг (когда отчет неправильный)
Аналитик видит: Sales Report показывает $0 Проверяет lineage: fact_sales (raw) ← источник данных ✓ fact_sales (processed) ← трансформация в dbt ✗ ← Faulting task: "remove_test_data" Находит ошибку в dbt коде
2. Data Governance (кому верить данным?)
fact_customers ← dim_customer (из CRM) ← processed_users (очищено мной) ← raw_users (источник)
Владелец: john@company.com Критичность: HIGH SLA: 99.9% доступность
3. Impact Analysis (что сломается, если я изменю источник?)
raw_users (table change)
├─ fact_customer_revenue ├─ customer_segments_dashboard ├─ fraud_detection_model └─ weekly_newsletter
4 зависимых системы потребуют проверки!
Типы Data Lineage
1. Upstream (входящие зависимости)
fact_sales ← staging_sales (table)
← raw_sales (table)
← CRM API (external source)
← Salesforce (external system)
2. Downstream (исходящие зависимости)
fact_sales → sales_by_region_agg (table) → revenue_dashboard (BI tool) → forecasting_model (ML)
Как реализовать Data Lineage?
1. Через dbt manifest.json
dbt создаёт manifest.json с зависимостями:
{
"nodes": {
"model.my_project.fact_sales": {
"depends_on": {
"nodes": [
"source.my_project.crm.sales",
"model.my_project.dim_customer"
]
}
}
}
}
Парсим manifest:
import json
import networkx as nx
with open('target/manifest.json') as f:
manifest = json.load(f)
G = nx.DiGraph()
for node_id, node in manifest['nodes'].items():
for dep in node['depends_on']['nodes']:
G.add_edge(dep, node_id)
# Найти все зависимости от fact_sales
downstream = nx.descendants(G, 'model.my_project.fact_sales')
2. Через Airflow + OpenLineage
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
dag = DAG('sales_pipeline')
extract = BashOperator(
task_id='extract_sales',
bash_command='python extract.py',
inlets=[External_datasource("CRM")],
outlets=[External_dataset("staging_sales")]
)
transform = BashOperator(
task_id='transform_sales',
bash_command='python transform.py',
inlets=[External_dataset("staging_sales")],
outlets=[External_dataset("fact_sales")]
)
extract >> transform
# OpenLineage автоматически отслеживает зависимости
3. SQL комментарии
-- table: fact_sales
-- owner: analytics-team@company.com
-- criticality: HIGH
-- upstream: [raw_sales, dim_customer, dim_product]
-- downstream: [sales_dashboard, revenue_forecast]
-- sla: 4h
CREATE TABLE fact_sales (
sale_id INT PRIMARY KEY,
customer_id INT REFERENCES dim_customer(customer_id),
amount DECIMAL(10,2)
);
Инструменты для Data Lineage
dbt: SQL transformation (Batch ETL) OpenMetadata: Multi-source, Data governance Great Expectations: Data quality + lineage Collibra: Enterprise governance DBT Cloud: Cloud-native solution
Практический пример: E-commerce
raw_orders (source: Shopify API)
↓
stg_orders (cleaned in dbt)
├─→ fact_orders (aggregated) │ ├─→ orders_dashboard (BI) │ └─→ revenue_forecast (ML model) └─→ fact_customer_ltv
└─→ customer_segmentation_dashboard
Метаданные:
- Owner: analytics@company.com
- Update frequency: Daily 2 AM UTC
- SLA: 3 hours
- Criticality: CRITICAL
- Data classification: Internal
Data Lineage для Data Engineer'а
Кейсы, когда спасает:
-
Отчёт неправильный: Аналитик: "Продажи упали на 50%!" Я проверяю lineage → находу обновление источника → восстанавливаю старую версию
-
Миграция БД: Нужно перейти с PostgreSQL на ClickHouse Lineage показывает какие системы зависят
-
Compliance & Audit: Регулятор спрашивает: "Где данные для отчёта?" Lineage даёт полный trail с датами, владельцами, версиями
Итог
Data Lineage — это история каждого датасета. Без неё:
- Сложно дебажить проблемы
- Нельзя управлять зависимостями
- Нарушается governance
Инвестируй в хороший lineage tracking — сэкономит часы на дебаг позже.