← Назад к вопросам
Когда делал бы проверки качества данных, чтобы добиться лучшего результата?
2.0 Middle🔥 171 комментариев
#ETL и качество данных
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Когда делать проверки качества данных для лучшего результата
1. На каких этапах ETL проверять данные
Лучшая практика — многоуровневый мониторинг (defense in depth):
A. На этапе Extract (получение данных)
def extract_data():
"""Первая проверка — сразу после получения"""
# Получение из API
response = requests.get(api_url)
raw_data = response.json()
# Сразу проверяем
assert response.status_code == 200, f"API returned {response.status_code}"
assert isinstance(raw_data, list), "Expected list, got {type(raw_data)}"
assert len(raw_data) > 0, "Empty response from API"
# Валидация первой строки
first_record = raw_data[0]
required_fields = {'id', 'email', 'created_at'}
missing = required_fields - set(first_record.keys())
assert not missing, f"Missing fields: {missing}"
return pd.DataFrame(raw_data)
Зачем это нужно:
- Поймаем ошибки API на раннем этапе
- Не потратим время на трансформацию плохих данных
- Пересчитаем запрос, пока свеж источник
B. На этапе Transform (трансформация)
def transform_data(df_raw):
"""Проверяем каждый шаг трансформации"""
initial_rows = len(df_raw)
# Шаг 1: Удаляем дубликаты
df = df_raw.drop_duplicates(subset=['id'])
duplicates_removed = initial_rows - len(df)
# Проверка: не удалили ли слишком много?
if duplicates_removed / initial_rows > 0.1:
logging.warning(
f"WARNING: Removed {duplicates_removed} duplicates "
f"({duplicates_removed/initial_rows*100:.1f}% of data)"
)
# Шаг 2: Очищаем даты
df['created_at'] = pd.to_datetime(df['created_at'], errors='coerce')
nulls_from_date_parse = df['created_at'].isnull().sum()
if nulls_from_date_parse > 0:
logging.error(f"Failed to parse {nulls_from_date_parse} dates")
# Не загружаем в DWH, останавливаем pipeline
raise DataValidationError(f"Invalid dates: {nulls_from_date_parse}")
# Шаг 3: Деньги должны быть положительными
df = df[df['amount'] >= 0]
removed_negative = initial_rows - len(df)
if removed_negative > 0:
logging.warning(f"Removed {removed_negative} rows with negative amounts")
# Шаг 4: Финальная проверка перед Load
assert df['id'].is_unique, "user_id должны быть уникальны"
assert df['email'].notna().all(), "Email не должен быть NULL"
return df
C. На этапе Load (загрузка в DWH)
def load_to_warehouse(df, target_table):
"""Проверяем перед попаданием в production"""
# Перед Load: прогноз количества
expected_rows = df.shape[0]
print(f"About to load {expected_rows} rows")
# Проверка: не выглядит ли подозрительно?
avg_daily_rows = 50000
if expected_rows > avg_daily_rows * 2:
# Повышенный объём — требуется одобрение
logging.warning(
f"Data volume {expected_rows} is 2x usual. "
f"This requires manual review."
)
# Отправь алерт, не загружай до проверки
raise DataValidationError("Suspicious data volume")
# Загрузка
try:
df.to_sql(target_table, engine, if_exists='append', index=False)
logging.info(f"Loaded {expected_rows} rows to {target_table}")
except Exception as e:
logging.error(f"Load failed: {e}")
raise
# После Load: проверим, что реально загрузилось
actual_rows = pd.read_sql(
f"SELECT COUNT(*) as cnt FROM {target_table}",
engine
).iloc[0, 0]
if actual_rows < initial_rows:
logging.error(
f"Row count mismatch: expected {expected_rows}, "
f"got {actual_rows}"
)
raise DataValidationError("Load verification failed")
2. Frequency (как часто проверять)
Для ежедневных пайплайнов
# Проверяй каждый день в одно время
# Используй Airflow задачу
from airflow import DAG
from airflow.operators.python import PythonOperator
with DAG('daily_quality_checks', schedule_interval='@daily') as dag:
def check_data_quality():
df = pd.read_sql("SELECT * FROM fact_sales WHERE created_at >= CURRENT_DATE - 1", engine)
# Вчерашние метрики
yesterday_rows = len(df)
yesterday_revenue = df['amount'].sum()
# Сравни с историческим средним
avg_daily_rows = 100000
avg_revenue = 500000
# Threshold: 50% отклонение считается аномалией
if yesterday_rows < avg_daily_rows * 0.5:
send_alert(f"Row count too low: {yesterday_rows} (expected ~{avg_daily_rows})")
if yesterday_revenue < avg_revenue * 0.5:
send_alert(f"Revenue too low: {yesterday_revenue} (expected ~{avg_revenue})")
quality_check = PythonOperator(
task_id='quality_checks',
python_callable=check_data_quality
)
Для real-time потоков
# Проверяй каждый микро-батч
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.appName('streaming_quality').getOrCreate()
df_stream = (
spark.readStream
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "events")
.load()
)
# Проверяй каждый микро-батч
def process_with_quality_check(df):
# Row count
row_count = df.count()
if row_count == 0:
logging.warning("Empty batch received")
return df
# Schema validation
assert 'user_id' in df.columns
assert 'timestamp' in df.columns
# Business rules
df_valid = df.filter(col('amount') > 0)
dropped = row_count - df_valid.count()
if dropped > 0:
logging.warning(f"Dropped {dropped} invalid rows")
return df_valid
df_stream.foreachBatch(lambda df, id: process_with_quality_check(df))
3. Какие метрики мониторить в production
# Таблица для отслеживания метрик
CREATE TABLE data_quality_metrics (
date DATE,
table_name VARCHAR(255),
total_rows BIGINT,
null_rows BIGINT,
duplicate_rows BIGINT,
out_of_range_rows BIGINT,
load_duration_seconds FLOAT,
status VARCHAR(50), -- 'passed', 'warning', 'failed'
created_at TIMESTAMP
);
# Логирование
def log_quality_metrics(table_name, df, duration):
metrics = {
'date': datetime.now().date(),
'table_name': table_name,
'total_rows': len(df),
'null_rows': df.isnull().sum().sum(),
'duplicate_rows': df.duplicated().sum(),
'out_of_range_rows': len(df[(df['age'] < 0) | (df['age'] > 150)]),
'load_duration_seconds': duration,
'status': 'passed' if duration < 300 else 'warning'
}
metrics_df = pd.DataFrame([metrics])
metrics_df.to_sql('data_quality_metrics', engine, if_exists='append')
4. Когда ужесточить проверки
# После инцидента
def incident_checklist():
"""
Если был инцидент (плохие данные попали в DWH):
1. Добавь проверку, которая его поймает
2. Запусти полную переоценку исторических данных
3. Добавь мониторинг в production
"""
pass
# Пример: инцидент с отрицательным доходом
# До: никаких проверок на amount
# После:
assert (df['amount'] >= 0).all(), "Negative amounts detected"
# Пример: инцидент с дубликатами
# До: deduplicate() случайным образом
# После:
assert df['order_id'].is_unique, "Duplicate order_ids"
5. Баланс между rigor и speed
# ❌ Плохо: слишком строгие проверки
# Все падает на малейших отклонениях
if df.shape[0] != expected_rows:
raise Error("Row count must be exactly {expected_rows}")
# ✅ Хорошо: интеллектуальные пороги
if df.shape[0] < expected_rows * 0.8 or df.shape[0] > expected_rows * 1.2:
logging.warning(f"Unusual row count: {df.shape[0]} (expected ~{expected_rows})")
# Отправляем алерт, но не падаем
# ❌ Плохо: нет проверок (GIGO)
# Загружаем всё, что пришло
df.to_sql('table', engine, if_exists='append')
# ✅ Хорошо: проверки, но с разными уровнями severity
def validate_data(df):
results = {'errors': [], 'warnings': []}
# CRITICAL: NULL в PK
if df['user_id'].isnull().any():
results['errors'].append("NULL in user_id")
# WARNING: несколько NULL в optional field
if df['comment'].isnull().sum() / len(df) > 0.2:
results['warnings'].append("Many NULLs in comment field")
# На основе severity решаем
if results['errors']:
raise DataValidationError(results['errors'])
if results['warnings']:
logging.warning(f"Warnings: {results['warnings']}")
return True
6. Когда расслабить проверки (если очень срочно)
# ТОЛЬКО в критичных ситуациях
# С явным дневником об этом
DO_SKIP_QUALITY_CHECKS = os.getenv('SKIP_QA_CHECKS') == 'true'
if not DO_SKIP_QUALITY_CHECKS:
validate_data(df) # Нормальное выполнение
else:
logging.critical("⚠️ SKIPPING QUALITY CHECKS - DANGEROUS! ⚠️")
logging.critical(f"Reason: {os.getenv('SKIP_REASON')}")
# Загружаем с минимальными проверками
assert df is not None and len(df) > 0
Заключение
Best practice по проверкам качества:
- Early & Often — проверяй на каждом этапе (Extract, Transform, Load)
- Graduated — разные уровни строгости (Error, Warning, Info)
- Automated — в pipeline, не ручной процесс
- Monitored — логируй метрики, сравнивай с историей
- Alerting — отправляй Slack/Email при аномалиях
- Learning — каждый инцидент → новая проверка
Хороший Data Engineer — это не тот, кто пишет красивый код, а тот, кто гарантирует, что в DWH попадают только хорошие данные.