← Назад к вопросам

Как на прошлом проекте хранились метаданные?

2.2 Middle🔥 121 комментариев
#Архитектура и проектирование#Опыт и soft skills

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Хранение метаданных в data engineering проектах

Метаданные — информация о самих данных: структура, происхождение, качество, пользователи, SLA, версии схемы и так далее. Правильное хранение метаданных критично для масштабируемых систем обработки данных.

Компоненты метаданных

Технические метаданные:

  • Схема таблицы (поля, типы, constraints)
  • Происхождение данных (lineage) — какие источники используются
  • Обновляемость (freshness) — как часто обновляется
  • Размер таблицы (row count, storage size)
  • Индексы и партиции

Бизнес метаданные:

  • Описание таблицы (на человеческом языке)
  • Владельцы и stewards
  • Качество данных (completeness, accuracy)
  • Чувствительность (PII, confidential)
  • SLA

Архитектура хранения метаданных

1. Встроенная система каталога (Hive Metastore)

-- Hive Metastore хранит метаданные в обычной БД
-- Просмотр таблиц
SHOW TABLES;
SHOW TABLES IN my_database;

-- Просмотр схемы
DESC my_table;
DESC FORMATTED my_table;  -- подробнее
DESC EXTENDED my_table;   -- совсем подробно

-- Метаданные о партициях
SHOW PARTITIONS my_table;

-- Hive хранит это в своей БД (обычно MySQL или PostgreSQL)
-- Таблицы: TABLES, COLUMNS, PARTITIONS, STORAGE_DESCRIPTORS

2. Специализированный сервис каталога (Apache Atlas)

# Apache Atlas — полнофункциональный каталог метаданных
# REST API для работы с метаданными

import requests
import json

ATLAS_URL = "http://localhost:21000/api/atlas/v2"
headers = {'Content-Type': 'application/json'}

# Регистрируем новую таблицу в Atlas
def register_table_metadata(table_name, database, schema, owner):
    metadata = {
        "entity": {
            "typeName": "hive_table",
            "attributes": {
                "name": table_name,
                "database": database,
                "owner": owner,
                "schema": json.dumps(schema),
                "description": f"Table {table_name} in {database}",
                "parameters": {
                    "classification": "public",
                    "sensitivity": "low"
                }
            }
        }
    }
    
    response = requests.post(
        f"{ATLAS_URL}/entity",
        json=metadata,
        headers=headers
    )
    return response.json()

# Получаем lineage (откуда приходят данные)
def get_data_lineage(table_name):
    response = requests.get(
        f"{ATLAS_URL}/lineage/table/{table_name}",
        headers=headers
    )
    return response.json()

# Найти все таблицы определённой классификации
def find_tables_by_classification(classification):
    query = f"hive_table where classification = '{classification}'"
    response = requests.get(
        f"{ATLAS_URL}/search/dsl",
        params={"query": query},
        headers=headers
    )
    return response.json()

3. SQL-based решение (собственная схема)

-- Создаём свои таблицы для метаданных
CREATE TABLE metadata_tables (
    table_id INT PRIMARY KEY,
    database_name VARCHAR(100),
    table_name VARCHAR(100),
    table_description TEXT,
    owner VARCHAR(100),
    created_at TIMESTAMP,
    updated_at TIMESTAMP,
    UNIQUE(database_name, table_name)
);

CREATE TABLE metadata_columns (
    column_id INT PRIMARY KEY,
    table_id INT REFERENCES metadata_tables(table_id),
    column_name VARCHAR(100),
    column_type VARCHAR(50),
    column_description TEXT,
    is_nullable BOOLEAN,
    is_primary_key BOOLEAN,
    position INT
);

CREATE TABLE metadata_lineage (
    lineage_id INT PRIMARY KEY,
    source_table_id INT REFERENCES metadata_tables(table_id),
    target_table_id INT REFERENCES metadata_tables(table_id),
    transform_description TEXT,
    created_at TIMESTAMP
);

CREATE TABLE metadata_quality (
    quality_id INT PRIMARY KEY,
    table_id INT REFERENCES metadata_tables(table_id),
    completeness DECIMAL(5, 2),  -- % заполненных значений
    freshness_hours INT,  -- сколько часов назад обновлено
    row_count BIGINT,
    last_checked TIMESTAMP
);

CREATE TABLE metadata_access (
    access_id INT PRIMARY KEY,
    table_id INT REFERENCES metadata_tables(table_id),
    user_email VARCHAR(100),
    access_level VARCHAR(50),  -- 'read', 'write', 'admin'
    granted_at TIMESTAMP
);

-- Регистрируем таблицу
INSERT INTO metadata_tables (database_name, table_name, table_description, owner)
VALUES ('analytics', 'user_activity', 'User activity logs from web and mobile', 'data-team@company.com');

-- Добавляем колонки
INSERT INTO metadata_columns (table_id, column_name, column_type, column_description, position)
SELECT 
    table_id,
    'user_id', 'UUID', 'Unique user identifier', 1
FROM metadata_tables
WHERE table_name = 'user_activity';

-- Добавляем lineage
INSERT INTO metadata_lineage (source_table_id, target_table_id, transform_description)
SELECT 
    (SELECT table_id FROM metadata_tables WHERE table_name = 'events'),
    (SELECT table_id FROM metadata_tables WHERE table_name = 'user_activity'),
    'Filter by user_id, aggregate by day'
WHERE NOT EXISTS (SELECT 1 FROM metadata_lineage WHERE source_table_id = 1 AND target_table_id = 2);

Интеграция с data discovery инструментами

# Пример: Alation для data discovery
# Alation автоматически парсит метаданные и создаёт каталог

from alation_api_client import client

api = client.AlationAPIClient(host='https://alation.company.com')

# Проверяем, есть ли таблица в каталоге
table_info = api.get_table_by_name('analytics', 'user_activity')

if table_info:
    print(f"Owner: {table_info['owner']}")
    print(f"Description: {table_info['description']}")
    print(f"Last updated: {table_info['last_modified']}")
else:
    # Добавляем в каталог
    api.add_table({
        'name': 'user_activity',
        'schema': 'analytics',
        'owner': 'data-team@company.com'
    })

# Добавляем примечание
api.add_comment(
    table_name='user_activity',
    comment='This table contains only aggregated data, details in raw_events'
)

Automatic metadata extraction

# Автоматическое извлечение метаданных из Parquet/ORC файлов
import pyarrow.parquet as pq
import json

def extract_schema_metadata(parquet_file):
    """Извлекает схему из Parquet файла"""
    parquet_file = pq.read_table(parquet_file)
    schema = parquet_file.schema
    
    metadata = {
        'table_name': None,  # нужно определить
        'columns': []
    }
    
    for i, field in enumerate(schema):
        column_meta = {
            'name': field.name,
            'type': str(field.type),
            'nullable': field.nullable,
            'position': i
        }
        metadata['columns'].append(column_meta)
    
    return metadata

# Сохраняем в JSON
schema_meta = extract_schema_metadata('data/events.parquet')
with open('metadata/events_schema.json', 'w') as f:
    json.dump(schema_meta, f, indent=2)

Мониторинг изменений схемы

# Отслеживаем изменения схемы между запусками
def detect_schema_changes(old_schema, new_schema):
    changes = {
        'added_columns': [],
        'removed_columns': [],
        'modified_columns': []
    }
    
    old_cols = {col['name']: col for col in old_schema['columns']}
    new_cols = {col['name']: col for col in new_schema['columns']}
    
    # Новые колонки
    for col_name in new_cols:
        if col_name not in old_cols:
            changes['added_columns'].append(col_name)
    
    # Удалённые колонки
    for col_name in old_cols:
        if col_name not in new_cols:
            changes['removed_columns'].append(col_name)
    
    # Изменённые колонки
    for col_name in old_cols:
        if col_name in new_cols:
            if old_cols[col_name]['type'] != new_cols[col_name]['type']:
                changes['modified_columns'].append({
                    'name': col_name,
                    'old_type': old_cols[col_name]['type'],
                    'new_type': new_cols[col_name]['type']
                })
    
    return changes

Best Practices

  • Версионируй схему: сохраняй историю изменений
  • Документируй всё: описание таблиц, колонок, бизнес правила
  • Автоматизируй extraction: не полагайся на ручное внесение
  • Lineage tracking: отслеживай откуда приходят данные
  • Quality metrics: мониторь полноту, актуальность, точность
  • Access control: кто может читать/писать в таблицу
  • SLA: гарантии на доступность и свежесть
  • Data glossary: единый словарь терминов
  • Regular audits: проверяй что метаданные актуальны
  • Мониторь невиспользуемые таблицы — можно удалить или заморозить
Как на прошлом проекте хранились метаданные? | PrepBro