← Назад к вопросам
Пошаговый процесс исправления изменений схемы источников
2.0 Middle🔥 161 комментариев
#ETL и качество данных
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Пошаговый процесс исправления изменений схемы источников
Когда схема исходных систем меняется (добавляются/удаляются/переименовываются колонки), Data Engineer должен иметь четкий процесс адаптации. Это критично для надёжности ETL конвейеров.
Этап 1: Обнаружение изменения схемы
1.1. Установка мониторинга
Проактивный мониторинг предотвращает проблемы:
import psycopg2
from datetime import datetime
def detect_schema_changes():
current_schema = get_source_schema()
saved_schema = load_saved_schema()
changes = compare_schemas(current_schema, saved_schema)
if changes:
alert_team(f"Schema changed: {changes}")
return changes
1.2. Реакция на ошибку ETL
Чаще всего изменение обнаруживается через падение конвейера:
from airflow import DAG
from airflow.operators.python import PythonOperator
def extract_from_source():
try:
df = pd.read_sql("SELECT * FROM orders", connection)
return df
except Exception as e:
if "column" in str(e).lower():
notify_schema_issue(e)
raise SchemaChangeError(f"Schema mismatch: {e}")
raise
Этап 2: Анализ изменений
Когда схема изменилась, нужно понять ЧТО изменилось и ПОЧЕМУ.
2.1. Определение типа изменения
from enum import Enum
class SchemaChangeType(Enum):
ADDED_COLUMN = "added_column"
REMOVED_COLUMN = "removed_column"
RENAMED_COLUMN = "renamed_column"
TYPE_CHANGED = "type_changed"
NULLABLE_CHANGED = "nullable_changed"
def classify_change(old_schema, new_schema):
changes = []
new_cols = set(new_schema.keys()) - set(old_schema.keys())
if new_cols:
changes.append((SchemaChangeType.ADDED_COLUMN, new_cols))
removed_cols = set(old_schema.keys()) - set(new_schema.keys())
if removed_cols:
changes.append((SchemaChangeType.REMOVED_COLUMN, removed_cols))
return changes
2.2. Общение с владельцем источника
Всегда проверяешь с source owner перед действиями. Правильный тон:
- Я заметил, что колонка исчезла. Это запланированное изменение?
- Как это影響 аналитику?
- Есть ли период переходности?
Этап 3: Планирование исправления
3.1. Оценка влияния (Impact Analysis)
def assess_impact(change):
impact = {
"affected_tables": [],
"affected_reports": [],
"severity": "low",
}
if change.type == "REMOVED_COLUMN":
column_name = change.column
affected_views = search_column_in_views(column_name)
impact["affected_tables"].extend(affected_views)
if affected_views:
impact["severity"] = "critical"
return impact
Этап 4: Реализация исправления
4.1. Обновление ETL логики
class DynamicSchemaExtractor:
def extract(self, source_connection, table_name):
current_schema = self.get_current_schema(table_name)
columns = [col["name"] for col in current_schema]
query = f"SELECT {','.join(columns)} FROM {table_name}"
df = pd.read_sql(query, source_connection)
return df
4.2. Обновление трансформаций
SELECT
user_id,
status,
amount
FROM raw_orders;
Этап 5: Тестирование исправления
5.1. Unit тесты
import pytest
def test_handles_removed_column():
source_data = pd.DataFrame({
"user_id": [1, 2, 3],
"status": ["active", "pending", "completed"],
})
result = extract_and_transform(source_data)
assert "status" in result.columns
assert len(result) == 3
def test_handles_added_column():
source_data = pd.DataFrame({
"user_id": [1, 2, 3],
"new_field": ["a", "b", "c"],
})
result = extract_and_transform(source_data)
assert "new_field" in result.columns
5.2. Интеграционные тесты
def test_full_pipeline_with_schema_change(test_db):
test_db.execute("""
CREATE TABLE source_orders (
user_id INT,
status VARCHAR(50),
amount DECIMAL
)
""")
result = run_etl_pipeline()
fact_orders = test_db.query("SELECT * FROM fact_orders")
assert fact_orders.shape[0] > 0
Этап 6: Развёртывание (Deployment)
Для низкого риска изменения:
- Обновить ETL
- Запустить полный пересчёт
- Проверить результаты
- Оставить backup на 1 неделю
- Удалить если всё хорошо
Для высокого риска (удаление колонки):
- Запустить impact analysis
- Обновить все витрины
- Запустить canary ETL на 10% данных
- Проверить downstream
- Полный rollout
Этап 7: Мониторинг и поддержка
7.1. Post-deployment мониторинг
def monitor_after_change():
metrics = {
"extraction_time": measure_extraction_time(),
"row_count": count_extracted_rows(),
"errors_count": count_errors_last_hour(),
}
baseline = get_baseline_metrics()
for metric, value in metrics.items():
if value > baseline[metric] * 1.2:
alert(f"{metric} deviated")
7.2. Откат (Rollback)
def rollback_schema_change(change_id):
previous_dag = get_dag_version(change_id - 1)
deploy_dag(previous_dag)
recompute_affected_tables()
notify(f"Rolled back change {change_id}")
Чеклист: Исправление изменения схемы
- Обнаружено изменение схемы
- Проведён анализ типа изменения
- Связались с владельцем источника
- Провели impact analysis
- Обновили ETL логику
- Обновили трансформации
- Написали/обновили тесты
- Провели интеграционное тестирование
- Получили одобрение на развёртывание
- Развернули в prod
- Мониторим 24 часа
- Документировали изменение
Вывод
Процесс исправления изменения схемы включает:
- Коммуникация с источниками и stakeholders
- Анализ влияния на систему
- Планирование миграции
- Тестирование на всех уровнях
- Контролируемое развёртывание
- Мониторинг и поддержка
Хороший Data Engineer ведёт этот процесс методично и предсказуемо, минимизируя риск падения production систем.