Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Как тестируешь код (Data Engineering перспектива)
1. Особенность тестирования Data Engineering
В отличие от веб-разработки, где ты тестируешь логику (функции), в Data Engineering большая часть кода работает с данными. Поэтому тестирование фокусируется на:
- Корректность трансформации данных — входные X преобразуются в ожидаемые Y
- Обработка edge cases — NULL, дубликаты, пустые данные
- Качество данных — не нарушены инварианты (unique constraints, диапазоны)
- Производительность — может ли ETL обработать миллионы строк за приемлемое время
2. Типы тестов
Unit tests (изолированные функции)
# tests/unit/transformers/test_user_dedup.py
import pytest
import pandas as pd
from transformers.user_dedup import deduplicate_users
def test_basic_dedup():
"""Базовое удаление дубликатов"""
df = pd.DataFrame({
'user_id': [1, 1, 2, 2, 3],
'email': ['a@b.com', 'a@b.com', 'c@d.com', 'c@d.com', 'e@f.com']
})
result = deduplicate_users(df)
assert len(result) == 3
assert result['user_id'].is_unique
def test_dedup_keeps_latest():
"""Убедись, что оставляем самую свежую версию"""
df = pd.DataFrame({
'user_id': [1, 1],
'email': ['old@b.com', 'new@b.com'],
'updated_at': ['2024-01-01', '2024-03-26']
})
result = deduplicate_users(df)
assert result.iloc[0]['email'] == 'new@b.com'
def test_dedup_nulls():
"""Обработка NULL значений"""
df = pd.DataFrame({
'user_id': [1, 2, None],
'email': ['a@b.com', 'c@d.com', None]
})
result = deduplicate_users(df)
# Нужна логика: удаляем или сохраняем NULL?
assert result['user_id'].isna().sum() == 0
def test_large_dataset():
"""Проверь на большом датасете (performance)"""
df = pd.DataFrame({
'user_id': list(range(1000000)),
'email': [f'user{i}@b.com' for i in range(1000000)]
})
import time
start = time.time()
result = deduplicate_users(df)
elapsed = time.time() - start
assert elapsed < 5 # Должна выполниться за < 5 сек
Integration tests (ETL pipeline)
# tests/integration/etl/test_daily_pipeline.py
import pytest
from sqlalchemy import create_engine
import pandas as pd
from pipelines.daily_etl import run_daily_etl
@pytest.fixture
def test_db():
"""Создай тестовую БД"""
engine = create_engine('sqlite:///:memory:')
# Создай схему для теста
engine.execute("""
CREATE TABLE users (
id INT PRIMARY KEY,
email VARCHAR(255),
created_at TIMESTAMP
)
""")
yield engine
# Очистка после теста
def test_daily_etl_loads_data(test_db):
"""Проверь, что ETL загрузил данные"""
# Подготовка: создай источники данных
# Выполнение
run_daily_etl(source_db=test_db, target_db=test_db)
# Проверка
result = pd.read_sql("SELECT COUNT(*) FROM users", test_db)
assert result.iloc[0, 0] > 0
def test_daily_etl_handles_errors(test_db):
"""Проверь, что ETL корректно обрабатывает ошибки"""
# Создай источник с плохими данными
bad_data = pd.DataFrame({
'id': [1, 2, 'invalid'], # Третий ID — не число
'email': ['a@b.com', 'c@d.com', None]
})
# ETL должен обработать, не упав
# (удалить инвалидные строки, залогировать ошибки)
result = run_daily_etl(bad_data)
assert len(result) == 2 # Только валидные
assert result['id'].dtype == 'int64' # Правильный тип
Data quality tests (Great Expectations)
# tests/data_quality/test_users_quality.py
from great_expectations.dataset import PandasDataset
def test_users_data_quality():
"""Проверка качества данных по пути в DWH"""
df = pd.read_sql("SELECT * FROM dim_users", engine)
gdf = PandasDataset(df)
# user_id должен быть уникален и не NULL
gdf.expect_column_values_to_be_unique("user_id")
gdf.expect_column_values_to_not_be_null("user_id")
# email должен быть валидным
gdf.expect_column_values_to_match_regex(
"email",
r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
)
# age должен быть в диапазоне [0, 150]
gdf.expect_column_values_to_be_between("age", 0, 150)
# created_at должна быть в прошлом
gdf.expect_column_values_to_be_between(
"created_at",
min_value=pd.Timestamp('2000-01-01'),
max_value=pd.Timestamp.now()
)
result = gdf.validate()
assert result['success']
3. Тестирование SQL queries
Плохо: просто скопипастить SQL из production
# ❌ Не тестируемо
def get_daily_revenue():
query = """
SELECT DATE(order_date), SUM(amount)
FROM orders
GROUP BY DATE(order_date)
"""
return pd.read_sql(query, engine)
Хорошо: функция + unit test
# ✅ Тестируемо
from sqlalchemy import text
def get_daily_revenue(start_date: str, end_date: str) -> pd.DataFrame:
query = text("""
SELECT
DATE(order_date) as date,
SUM(amount) as revenue
FROM orders
WHERE order_date >= :start_date AND order_date <= :end_date
GROUP BY DATE(order_date)
ORDER BY date
""")
return pd.read_sql(query, engine, params={'start_date': start_date, 'end_date': end_date})
# tests/unit/analytics/test_daily_revenue.py
def test_daily_revenue():
"""Проверка, что функция возвращает корректный результат"""
result = get_daily_revenue('2024-03-20', '2024-03-26')
# Проверки
assert 'date' in result.columns
assert 'revenue' in result.columns
assert len(result) == 7 # 7 дней
assert result['revenue'].dtype == 'float64'
assert (result['revenue'] >= 0).all() # Доход не может быть отрицательным
4. Тестирование Spark Job
# src/spark_jobs/user_aggregation.py
from pyspark.sql import SparkSession
def aggregate_user_metrics(
input_path: str,
output_path: str,
spark: SparkSession
) -> None:
"""
Агрегирует метрики по пользователям из Parquet
"""
df = spark.read.parquet(input_path)
result = (df
.filter(df.status == 'active')
.groupBy('user_id')
.agg(
F.count('*').alias('event_count'),
F.avg('event_value').alias('avg_value')
)
.filter(F.col('event_count') > 0)
)
result.write.mode('overwrite').parquet(output_path)
# tests/spark/test_user_aggregation.py
import pytest
from pyspark.sql import SparkSession
@pytest.fixture(scope='session')
def spark():
return SparkSession.builder.appName('test').getOrCreate()
def test_user_aggregation_filters_inactive(spark, tmp_path):
"""Проверь, что неактивные пользователи отфильтрованы"""
# Подготовка: создай тестовый датафрейм
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField('user_id', IntegerType()),
StructField('status', StringType()),
StructField('event_value', IntegerType())
])
data = [
(1, 'active', 100),
(2, 'inactive', 50),
(1, 'active', 200)
]
df = spark.createDataFrame(data, schema)
df.write.parquet(str(tmp_path / 'input'))
# Выполнение
aggregate_user_metrics(
str(tmp_path / 'input'),
str(tmp_path / 'output'),
spark
)
# Проверка
result = spark.read.parquet(str(tmp_path / 'output'))
# Только user_id=1 (inactive отфильтрована)
assert result.count() == 1
assert result.filter(result.user_id == 1).select('event_count').collect()[0][0] == 2
5. Mocking при тестировании API запросов
# src/extractors/api_extractor.py
import requests
def fetch_users_from_api(api_url: str) -> list:
"""Получи пользователей из API"""
response = requests.get(f"{api_url}/users")
response.raise_for_status()
return response.json()
# tests/unit/extractors/test_api_extractor.py
import pytest
from unittest.mock import patch, MagicMock
def test_fetch_users_success():
"""Успешное получение данных"""
mock_data = [
{'id': 1, 'email': 'a@b.com'},
{'id': 2, 'email': 'c@d.com'}
]
with patch('requests.get') as mock_get:
mock_response = MagicMock()
mock_response.json.return_value = mock_data
mock_get.return_value = mock_response
result = fetch_users_from_api('https://api.example.com')
assert len(result) == 2
assert result[0]['id'] == 1
def test_fetch_users_api_error():
"""Обработка API ошибок"""
with patch('requests.get') as mock_get:
mock_get.side_effect = requests.exceptions.ConnectionError()
with pytest.raises(requests.exceptions.ConnectionError):
fetch_users_from_api('https://api.example.com')
6. Pytest fixtures (переиспользуемые компоненты)
# tests/conftest.py (общие фикстуры для всех тестов)
import pytest
from sqlalchemy import create_engine
import pandas as pd
@pytest.fixture(scope='session')
def test_engine():
"""Тестовая база данных"""
engine = create_engine('postgresql://test:test@localhost/test_db')
yield engine
# Очистка после тестов
@pytest.fixture
def sample_users_df():
"""Тестовый датафрейм пользователей"""
return pd.DataFrame({
'user_id': [1, 2, 3],
'email': ['a@b.com', 'c@d.com', 'e@f.com'],
'age': [25, 30, 35]
})
# tests/unit/test_something.py
def test_transform(sample_users_df):
"""Используй фикстуру"""
result = my_transform(sample_users_df)
assert len(result) == 3
7. Coverage и CI/CD
# Запуск тестов с измерением покрытия
pytest --cov=src --cov-report=html
# Нужное покрытие для Data Engineering: 85%+
# (код с данными сложнее, чем веб-бизнес логика)
8. Типичный процесс тестирования
# 1. Напиши тест ПЕРВЫМ (TDD)
def test_user_age_validation():
df = pd.DataFrame({'user_id': [1, 2], 'age': [25, -5]}) # Плохой возраст
result = validate_users(df)
assert len(result) == 1 # Только строка 1
# 2. Напиши минимальный код для прохождения теста
def validate_users(df):
return df[df['age'] >= 0]
# 3. Запусти тест
pytest test_validation.py # ✓ Passed
# 4. Рефакторь если нужно
def validate_users(df):
"""Валидирует пользователей"""
assert 'age' in df.columns, "Column 'age' required"
return df[(df['age'] >= 0) & (df['age'] <= 150)]
9. Тестирование миграций (Goose SQL)
# tests/integration/test_migrations.py
import subprocess
def test_migration_0001_creates_users_table():
"""Проверь, что миграция создала таблицу"""
# Откати все миграции
subprocess.run(['goose', '-dir', 'migrations', 'down-to', '0'])
# Накати одну миграцию
subprocess.run(['goose', '-dir', 'migrations', 'up'])
# Проверь, что таблица существует
result = pd.read_sql(
"SELECT table_name FROM information_schema.tables WHERE table_name='users'",
engine
)
assert len(result) == 1
10. Важные принципы
- ✅ AAA pattern — Arrange (подготовка), Act (выполнение), Assert (проверка)
- ✅ Одна ассерция на тест — каждый тест проверяет ОДНО
- ✅ Моки и фикстуры — изолируй тесты от внешних зависимостей
- ✅ Тестируй edge cases — NULL, пустые данные, большие объёмы
- ✅ Performance тесты — убедись, что ETL не упадёт на миллионах строк
Заключение
Тестирование в Data Engineering — это про доверие к данным. Без тестов можно загрузить миллиарды плохих строк в production и никто не заметит неделю. Хорошие тесты — это инвестиция в надёжность и скорость разработки.