← Назад к вопросам
Как на прошлом проекте хранились метаданные?
2.2 Middle🔥 121 комментариев
#Архитектура и проектирование#Опыт и soft skills
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Хранение метаданных в data engineering проектах
Метаданные — информация о самих данных: структура, происхождение, качество, пользователи, SLA, версии схемы и так далее. Правильное хранение метаданных критично для масштабируемых систем обработки данных.
Компоненты метаданных
Технические метаданные:
- Схема таблицы (поля, типы, constraints)
- Происхождение данных (lineage) — какие источники используются
- Обновляемость (freshness) — как часто обновляется
- Размер таблицы (row count, storage size)
- Индексы и партиции
Бизнес метаданные:
- Описание таблицы (на человеческом языке)
- Владельцы и stewards
- Качество данных (completeness, accuracy)
- Чувствительность (PII, confidential)
- SLA
Архитектура хранения метаданных
1. Встроенная система каталога (Hive Metastore)
-- Hive Metastore хранит метаданные в обычной БД
-- Просмотр таблиц
SHOW TABLES;
SHOW TABLES IN my_database;
-- Просмотр схемы
DESC my_table;
DESC FORMATTED my_table; -- подробнее
DESC EXTENDED my_table; -- совсем подробно
-- Метаданные о партициях
SHOW PARTITIONS my_table;
-- Hive хранит это в своей БД (обычно MySQL или PostgreSQL)
-- Таблицы: TABLES, COLUMNS, PARTITIONS, STORAGE_DESCRIPTORS
2. Специализированный сервис каталога (Apache Atlas)
# Apache Atlas — полнофункциональный каталог метаданных
# REST API для работы с метаданными
import requests
import json
ATLAS_URL = "http://localhost:21000/api/atlas/v2"
headers = {'Content-Type': 'application/json'}
# Регистрируем новую таблицу в Atlas
def register_table_metadata(table_name, database, schema, owner):
metadata = {
"entity": {
"typeName": "hive_table",
"attributes": {
"name": table_name,
"database": database,
"owner": owner,
"schema": json.dumps(schema),
"description": f"Table {table_name} in {database}",
"parameters": {
"classification": "public",
"sensitivity": "low"
}
}
}
}
response = requests.post(
f"{ATLAS_URL}/entity",
json=metadata,
headers=headers
)
return response.json()
# Получаем lineage (откуда приходят данные)
def get_data_lineage(table_name):
response = requests.get(
f"{ATLAS_URL}/lineage/table/{table_name}",
headers=headers
)
return response.json()
# Найти все таблицы определённой классификации
def find_tables_by_classification(classification):
query = f"hive_table where classification = '{classification}'"
response = requests.get(
f"{ATLAS_URL}/search/dsl",
params={"query": query},
headers=headers
)
return response.json()
3. SQL-based решение (собственная схема)
-- Создаём свои таблицы для метаданных
CREATE TABLE metadata_tables (
table_id INT PRIMARY KEY,
database_name VARCHAR(100),
table_name VARCHAR(100),
table_description TEXT,
owner VARCHAR(100),
created_at TIMESTAMP,
updated_at TIMESTAMP,
UNIQUE(database_name, table_name)
);
CREATE TABLE metadata_columns (
column_id INT PRIMARY KEY,
table_id INT REFERENCES metadata_tables(table_id),
column_name VARCHAR(100),
column_type VARCHAR(50),
column_description TEXT,
is_nullable BOOLEAN,
is_primary_key BOOLEAN,
position INT
);
CREATE TABLE metadata_lineage (
lineage_id INT PRIMARY KEY,
source_table_id INT REFERENCES metadata_tables(table_id),
target_table_id INT REFERENCES metadata_tables(table_id),
transform_description TEXT,
created_at TIMESTAMP
);
CREATE TABLE metadata_quality (
quality_id INT PRIMARY KEY,
table_id INT REFERENCES metadata_tables(table_id),
completeness DECIMAL(5, 2), -- % заполненных значений
freshness_hours INT, -- сколько часов назад обновлено
row_count BIGINT,
last_checked TIMESTAMP
);
CREATE TABLE metadata_access (
access_id INT PRIMARY KEY,
table_id INT REFERENCES metadata_tables(table_id),
user_email VARCHAR(100),
access_level VARCHAR(50), -- 'read', 'write', 'admin'
granted_at TIMESTAMP
);
-- Регистрируем таблицу
INSERT INTO metadata_tables (database_name, table_name, table_description, owner)
VALUES ('analytics', 'user_activity', 'User activity logs from web and mobile', 'data-team@company.com');
-- Добавляем колонки
INSERT INTO metadata_columns (table_id, column_name, column_type, column_description, position)
SELECT
table_id,
'user_id', 'UUID', 'Unique user identifier', 1
FROM metadata_tables
WHERE table_name = 'user_activity';
-- Добавляем lineage
INSERT INTO metadata_lineage (source_table_id, target_table_id, transform_description)
SELECT
(SELECT table_id FROM metadata_tables WHERE table_name = 'events'),
(SELECT table_id FROM metadata_tables WHERE table_name = 'user_activity'),
'Filter by user_id, aggregate by day'
WHERE NOT EXISTS (SELECT 1 FROM metadata_lineage WHERE source_table_id = 1 AND target_table_id = 2);
Интеграция с data discovery инструментами
# Пример: Alation для data discovery
# Alation автоматически парсит метаданные и создаёт каталог
from alation_api_client import client
api = client.AlationAPIClient(host='https://alation.company.com')
# Проверяем, есть ли таблица в каталоге
table_info = api.get_table_by_name('analytics', 'user_activity')
if table_info:
print(f"Owner: {table_info['owner']}")
print(f"Description: {table_info['description']}")
print(f"Last updated: {table_info['last_modified']}")
else:
# Добавляем в каталог
api.add_table({
'name': 'user_activity',
'schema': 'analytics',
'owner': 'data-team@company.com'
})
# Добавляем примечание
api.add_comment(
table_name='user_activity',
comment='This table contains only aggregated data, details in raw_events'
)
Automatic metadata extraction
# Автоматическое извлечение метаданных из Parquet/ORC файлов
import pyarrow.parquet as pq
import json
def extract_schema_metadata(parquet_file):
"""Извлекает схему из Parquet файла"""
parquet_file = pq.read_table(parquet_file)
schema = parquet_file.schema
metadata = {
'table_name': None, # нужно определить
'columns': []
}
for i, field in enumerate(schema):
column_meta = {
'name': field.name,
'type': str(field.type),
'nullable': field.nullable,
'position': i
}
metadata['columns'].append(column_meta)
return metadata
# Сохраняем в JSON
schema_meta = extract_schema_metadata('data/events.parquet')
with open('metadata/events_schema.json', 'w') as f:
json.dump(schema_meta, f, indent=2)
Мониторинг изменений схемы
# Отслеживаем изменения схемы между запусками
def detect_schema_changes(old_schema, new_schema):
changes = {
'added_columns': [],
'removed_columns': [],
'modified_columns': []
}
old_cols = {col['name']: col for col in old_schema['columns']}
new_cols = {col['name']: col for col in new_schema['columns']}
# Новые колонки
for col_name in new_cols:
if col_name not in old_cols:
changes['added_columns'].append(col_name)
# Удалённые колонки
for col_name in old_cols:
if col_name not in new_cols:
changes['removed_columns'].append(col_name)
# Изменённые колонки
for col_name in old_cols:
if col_name in new_cols:
if old_cols[col_name]['type'] != new_cols[col_name]['type']:
changes['modified_columns'].append({
'name': col_name,
'old_type': old_cols[col_name]['type'],
'new_type': new_cols[col_name]['type']
})
return changes
Best Practices
- Версионируй схему: сохраняй историю изменений
- Документируй всё: описание таблиц, колонок, бизнес правила
- Автоматизируй extraction: не полагайся на ручное внесение
- Lineage tracking: отслеживай откуда приходят данные
- Quality metrics: мониторь полноту, актуальность, точность
- Access control: кто может читать/писать в таблицу
- SLA: гарантии на доступность и свежесть
- Data glossary: единый словарь терминов
- Regular audits: проверяй что метаданные актуальны
- Мониторь невиспользуемые таблицы — можно удалить или заморозить