← Назад к вопросам

Пошаговый процесс исправления изменений схемы источников

2.0 Middle🔥 161 комментариев
#ETL и качество данных

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Пошаговый процесс исправления изменений схемы источников

Когда схема исходных систем меняется (добавляются/удаляются/переименовываются колонки), Data Engineer должен иметь четкий процесс адаптации. Это критично для надёжности ETL конвейеров.

Этап 1: Обнаружение изменения схемы

1.1. Установка мониторинга

Проактивный мониторинг предотвращает проблемы:

import psycopg2
from datetime import datetime

def detect_schema_changes():
    current_schema = get_source_schema()
    saved_schema = load_saved_schema()
    changes = compare_schemas(current_schema, saved_schema)
    if changes:
        alert_team(f"Schema changed: {changes}")
    return changes

1.2. Реакция на ошибку ETL

Чаще всего изменение обнаруживается через падение конвейера:

from airflow import DAG
from airflow.operators.python import PythonOperator

def extract_from_source():
    try:
        df = pd.read_sql("SELECT * FROM orders", connection)
        return df
    except Exception as e:
        if "column" in str(e).lower():
            notify_schema_issue(e)
            raise SchemaChangeError(f"Schema mismatch: {e}")
        raise

Этап 2: Анализ изменений

Когда схема изменилась, нужно понять ЧТО изменилось и ПОЧЕМУ.

2.1. Определение типа изменения

from enum import Enum

class SchemaChangeType(Enum):
    ADDED_COLUMN = "added_column"
    REMOVED_COLUMN = "removed_column"
    RENAMED_COLUMN = "renamed_column"
    TYPE_CHANGED = "type_changed"
    NULLABLE_CHANGED = "nullable_changed"

def classify_change(old_schema, new_schema):
    changes = []
    new_cols = set(new_schema.keys()) - set(old_schema.keys())
    if new_cols:
        changes.append((SchemaChangeType.ADDED_COLUMN, new_cols))
    removed_cols = set(old_schema.keys()) - set(new_schema.keys())
    if removed_cols:
        changes.append((SchemaChangeType.REMOVED_COLUMN, removed_cols))
    return changes

2.2. Общение с владельцем источника

Всегда проверяешь с source owner перед действиями. Правильный тон:

  • Я заметил, что колонка исчезла. Это запланированное изменение?
  • Как это影響 аналитику?
  • Есть ли период переходности?

Этап 3: Планирование исправления

3.1. Оценка влияния (Impact Analysis)

def assess_impact(change):
    impact = {
        "affected_tables": [],
        "affected_reports": [],
        "severity": "low",
    }
    if change.type == "REMOVED_COLUMN":
        column_name = change.column
        affected_views = search_column_in_views(column_name)
        impact["affected_tables"].extend(affected_views)
        if affected_views:
            impact["severity"] = "critical"
    return impact

Этап 4: Реализация исправления

4.1. Обновление ETL логики

class DynamicSchemaExtractor:
    def extract(self, source_connection, table_name):
        current_schema = self.get_current_schema(table_name)
        columns = [col["name"] for col in current_schema]
        query = f"SELECT {','.join(columns)} FROM {table_name}"
        df = pd.read_sql(query, source_connection)
        return df

4.2. Обновление трансформаций

SELECT 
  user_id,
  status,
  amount
FROM raw_orders;

Этап 5: Тестирование исправления

5.1. Unit тесты

import pytest

def test_handles_removed_column():
    source_data = pd.DataFrame({
        "user_id": [1, 2, 3],
        "status": ["active", "pending", "completed"],
    })
    result = extract_and_transform(source_data)
    assert "status" in result.columns
    assert len(result) == 3

def test_handles_added_column():
    source_data = pd.DataFrame({
        "user_id": [1, 2, 3],
        "new_field": ["a", "b", "c"],
    })
    result = extract_and_transform(source_data)
    assert "new_field" in result.columns

5.2. Интеграционные тесты

def test_full_pipeline_with_schema_change(test_db):
    test_db.execute("""
        CREATE TABLE source_orders (
            user_id INT,
            status VARCHAR(50),
            amount DECIMAL
        )
    """)
    result = run_etl_pipeline()
    fact_orders = test_db.query("SELECT * FROM fact_orders")
    assert fact_orders.shape[0] > 0

Этап 6: Развёртывание (Deployment)

Для низкого риска изменения:

  1. Обновить ETL
  2. Запустить полный пересчёт
  3. Проверить результаты
  4. Оставить backup на 1 неделю
  5. Удалить если всё хорошо

Для высокого риска (удаление колонки):

  1. Запустить impact analysis
  2. Обновить все витрины
  3. Запустить canary ETL на 10% данных
  4. Проверить downstream
  5. Полный rollout

Этап 7: Мониторинг и поддержка

7.1. Post-deployment мониторинг

def monitor_after_change():
    metrics = {
        "extraction_time": measure_extraction_time(),
        "row_count": count_extracted_rows(),
        "errors_count": count_errors_last_hour(),
    }
    baseline = get_baseline_metrics()
    for metric, value in metrics.items():
        if value > baseline[metric] * 1.2:
            alert(f"{metric} deviated")

7.2. Откат (Rollback)

def rollback_schema_change(change_id):
    previous_dag = get_dag_version(change_id - 1)
    deploy_dag(previous_dag)
    recompute_affected_tables()
    notify(f"Rolled back change {change_id}")

Чеклист: Исправление изменения схемы

  • Обнаружено изменение схемы
  • Проведён анализ типа изменения
  • Связались с владельцем источника
  • Провели impact analysis
  • Обновили ETL логику
  • Обновили трансформации
  • Написали/обновили тесты
  • Провели интеграционное тестирование
  • Получили одобрение на развёртывание
  • Развернули в prod
  • Мониторим 24 часа
  • Документировали изменение

Вывод

Процесс исправления изменения схемы включает:

  • Коммуникация с источниками и stakeholders
  • Анализ влияния на систему
  • Планирование миграции
  • Тестирование на всех уровнях
  • Контролируемое развёртывание
  • Мониторинг и поддержка

Хороший Data Engineer ведёт этот процесс методично и предсказуемо, минимизируя риск падения production систем.

Пошаговый процесс исправления изменений схемы источников | PrepBro