Какие знаешь проверки качества данных?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Проверки качества данных (Data Quality Checks)
Введение
Data Quality — это набор характеристик данных, определяющих их пригодность для использования: точность, полнота, консистентность, своевременность. Проверки качества — это автоматизированные тесты, которые гарантируют, что данные соответствуют ожиданиям.
1. Категории проверок качества данных
1.1 Проверка полноты (Completeness)
Убедиться, что данные не содержат неожиданные NULL или пропуски.
-- Проверка на отсутствие NULL в критичных полях
SELECT
COUNT(*) as total_rows,
COUNT(customer_id) as non_null_customer_id,
COUNT(*) - COUNT(customer_id) as missing_customer_id,
ROUND(100.0 * COUNT(customer_id) / COUNT(*), 2) as completeness_percent
FROM orders;
-- Ожидание: completeness_percent > 99.5
Правила:
- Все customer_id должны быть заполнены
- Не более 0.5% NULL в order_date
- Email должен быть не пустым для active пользователей
# Реализация в pandas
def check_completeness(df, column, max_null_percent=0.5):
null_count = df[column].isna().sum()
null_percent = 100 * null_count / len(df)
if null_percent > max_null_percent:
raise ValueError(
f"Column {column} has {null_percent:.2f}% nulls, "
f"expected max {max_null_percent}%"
)
return True
check_completeness(orders_df, 'customer_id', max_null_percent=0.5)
1.2 Проверка уникальности (Uniqueness)
Убедиться, что значения уникальны где требуется.
-- Проверка на дубликаты
SELECT
order_id,
COUNT(*) as occurrence_count
FROM orders
GROUP BY order_id
HAVING COUNT(*) > 1;
-- Ожидание: пустой результат (no duplicates)
-- Или:
SELECT COUNT(DISTINCT order_id) as unique_orders,
COUNT(*) as total_orders
FROM orders;
-- Ожидание: unique_orders == total_orders
def check_uniqueness(df, column):
duplicates = df[column].duplicated().sum()
if duplicates > 0:
raise ValueError(f"Column {column} has {duplicates} duplicate values")
return True
check_uniqueness(orders_df, 'order_id')
1.3 Проверка валидности (Validity)
Убедиться, что значения соответствуют ожиданиям (формат, диапазон, тип).
-- Проверка формата email
SELECT COUNT(*) as invalid_emails
FROM users
WHERE email NOT LIKE '%@%' OR email NOT LIKE '%.%';
-- Ожидание: 0 invalid_emails
-- Проверка диапазона
SELECT COUNT(*) as invalid_ages
FROM users
WHERE age < 0 OR age > 150;
-- Ожидание: 0
-- Проверка типа данных
SELECT COUNT(*) as non_numeric
FROM orders
WHERE amount !~ '^[0-9]+(\.[0-9]{1,2})?$';
-- Ожидание: 0
import re
from datetime import datetime
def validate_email(email):
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return re.match(pattern, email) is not None
def check_validity(df, rules):
"""Проверяет наличие валидных значений
rules: {
'email': validate_email,
'age': lambda x: 0 <= x <= 150,
'amount': lambda x: x >= 0,
}
"""
invalid_rows = []
for column, validator in rules.items():
if column not in df.columns:
continue
invalid = df[~df[column].apply(validator)]
if len(invalid) > 0:
invalid_rows.append({
'column': column,
'count': len(invalid),
'sample': invalid[column].head().tolist()
})
if invalid_rows:
raise ValueError(f"Invalid data found: {invalid_rows}")
return True
1.4 Проверка консистентности (Consistency)
Убедиться, что данные согласованы между таблицами и внутри них.
-- Проверка referential integrity (внешние ключи)
SELECT COUNT(*) as orphaned_orders
FROM orders o
LEFT JOIN customers c ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL;
-- Ожидание: 0 (нет заказов без клиентов)
-- Проверка логической консистентности
SELECT COUNT(*) as inconsistent
FROM orders
WHERE order_status = 'completed' AND completed_at IS NULL;
-- Ожидание: 0 (завершённые заказы должны иметь completed_at)
-- Проверка консистентности между таблицами
SELECT COUNT(*) as mismatches
FROM (
SELECT SUM(amount) as sum_orders FROM orders
) o
CROSS JOIN (
SELECT SUM(total) as sum_transactions FROM transactions
) t
WHERE ABS(o.sum_orders - t.sum_transactions) > 0.01; -- допуск на floating point ошибки
def check_referential_integrity(orders_df, customers_df, fk_column='customer_id'):
"""Проверяет, что все FK ссылаются на существующих родителей"""
valid_ids = set(customers_df[fk_column].unique())
invalid_fks = orders_df[~orders_df[fk_column].isin(valid_ids)]
if len(invalid_fks) > 0:
raise ValueError(
f"Found {len(invalid_fks)} orphaned records in orders"
)
return True
1.5 Проверка точности (Accuracy)
Убедиться, что данные соответствуют реальности (сравнение с источником).
# Сравнение с эталонным источником
def check_accuracy(imported_df, source_df, key_column):
"""Сравнивает импортированные данные с источником"""
# Проверить, что все ключи совпадают
imported_keys = set(imported_df[key_column].unique())
source_keys = set(source_df[key_column].unique())
missing_in_import = source_keys - imported_keys
extra_in_import = imported_keys - source_keys
if missing_in_import:
raise ValueError(f"Missing records: {missing_in_import}")
if extra_in_import:
raise ValueError(f"Extra records: {extra_in_import}")
# Проверить содержимое
merged = imported_df.merge(source_df, on=key_column, suffixes=('_imported', '_source'))
for col in source_df.columns:
if col == key_column:
continue
mismatches = merged[merged[f'{col}_imported'] != merged[f'{col}_source']]
if len(mismatches) > 0:
raise ValueError(
f"Found {len(mismatches)} mismatches in column {col}"
)
return True
1.6 Проверка своевременности (Timeliness)
Убедиться, что данные актуальны и загружены вовремя.
-- Проверка, что есть свежие данные
SELECT
MAX(loaded_at) as last_load,
CURRENT_TIMESTAMP as now,
DATEDIFF(minute, MAX(loaded_at), CURRENT_TIMESTAMP) as minutes_since_load
FROM data_load_log
WHERE status = 'success'
GROUP BY 1;
-- Ожидание: minutes_since_load < 60 (загрузка менее часа назад)
from datetime import datetime, timedelta
def check_freshness(last_update_time, max_age_hours=24):
"""Проверяет, что данные не старше max_age_hours"""
age = datetime.utcnow() - last_update_time
if age > timedelta(hours=max_age_hours):
raise ValueError(
f"Data is stale: last update {age.total_seconds() / 3600:.1f} hours ago"
)
return True
2. Инструменты для проверки качества
Great Expectations
from great_expectations.dataset.pandas_dataset import PandasDataset
# Обёрнуть DataFrame
df = PandasDataset(orders_df)
# Проверка полноты
df.expect_column_values_to_not_be_null('customer_id')
df.expect_column_value_lengths_to_be_between('email', min_value=5, max_value=254)
# Проверка диапазона
df.expect_column_values_to_be_between('amount', min_value=0, max_value=1000000)
# Проверка уникальности
df.expect_column_values_to_be_unique('order_id')
# Проверка типа
df.expect_column_values_to_be_of_type('order_date', 'datetime64')
# Проверка паттерна (regex)
df.expect_column_values_to_match_regex('email', r'^[a-zA-Z0-9._%+-]+@')
# Получить результаты
results = df.validate()
print(f"Passed: {results.statistics['successful_expectations']}")
print(f"Failed: {results.statistics['unsuccessful_expectations']}")
dbt tests
# models/orders.yml
models:
- name: orders
columns:
- name: order_id
tests:
- unique
- not_null
- name: customer_id
tests:
- not_null
- relationships:
to: ref('customers')
field: customer_id
- name: amount
tests:
- not_null
- assert_non_negative: # Custom test
- name: status
tests:
- not_null
- accepted_values:
values: ['pending', 'completed', 'cancelled']
# Запуск тестов
dbt test
Soda SQL
import soda.scan as scan
# Определение проверок
CHECKS = """
for each dataset:
freshness(loaded_at) < 1d
checks for orders:
- missing_count(customer_id) = 0
- invalid_count(amount) = 0:
invalid_format: 'not a number'
- duplicate_count(order_id) = 0
- schema:
name:
type: text
amount:
type: numeric
order_date:
type: date
"""
# Выполнение
scan = scan.Scan()
scan.set_data_source('postgres://...')
scan.set_checks(CHECKS)
results = scan.execute()
if results.has_failures():
print("Data quality issues found!")
else:
print("All checks passed!")
3. Практическая реализация в pipeline
# Airflow DAG с проверками
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
def validate_orders_quality(**context):
"""Проверка качества таблицы orders"""
import pandas as pd
from sqlalchemy import create_engine
engine = create_engine('postgresql://user:password@host/db')
df = pd.read_sql('SELECT * FROM orders WHERE load_date = CURRENT_DATE', engine)
checks = {
'row_count': len(df) > 0,
'completeness_order_id': df['order_id'].isna().sum() == 0,
'completeness_customer_id': df['customer_id'].isna().sum() == 0,
'uniqueness_order_id': df['order_id'].nunique() == len(df),
'validity_amount': (df['amount'] >= 0).all(),
'validity_status': df['status'].isin(['pending', 'completed', 'cancelled']).all(),
}
failed_checks = [k for k, v in checks.items() if not v]
if failed_checks:
raise ValueError(f"Quality checks failed: {failed_checks}")
# Логирование успехов
context['ti'].xcom_push(key='quality_checks', value=checks)
return checks
def load_to_warehouse(**context):
"""Загрузка только если все проверки пройдены"""
checks = context['ti'].xcom_pull(task_ids='validate_quality', key='quality_checks')
if all(checks.values()):
print("All quality checks passed, proceeding with load...")
# Загрузка данных
else:
raise ValueError("Cannot load: quality checks failed")
default_args = {'owner': 'data-team', 'start_date': days_ago(1)}
with DAG('orders_etl', default_args=default_args, schedule_interval='@daily') as dag:
validate = PythonOperator(
task_id='validate_quality',
python_callable=validate_orders_quality,
)
load = PythonOperator(
task_id='load_warehouse',
python_callable=load_to_warehouse,
)
validate >> load
4. Чеклист проверок качества
Для каждой таблицы проверяй:
- ✅ Полнота — нет неожиданных NULL в критичных полях
- ✅ Уникальность — первичные ключи уникальны
- ✅ Валидность — значения соответствуют формату и диапазону
- ✅ Консистентность — данные согласованы между таблицами
- ✅ Точность — данные соответствуют источнику (для импорта)
- ✅ Своевременность — данные загружены вовремя
- ✅ Объём — количество строк находится в ожидаемом диапазоне
- ✅ Распределение — нет аномального распределения значений
Вывод
Проверки качества данных — это не опция, а необходимость:
- Автоматизируют контроль качества
- Предотвращают попадание "грязных" данных в хранилище
- Ускоряют обнаружение проблем в pipeline
- Обеспечивают доверие пользователей к данным
- Экономят время на отладку в production
Best Practice: Добавь проверки на каждом этапе ETL:
- После Extract (проверить источник)
- После Transform (проверить логику)
- Перед Load (финальная проверка)