← Назад к вопросам

Какие делал бы типы проверок качества данных?

2.0 Middle🔥 241 комментариев
#ETL и качество данных#Инструменты разработки

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Проверки качества данных (Data Quality Checks): стратегия и реализация

Проверка качества данных — критическая часть работы Data Engineer. Я внедряю многоуровневый подход для обеспечения надёжности data pipelines.

Основные категории проверок

1. Проверки полноты данных (Completeness)

Проверяют, что нет критических пропусков:

import pandas as pd
from datetime import datetime

def check_completeness(df, critical_columns):
    """
    Проверка наличия данных в критических столбцах
    """
    results = {}
    
    for col in critical_columns:
        missing_count = df[col].isnull().sum()
        missing_pct = (missing_count / len(df)) * 100
        
        results[col] = {
            'missing_count': missing_count,
            'missing_percent': missing_pct,
            'passed': missing_pct < 5  # Допустимо максимум 5%
        }
        
        if not results[col]['passed']:
            print(f"ALERT: {col} has {missing_pct:.2f}% missing values")
    
    return results

# Использование
check_completeness(df, ['user_id', 'email', 'created_at'])

2. Проверки валидности (Validity)

Проверяют, что данные соответствуют ожидаемому формату и диапазону:

def validate_data_types_and_ranges(df):
    """
    Проверка типов данных и диапазонов значений
    """
    validations = {}
    
    # Email должен содержать @
    if df['email'].dtype == 'object':
        valid_emails = df['email'].str.contains('@', na=False).sum()
        validations['email_format'] = {
            'valid_count': valid_emails,
            'total_count': len(df),
            'passed': valid_emails / len(df) > 0.95
        }
    
    # Возраст должен быть в диапазоне 0-120
    if 'age' in df.columns:
        valid_ages = ((df['age'] >= 0) & (df['age'] <= 120)).sum()
        validations['age_range'] = {
            'valid_count': valid_ages,
            'total_count': len(df),
            'passed': valid_ages / len(df) > 0.99
        }
    
    # Дата создания не может быть в будущем
    if 'created_at' in df.columns:
        df['created_at'] = pd.to_datetime(df['created_at'])
        valid_dates = (df['created_at'] <= datetime.now()).sum()
        validations['date_sanity'] = {
            'valid_count': valid_dates,
            'total_count': len(df),
            'passed': valid_dates / len(df) == 1.0
        }
    
    return validations

validate_data_types_and_ranges(df)

3. Проверки консистентности (Consistency)

Проверяют, что связанные данные согласованы между собой:

def check_consistency(df):
    """
    Проверка логической консистентности данных
    """
    issues = []
    
    # Если заказ отменён, цена должна быть 0
    cancelled_with_price = df[(df['status'] == 'cancelled') & (df['total'] > 0)]
    if len(cancelled_with_price) > 0:
        issues.append(f"Found {len(cancelled_with_price)} cancelled orders with non-zero price")
    
    # Количество товаров не может быть отрицательным
    negative_qty = df[df['quantity'] < 0]
    if len(negative_qty) > 0:
        issues.append(f"Found {len(negative_qty)} rows with negative quantity")
    
    # Дата обновления не может быть раньше даты создания
    invalid_dates = df[df['updated_at'] < df['created_at']]
    if len(invalid_dates) > 0:
        issues.append(f"Found {len(invalid_dates)} rows with updated_at < created_at")
    
    # Проверка суммы (itemized_total должна равняться total)
    price_mismatch = df[abs(df['itemized_total'] - df['total']) > 0.01]
    if len(price_mismatch) > 0:
        issues.append(f"Found {len(price_mismatch)} rows with price mismatch")
    
    return {'passed': len(issues) == 0, 'issues': issues}

check_consistency(df)

4. SQL-based Data Quality Checks

Проверки на уровне БД часто эффективнее:

-- Проверка целостности данных на уровне БД

-- 1. Иностранные ключи (referential integrity)
SELECT COUNT(*) as orphaned_orders
FROM orders o
LEFT JOIN users u ON o.user_id = u.id
WHERE u.id IS NULL AND o.user_id IS NOT NULL;

-- 2. Дубликаты в полях, где должны быть уникальные значения
SELECT email, COUNT(*) as count
FROM users
GROUP BY email
HAVING COUNT(*) > 1;

-- 3. Проверка диапазонов
SELECT COUNT(*) as invalid_percentages
FROM metrics
WHERE completion_rate < 0 OR completion_rate > 100;

-- 4. Проверка временной корректности
SELECT COUNT(*) as future_dates
FROM events
WHERE event_date > CURRENT_TIMESTAMP;

-- 5. Проверка на NULL в критических полях
SELECT COUNT(*) as null_count
FROM orders
WHERE user_id IS NULL OR total IS NULL OR created_at IS NULL;

5. Аномалия-детекция (Anomaly Detection)

Проверки на необычные значения и статистические аномалии:

from scipy import stats

def detect_anomalies(df, column, threshold=3):
    """
    Поиск выбросов методом Z-score
    """
    z_scores = stats.zscore(df[column].dropna())
    anomalies = df[abs(z_scores) > threshold]
    
    return {
        'anomaly_count': len(anomalies),
        'anomaly_pct': (len(anomalies) / len(df)) * 100,
        'anomalies': anomalies
    }

# Используется для поиска необычных значений
detect_anomalies(df, 'transaction_amount', threshold=3)

# Для временных рядов: проверка на скачки
def detect_spikes(df, column, window=7):
    """
    Поиск резких скачков в метриках
    """
    rolling_mean = df[column].rolling(window=window).mean()
    rolling_std = df[column].rolling(window=window).std()
    
    upper_bound = rolling_mean + 3 * rolling_std
    lower_bound = rolling_mean - 3 * rolling_std
    
    spikes = df[(df[column] > upper_bound) | (df[column] < lower_bound)]
    return spikes

6. Проверки свежести данных (Timeliness)

Проверяют, что данные обновлены вовремя:

from datetime import datetime, timedelta

def check_data_freshness(df, expected_update_frequency='daily'):
    """
    Проверка свежести данных
    """
    latest_record = df['created_at'].max()
    age = datetime.now() - latest_record
    
    if expected_update_frequency == 'daily':
        freshness_threshold = timedelta(hours=26)
    elif expected_update_frequency == 'hourly':
        freshness_threshold = timedelta(hours=2)
    else:
        freshness_threshold = timedelta(hours=6)
    
    is_fresh = age < freshness_threshold
    
    return {
        'latest_record': latest_record,
        'data_age_hours': age.total_seconds() / 3600,
        'is_fresh': is_fresh,
        'alert': not is_fresh
    }

7. Great Expectations Framework

Профессиональный подход с использованием специализированной библиотеки:

from great_expectations.dataset import PandasDataset
import great_expectations as gx

# Инициализация
df_gx = PandasDataset(df)

# Определение expectations
df_gx.expect_table_row_count_to_be_between(min_value=1000, max_value=1000000)
df_gx.expect_column_values_to_not_be_null('user_id')
df_gx.expect_column_values_to_be_in_set('status', ['pending', 'completed', 'failed'])
df_gx.expect_column_values_to_be_between('age', min_value=0, max_value=120)
df_gx.expect_column_pair_values_to_be_in_set([('status', 'amount')], [('completed', None), ('pending', 0)])

# Запуск проверок
validation_result = df_gx.validate()
print(validation_result)

Интеграция в ETL Pipeline

def run_data_quality_suite(df, stage='silver'):
    """
    Комплексная проверка качества данных
    """
    checks = {
        'completeness': check_completeness(df, critical_columns),
        'validity': validate_data_types_and_ranges(df),
        'consistency': check_consistency(df),
        'freshness': check_data_freshness(df),
        'anomalies': detect_anomalies(df, 'amount', threshold=3)
    }
    
    # Логирование и алерты
    failed_checks = [k for k, v in checks.items() if not v.get('passed', False)]
    
    if failed_checks:
        # Отправить алерт
        send_alert(f"Data quality issues in {stage} layer: {failed_checks}")
        # Или остановить pipeline
        raise Exception(f"Data quality checks failed: {failed_checks}")
    
    return checks

Best Practices

  1. Проверяйте на каждом этапе: raw → silver → gold
  2. Документируйте expectations: что можно считать "хорошими" данными
  3. Устанавливайте пороги выгрузки (blocking): когда остановить pipeline
  4. Логируйте метрики качества: track improvements over time
  5. Автоматизируйте: не полагайтесь на ручные проверки

Надежные data quality checks защищают downstream системы и гарантируют валидность анализа.

Какие делал бы типы проверок качества данных? | PrepBro