← Назад к вопросам

Какие знаешь проверки качества данных?

1.8 Middle🔥 211 комментариев
#ETL и качество данных#Аналитика и метрики

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Проверки качества данных (Data Quality Checks)

Введение

Data Quality — это набор характеристик данных, определяющих их пригодность для использования: точность, полнота, консистентность, своевременность. Проверки качества — это автоматизированные тесты, которые гарантируют, что данные соответствуют ожиданиям.

1. Категории проверок качества данных

1.1 Проверка полноты (Completeness)

Убедиться, что данные не содержат неожиданные NULL или пропуски.

-- Проверка на отсутствие NULL в критичных полях
SELECT 
    COUNT(*) as total_rows,
    COUNT(customer_id) as non_null_customer_id,
    COUNT(*) - COUNT(customer_id) as missing_customer_id,
    ROUND(100.0 * COUNT(customer_id) / COUNT(*), 2) as completeness_percent
FROM orders;

-- Ожидание: completeness_percent > 99.5

Правила:

  • Все customer_id должны быть заполнены
  • Не более 0.5% NULL в order_date
  • Email должен быть не пустым для active пользователей
# Реализация в pandas
def check_completeness(df, column, max_null_percent=0.5):
    null_count = df[column].isna().sum()
    null_percent = 100 * null_count / len(df)
    
    if null_percent > max_null_percent:
        raise ValueError(
            f"Column {column} has {null_percent:.2f}% nulls, "
            f"expected max {max_null_percent}%"
        )
    
    return True

check_completeness(orders_df, 'customer_id', max_null_percent=0.5)

1.2 Проверка уникальности (Uniqueness)

Убедиться, что значения уникальны где требуется.

-- Проверка на дубликаты
SELECT 
    order_id,
    COUNT(*) as occurrence_count
FROM orders
GROUP BY order_id
HAVING COUNT(*) > 1;
-- Ожидание: пустой результат (no duplicates)

-- Или:
SELECT COUNT(DISTINCT order_id) as unique_orders,
       COUNT(*) as total_orders
FROM orders;
-- Ожидание: unique_orders == total_orders
def check_uniqueness(df, column):
    duplicates = df[column].duplicated().sum()
    if duplicates > 0:
        raise ValueError(f"Column {column} has {duplicates} duplicate values")
    return True

check_uniqueness(orders_df, 'order_id')

1.3 Проверка валидности (Validity)

Убедиться, что значения соответствуют ожиданиям (формат, диапазон, тип).

-- Проверка формата email
SELECT COUNT(*) as invalid_emails
FROM users
WHERE email NOT LIKE '%@%' OR email NOT LIKE '%.%';
-- Ожидание: 0 invalid_emails

-- Проверка диапазона
SELECT COUNT(*) as invalid_ages
FROM users
WHERE age < 0 OR age > 150;
-- Ожидание: 0

-- Проверка типа данных
SELECT COUNT(*) as non_numeric
FROM orders
WHERE amount !~ '^[0-9]+(\.[0-9]{1,2})?$';
-- Ожидание: 0
import re
from datetime import datetime

def validate_email(email):
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    return re.match(pattern, email) is not None

def check_validity(df, rules):
    """Проверяет наличие валидных значений
    
    rules: {
        'email': validate_email,
        'age': lambda x: 0 <= x <= 150,
        'amount': lambda x: x >= 0,
    }
    """
    invalid_rows = []
    
    for column, validator in rules.items():
        if column not in df.columns:
            continue
        
        invalid = df[~df[column].apply(validator)]
        if len(invalid) > 0:
            invalid_rows.append({
                'column': column,
                'count': len(invalid),
                'sample': invalid[column].head().tolist()
            })
    
    if invalid_rows:
        raise ValueError(f"Invalid data found: {invalid_rows}")
    
    return True

1.4 Проверка консистентности (Consistency)

Убедиться, что данные согласованы между таблицами и внутри них.

-- Проверка referential integrity (внешние ключи)
SELECT COUNT(*) as orphaned_orders
FROM orders o
LEFT JOIN customers c ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL;
-- Ожидание: 0 (нет заказов без клиентов)

-- Проверка логической консистентности
SELECT COUNT(*) as inconsistent
FROM orders
WHERE order_status = 'completed' AND completed_at IS NULL;
-- Ожидание: 0 (завершённые заказы должны иметь completed_at)

-- Проверка консистентности между таблицами
SELECT COUNT(*) as mismatches
FROM (
    SELECT SUM(amount) as sum_orders FROM orders
) o
CROSS JOIN (
    SELECT SUM(total) as sum_transactions FROM transactions  
) t
WHERE ABS(o.sum_orders - t.sum_transactions) > 0.01;  -- допуск на floating point ошибки
def check_referential_integrity(orders_df, customers_df, fk_column='customer_id'):
    """Проверяет, что все FK ссылаются на существующих родителей"""
    valid_ids = set(customers_df[fk_column].unique())
    invalid_fks = orders_df[~orders_df[fk_column].isin(valid_ids)]
    
    if len(invalid_fks) > 0:
        raise ValueError(
            f"Found {len(invalid_fks)} orphaned records in orders"
        )
    return True

1.5 Проверка точности (Accuracy)

Убедиться, что данные соответствуют реальности (сравнение с источником).

# Сравнение с эталонным источником
def check_accuracy(imported_df, source_df, key_column):
    """Сравнивает импортированные данные с источником"""
    
    # Проверить, что все ключи совпадают
    imported_keys = set(imported_df[key_column].unique())
    source_keys = set(source_df[key_column].unique())
    
    missing_in_import = source_keys - imported_keys
    extra_in_import = imported_keys - source_keys
    
    if missing_in_import:
        raise ValueError(f"Missing records: {missing_in_import}")
    if extra_in_import:
        raise ValueError(f"Extra records: {extra_in_import}")
    
    # Проверить содержимое
    merged = imported_df.merge(source_df, on=key_column, suffixes=('_imported', '_source'))
    
    for col in source_df.columns:
        if col == key_column:
            continue
        mismatches = merged[merged[f'{col}_imported'] != merged[f'{col}_source']]
        if len(mismatches) > 0:
            raise ValueError(
                f"Found {len(mismatches)} mismatches in column {col}"
            )
    
    return True

1.6 Проверка своевременности (Timeliness)

Убедиться, что данные актуальны и загружены вовремя.

-- Проверка, что есть свежие данные
SELECT 
    MAX(loaded_at) as last_load,
    CURRENT_TIMESTAMP as now,
    DATEDIFF(minute, MAX(loaded_at), CURRENT_TIMESTAMP) as minutes_since_load
FROM data_load_log
WHERE status = 'success'
GROUP BY 1;

-- Ожидание: minutes_since_load < 60 (загрузка менее часа назад)
from datetime import datetime, timedelta

def check_freshness(last_update_time, max_age_hours=24):
    """Проверяет, что данные не старше max_age_hours"""
    age = datetime.utcnow() - last_update_time
    
    if age > timedelta(hours=max_age_hours):
        raise ValueError(
            f"Data is stale: last update {age.total_seconds() / 3600:.1f} hours ago"
        )
    return True

2. Инструменты для проверки качества

Great Expectations

from great_expectations.dataset.pandas_dataset import PandasDataset

# Обёрнуть DataFrame
df = PandasDataset(orders_df)

# Проверка полноты
df.expect_column_values_to_not_be_null('customer_id')
df.expect_column_value_lengths_to_be_between('email', min_value=5, max_value=254)

# Проверка диапазона
df.expect_column_values_to_be_between('amount', min_value=0, max_value=1000000)

# Проверка уникальности
df.expect_column_values_to_be_unique('order_id')

# Проверка типа
df.expect_column_values_to_be_of_type('order_date', 'datetime64')

# Проверка паттерна (regex)
df.expect_column_values_to_match_regex('email', r'^[a-zA-Z0-9._%+-]+@')

# Получить результаты
results = df.validate()
print(f"Passed: {results.statistics['successful_expectations']}")
print(f"Failed: {results.statistics['unsuccessful_expectations']}")

dbt tests

# models/orders.yml
models:
  - name: orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('customers')
              field: customer_id
      
      - name: amount
        tests:
          - not_null
          - assert_non_negative:  # Custom test
          
      - name: status
        tests:
          - not_null
          - accepted_values:
              values: ['pending', 'completed', 'cancelled']
# Запуск тестов
dbt test

Soda SQL

import soda.scan as scan

# Определение проверок
CHECKS = """
for each dataset:
  freshness(loaded_at) < 1d
  
checks for orders:
  - missing_count(customer_id) = 0
  - invalid_count(amount) = 0:
      invalid_format: 'not a number'
  - duplicate_count(order_id) = 0
  - schema:
      name:
        type: text
      amount:
        type: numeric
      order_date:
        type: date
"""

# Выполнение
scan = scan.Scan()
scan.set_data_source('postgres://...')
scan.set_checks(CHECKS)
results = scan.execute()

if results.has_failures():
    print("Data quality issues found!")
else:
    print("All checks passed!")

3. Практическая реализация в pipeline

# Airflow DAG с проверками
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

def validate_orders_quality(**context):
    """Проверка качества таблицы orders"""
    import pandas as pd
    from sqlalchemy import create_engine
    
    engine = create_engine('postgresql://user:password@host/db')
    df = pd.read_sql('SELECT * FROM orders WHERE load_date = CURRENT_DATE', engine)
    
    checks = {
        'row_count': len(df) > 0,
        'completeness_order_id': df['order_id'].isna().sum() == 0,
        'completeness_customer_id': df['customer_id'].isna().sum() == 0,
        'uniqueness_order_id': df['order_id'].nunique() == len(df),
        'validity_amount': (df['amount'] >= 0).all(),
        'validity_status': df['status'].isin(['pending', 'completed', 'cancelled']).all(),
    }
    
    failed_checks = [k for k, v in checks.items() if not v]
    
    if failed_checks:
        raise ValueError(f"Quality checks failed: {failed_checks}")
    
    # Логирование успехов
    context['ti'].xcom_push(key='quality_checks', value=checks)
    return checks

def load_to_warehouse(**context):
    """Загрузка только если все проверки пройдены"""
    checks = context['ti'].xcom_pull(task_ids='validate_quality', key='quality_checks')
    
    if all(checks.values()):
        print("All quality checks passed, proceeding with load...")
        # Загрузка данных
    else:
        raise ValueError("Cannot load: quality checks failed")

default_args = {'owner': 'data-team', 'start_date': days_ago(1)}

with DAG('orders_etl', default_args=default_args, schedule_interval='@daily') as dag:
    validate = PythonOperator(
        task_id='validate_quality',
        python_callable=validate_orders_quality,
    )
    
    load = PythonOperator(
        task_id='load_warehouse',
        python_callable=load_to_warehouse,
    )
    
    validate >> load

4. Чеклист проверок качества

Для каждой таблицы проверяй:

  • Полнота — нет неожиданных NULL в критичных полях
  • Уникальность — первичные ключи уникальны
  • Валидность — значения соответствуют формату и диапазону
  • Консистентность — данные согласованы между таблицами
  • Точность — данные соответствуют источнику (для импорта)
  • Своевременность — данные загружены вовремя
  • Объём — количество строк находится в ожидаемом диапазоне
  • Распределение — нет аномального распределения значений

Вывод

Проверки качества данных — это не опция, а необходимость:

  • Автоматизируют контроль качества
  • Предотвращают попадание "грязных" данных в хранилище
  • Ускоряют обнаружение проблем в pipeline
  • Обеспечивают доверие пользователей к данным
  • Экономят время на отладку в production

Best Practice: Добавь проверки на каждом этапе ETL:

  1. После Extract (проверить источник)
  2. После Transform (проверить логику)
  3. Перед Load (финальная проверка)