← Назад к вопросам

Когда делал бы проверки качества данных, чтобы добиться лучшего результата?

2.0 Middle🔥 171 комментариев
#ETL и качество данных

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Когда делать проверки качества данных для лучшего результата

1. На каких этапах ETL проверять данные

Лучшая практика — многоуровневый мониторинг (defense in depth):

A. На этапе Extract (получение данных)

def extract_data():
    """Первая проверка — сразу после получения"""
    # Получение из API
    response = requests.get(api_url)
    raw_data = response.json()
    
    # Сразу проверяем
    assert response.status_code == 200, f"API returned {response.status_code}"
    assert isinstance(raw_data, list), "Expected list, got {type(raw_data)}"
    assert len(raw_data) > 0, "Empty response from API"
    
    # Валидация первой строки
    first_record = raw_data[0]
    required_fields = {'id', 'email', 'created_at'}
    missing = required_fields - set(first_record.keys())
    assert not missing, f"Missing fields: {missing}"
    
    return pd.DataFrame(raw_data)

Зачем это нужно:

  • Поймаем ошибки API на раннем этапе
  • Не потратим время на трансформацию плохих данных
  • Пересчитаем запрос, пока свеж источник

B. На этапе Transform (трансформация)

def transform_data(df_raw):
    """Проверяем каждый шаг трансформации"""
    initial_rows = len(df_raw)
    
    # Шаг 1: Удаляем дубликаты
    df = df_raw.drop_duplicates(subset=['id'])
    duplicates_removed = initial_rows - len(df)
    
    # Проверка: не удалили ли слишком много?
    if duplicates_removed / initial_rows > 0.1:
        logging.warning(
            f"WARNING: Removed {duplicates_removed} duplicates "
            f"({duplicates_removed/initial_rows*100:.1f}% of data)"
        )
    
    # Шаг 2: Очищаем даты
    df['created_at'] = pd.to_datetime(df['created_at'], errors='coerce')
    nulls_from_date_parse = df['created_at'].isnull().sum()
    
    if nulls_from_date_parse > 0:
        logging.error(f"Failed to parse {nulls_from_date_parse} dates")
        # Не загружаем в DWH, останавливаем pipeline
        raise DataValidationError(f"Invalid dates: {nulls_from_date_parse}")
    
    # Шаг 3: Деньги должны быть положительными
    df = df[df['amount'] >= 0]
    removed_negative = initial_rows - len(df)
    
    if removed_negative > 0:
        logging.warning(f"Removed {removed_negative} rows with negative amounts")
    
    # Шаг 4: Финальная проверка перед Load
    assert df['id'].is_unique, "user_id должны быть уникальны"
    assert df['email'].notna().all(), "Email не должен быть NULL"
    
    return df

C. На этапе Load (загрузка в DWH)

def load_to_warehouse(df, target_table):
    """Проверяем перед попаданием в production"""
    
    # Перед Load: прогноз количества
    expected_rows = df.shape[0]
    print(f"About to load {expected_rows} rows")
    
    # Проверка: не выглядит ли подозрительно?
    avg_daily_rows = 50000
    if expected_rows > avg_daily_rows * 2:
        # Повышенный объём — требуется одобрение
        logging.warning(
            f"Data volume {expected_rows} is 2x usual. "
            f"This requires manual review."
        )
        # Отправь алерт, не загружай до проверки
        raise DataValidationError("Suspicious data volume")
    
    # Загрузка
    try:
        df.to_sql(target_table, engine, if_exists='append', index=False)
        logging.info(f"Loaded {expected_rows} rows to {target_table}")
    except Exception as e:
        logging.error(f"Load failed: {e}")
        raise
    
    # После Load: проверим, что реально загрузилось
    actual_rows = pd.read_sql(
        f"SELECT COUNT(*) as cnt FROM {target_table}",
        engine
    ).iloc[0, 0]
    
    if actual_rows < initial_rows:
        logging.error(
            f"Row count mismatch: expected {expected_rows}, "
            f"got {actual_rows}"
        )
        raise DataValidationError("Load verification failed")

2. Frequency (как часто проверять)

Для ежедневных пайплайнов

# Проверяй каждый день в одно время
# Используй Airflow задачу
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG('daily_quality_checks', schedule_interval='@daily') as dag:
    
    def check_data_quality():
        df = pd.read_sql("SELECT * FROM fact_sales WHERE created_at >= CURRENT_DATE - 1", engine)
        
        # Вчерашние метрики
        yesterday_rows = len(df)
        yesterday_revenue = df['amount'].sum()
        
        # Сравни с историческим средним
        avg_daily_rows = 100000
        avg_revenue = 500000
        
        # Threshold: 50% отклонение считается аномалией
        if yesterday_rows < avg_daily_rows * 0.5:
            send_alert(f"Row count too low: {yesterday_rows} (expected ~{avg_daily_rows})")
        
        if yesterday_revenue < avg_revenue * 0.5:
            send_alert(f"Revenue too low: {yesterday_revenue} (expected ~{avg_revenue})")
    
    quality_check = PythonOperator(
        task_id='quality_checks',
        python_callable=check_data_quality
    )

Для real-time потоков

# Проверяй каждый микро-батч
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName('streaming_quality').getOrCreate()

df_stream = (
    spark.readStream
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "events")
    .load()
)

# Проверяй каждый микро-батч
def process_with_quality_check(df):
    # Row count
    row_count = df.count()
    if row_count == 0:
        logging.warning("Empty batch received")
        return df
    
    # Schema validation
    assert 'user_id' in df.columns
    assert 'timestamp' in df.columns
    
    # Business rules
    df_valid = df.filter(col('amount') > 0)
    dropped = row_count - df_valid.count()
    if dropped > 0:
        logging.warning(f"Dropped {dropped} invalid rows")
    
    return df_valid

df_stream.foreachBatch(lambda df, id: process_with_quality_check(df))

3. Какие метрики мониторить в production

# Таблица для отслеживания метрик
CREATE TABLE data_quality_metrics (
    date DATE,
    table_name VARCHAR(255),
    total_rows BIGINT,
    null_rows BIGINT,
    duplicate_rows BIGINT,
    out_of_range_rows BIGINT,
    load_duration_seconds FLOAT,
    status VARCHAR(50),  -- 'passed', 'warning', 'failed'
    created_at TIMESTAMP
);

# Логирование
def log_quality_metrics(table_name, df, duration):
    metrics = {
        'date': datetime.now().date(),
        'table_name': table_name,
        'total_rows': len(df),
        'null_rows': df.isnull().sum().sum(),
        'duplicate_rows': df.duplicated().sum(),
        'out_of_range_rows': len(df[(df['age'] < 0) | (df['age'] > 150)]),
        'load_duration_seconds': duration,
        'status': 'passed' if duration < 300 else 'warning'
    }
    
    metrics_df = pd.DataFrame([metrics])
    metrics_df.to_sql('data_quality_metrics', engine, if_exists='append')

4. Когда ужесточить проверки

# После инцидента
def incident_checklist():
    """
    Если был инцидент (плохие данные попали в DWH):
    1. Добавь проверку, которая его поймает
    2. Запусти полную переоценку исторических данных
    3. Добавь мониторинг в production
    """
    pass

# Пример: инцидент с отрицательным доходом
# До: никаких проверок на amount
# После:
assert (df['amount'] >= 0).all(), "Negative amounts detected"

# Пример: инцидент с дубликатами
# До: deduplicate() случайным образом
# После:
assert df['order_id'].is_unique, "Duplicate order_ids"

5. Баланс между rigor и speed

# ❌ Плохо: слишком строгие проверки
# Все падает на малейших отклонениях
if df.shape[0] != expected_rows:
    raise Error("Row count must be exactly {expected_rows}")

# ✅ Хорошо: интеллектуальные пороги
if df.shape[0] < expected_rows * 0.8 or df.shape[0] > expected_rows * 1.2:
    logging.warning(f"Unusual row count: {df.shape[0]} (expected ~{expected_rows})")
    # Отправляем алерт, но не падаем

# ❌ Плохо: нет проверок (GIGO)
# Загружаем всё, что пришло
df.to_sql('table', engine, if_exists='append')

# ✅ Хорошо: проверки, но с разными уровнями severity
def validate_data(df):
    results = {'errors': [], 'warnings': []}
    
    # CRITICAL: NULL в PK
    if df['user_id'].isnull().any():
        results['errors'].append("NULL in user_id")
    
    # WARNING: несколько NULL в optional field
    if df['comment'].isnull().sum() / len(df) > 0.2:
        results['warnings'].append("Many NULLs in comment field")
    
    # На основе severity решаем
    if results['errors']:
        raise DataValidationError(results['errors'])
    
    if results['warnings']:
        logging.warning(f"Warnings: {results['warnings']}")
    
    return True

6. Когда расслабить проверки (если очень срочно)

# ТОЛЬКО в критичных ситуациях
# С явным дневником об этом

DO_SKIP_QUALITY_CHECKS = os.getenv('SKIP_QA_CHECKS') == 'true'

if not DO_SKIP_QUALITY_CHECKS:
    validate_data(df)  # Нормальное выполнение
else:
    logging.critical("⚠️ SKIPPING QUALITY CHECKS - DANGEROUS! ⚠️")
    logging.critical(f"Reason: {os.getenv('SKIP_REASON')}")
    # Загружаем с минимальными проверками
    assert df is not None and len(df) > 0

Заключение

Best practice по проверкам качества:

  1. Early & Often — проверяй на каждом этапе (Extract, Transform, Load)
  2. Graduated — разные уровни строгости (Error, Warning, Info)
  3. Automated — в pipeline, не ручной процесс
  4. Monitored — логируй метрики, сравнивай с историей
  5. Alerting — отправляй Slack/Email при аномалиях
  6. Learning — каждый инцидент → новая проверка

Хороший Data Engineer — это не тот, кто пишет красивый код, а тот, кто гарантирует, что в DWH попадают только хорошие данные.