← Назад к вопросам
Какие делал бы типы проверок качества данных?
2.0 Middle🔥 241 комментариев
#ETL и качество данных#Инструменты разработки
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Проверки качества данных (Data Quality Checks): стратегия и реализация
Проверка качества данных — критическая часть работы Data Engineer. Я внедряю многоуровневый подход для обеспечения надёжности data pipelines.
Основные категории проверок
1. Проверки полноты данных (Completeness)
Проверяют, что нет критических пропусков:
import pandas as pd
from datetime import datetime
def check_completeness(df, critical_columns):
"""
Проверка наличия данных в критических столбцах
"""
results = {}
for col in critical_columns:
missing_count = df[col].isnull().sum()
missing_pct = (missing_count / len(df)) * 100
results[col] = {
'missing_count': missing_count,
'missing_percent': missing_pct,
'passed': missing_pct < 5 # Допустимо максимум 5%
}
if not results[col]['passed']:
print(f"ALERT: {col} has {missing_pct:.2f}% missing values")
return results
# Использование
check_completeness(df, ['user_id', 'email', 'created_at'])
2. Проверки валидности (Validity)
Проверяют, что данные соответствуют ожидаемому формату и диапазону:
def validate_data_types_and_ranges(df):
"""
Проверка типов данных и диапазонов значений
"""
validations = {}
# Email должен содержать @
if df['email'].dtype == 'object':
valid_emails = df['email'].str.contains('@', na=False).sum()
validations['email_format'] = {
'valid_count': valid_emails,
'total_count': len(df),
'passed': valid_emails / len(df) > 0.95
}
# Возраст должен быть в диапазоне 0-120
if 'age' in df.columns:
valid_ages = ((df['age'] >= 0) & (df['age'] <= 120)).sum()
validations['age_range'] = {
'valid_count': valid_ages,
'total_count': len(df),
'passed': valid_ages / len(df) > 0.99
}
# Дата создания не может быть в будущем
if 'created_at' in df.columns:
df['created_at'] = pd.to_datetime(df['created_at'])
valid_dates = (df['created_at'] <= datetime.now()).sum()
validations['date_sanity'] = {
'valid_count': valid_dates,
'total_count': len(df),
'passed': valid_dates / len(df) == 1.0
}
return validations
validate_data_types_and_ranges(df)
3. Проверки консистентности (Consistency)
Проверяют, что связанные данные согласованы между собой:
def check_consistency(df):
"""
Проверка логической консистентности данных
"""
issues = []
# Если заказ отменён, цена должна быть 0
cancelled_with_price = df[(df['status'] == 'cancelled') & (df['total'] > 0)]
if len(cancelled_with_price) > 0:
issues.append(f"Found {len(cancelled_with_price)} cancelled orders with non-zero price")
# Количество товаров не может быть отрицательным
negative_qty = df[df['quantity'] < 0]
if len(negative_qty) > 0:
issues.append(f"Found {len(negative_qty)} rows with negative quantity")
# Дата обновления не может быть раньше даты создания
invalid_dates = df[df['updated_at'] < df['created_at']]
if len(invalid_dates) > 0:
issues.append(f"Found {len(invalid_dates)} rows with updated_at < created_at")
# Проверка суммы (itemized_total должна равняться total)
price_mismatch = df[abs(df['itemized_total'] - df['total']) > 0.01]
if len(price_mismatch) > 0:
issues.append(f"Found {len(price_mismatch)} rows with price mismatch")
return {'passed': len(issues) == 0, 'issues': issues}
check_consistency(df)
4. SQL-based Data Quality Checks
Проверки на уровне БД часто эффективнее:
-- Проверка целостности данных на уровне БД
-- 1. Иностранные ключи (referential integrity)
SELECT COUNT(*) as orphaned_orders
FROM orders o
LEFT JOIN users u ON o.user_id = u.id
WHERE u.id IS NULL AND o.user_id IS NOT NULL;
-- 2. Дубликаты в полях, где должны быть уникальные значения
SELECT email, COUNT(*) as count
FROM users
GROUP BY email
HAVING COUNT(*) > 1;
-- 3. Проверка диапазонов
SELECT COUNT(*) as invalid_percentages
FROM metrics
WHERE completion_rate < 0 OR completion_rate > 100;
-- 4. Проверка временной корректности
SELECT COUNT(*) as future_dates
FROM events
WHERE event_date > CURRENT_TIMESTAMP;
-- 5. Проверка на NULL в критических полях
SELECT COUNT(*) as null_count
FROM orders
WHERE user_id IS NULL OR total IS NULL OR created_at IS NULL;
5. Аномалия-детекция (Anomaly Detection)
Проверки на необычные значения и статистические аномалии:
from scipy import stats
def detect_anomalies(df, column, threshold=3):
"""
Поиск выбросов методом Z-score
"""
z_scores = stats.zscore(df[column].dropna())
anomalies = df[abs(z_scores) > threshold]
return {
'anomaly_count': len(anomalies),
'anomaly_pct': (len(anomalies) / len(df)) * 100,
'anomalies': anomalies
}
# Используется для поиска необычных значений
detect_anomalies(df, 'transaction_amount', threshold=3)
# Для временных рядов: проверка на скачки
def detect_spikes(df, column, window=7):
"""
Поиск резких скачков в метриках
"""
rolling_mean = df[column].rolling(window=window).mean()
rolling_std = df[column].rolling(window=window).std()
upper_bound = rolling_mean + 3 * rolling_std
lower_bound = rolling_mean - 3 * rolling_std
spikes = df[(df[column] > upper_bound) | (df[column] < lower_bound)]
return spikes
6. Проверки свежести данных (Timeliness)
Проверяют, что данные обновлены вовремя:
from datetime import datetime, timedelta
def check_data_freshness(df, expected_update_frequency='daily'):
"""
Проверка свежести данных
"""
latest_record = df['created_at'].max()
age = datetime.now() - latest_record
if expected_update_frequency == 'daily':
freshness_threshold = timedelta(hours=26)
elif expected_update_frequency == 'hourly':
freshness_threshold = timedelta(hours=2)
else:
freshness_threshold = timedelta(hours=6)
is_fresh = age < freshness_threshold
return {
'latest_record': latest_record,
'data_age_hours': age.total_seconds() / 3600,
'is_fresh': is_fresh,
'alert': not is_fresh
}
7. Great Expectations Framework
Профессиональный подход с использованием специализированной библиотеки:
from great_expectations.dataset import PandasDataset
import great_expectations as gx
# Инициализация
df_gx = PandasDataset(df)
# Определение expectations
df_gx.expect_table_row_count_to_be_between(min_value=1000, max_value=1000000)
df_gx.expect_column_values_to_not_be_null('user_id')
df_gx.expect_column_values_to_be_in_set('status', ['pending', 'completed', 'failed'])
df_gx.expect_column_values_to_be_between('age', min_value=0, max_value=120)
df_gx.expect_column_pair_values_to_be_in_set([('status', 'amount')], [('completed', None), ('pending', 0)])
# Запуск проверок
validation_result = df_gx.validate()
print(validation_result)
Интеграция в ETL Pipeline
def run_data_quality_suite(df, stage='silver'):
"""
Комплексная проверка качества данных
"""
checks = {
'completeness': check_completeness(df, critical_columns),
'validity': validate_data_types_and_ranges(df),
'consistency': check_consistency(df),
'freshness': check_data_freshness(df),
'anomalies': detect_anomalies(df, 'amount', threshold=3)
}
# Логирование и алерты
failed_checks = [k for k, v in checks.items() if not v.get('passed', False)]
if failed_checks:
# Отправить алерт
send_alert(f"Data quality issues in {stage} layer: {failed_checks}")
# Или остановить pipeline
raise Exception(f"Data quality checks failed: {failed_checks}")
return checks
Best Practices
- Проверяйте на каждом этапе: raw → silver → gold
- Документируйте expectations: что можно считать "хорошими" данными
- Устанавливайте пороги выгрузки (blocking): когда остановить pipeline
- Логируйте метрики качества: track improvements over time
- Автоматизируйте: не полагайтесь на ручные проверки
Надежные data quality checks защищают downstream системы и гарантируют валидность анализа.