← Назад к вопросам

Как тестируешь код?

2.3 Middle🔥 141 комментариев
#Python#Инструменты разработки

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как тестируешь код (Data Engineering перспектива)

1. Особенность тестирования Data Engineering

В отличие от веб-разработки, где ты тестируешь логику (функции), в Data Engineering большая часть кода работает с данными. Поэтому тестирование фокусируется на:

  • Корректность трансформации данных — входные X преобразуются в ожидаемые Y
  • Обработка edge cases — NULL, дубликаты, пустые данные
  • Качество данных — не нарушены инварианты (unique constraints, диапазоны)
  • Производительность — может ли ETL обработать миллионы строк за приемлемое время

2. Типы тестов

Unit tests (изолированные функции)

# tests/unit/transformers/test_user_dedup.py
import pytest
import pandas as pd
from transformers.user_dedup import deduplicate_users

def test_basic_dedup():
    """Базовое удаление дубликатов"""
    df = pd.DataFrame({
        'user_id': [1, 1, 2, 2, 3],
        'email': ['a@b.com', 'a@b.com', 'c@d.com', 'c@d.com', 'e@f.com']
    })
    
    result = deduplicate_users(df)
    
    assert len(result) == 3
    assert result['user_id'].is_unique

def test_dedup_keeps_latest():
    """Убедись, что оставляем самую свежую версию"""
    df = pd.DataFrame({
        'user_id': [1, 1],
        'email': ['old@b.com', 'new@b.com'],
        'updated_at': ['2024-01-01', '2024-03-26']
    })
    
    result = deduplicate_users(df)
    
    assert result.iloc[0]['email'] == 'new@b.com'

def test_dedup_nulls():
    """Обработка NULL значений"""
    df = pd.DataFrame({
        'user_id': [1, 2, None],
        'email': ['a@b.com', 'c@d.com', None]
    })
    
    result = deduplicate_users(df)
    
    # Нужна логика: удаляем или сохраняем NULL?
    assert result['user_id'].isna().sum() == 0

def test_large_dataset():
    """Проверь на большом датасете (performance)"""
    df = pd.DataFrame({
        'user_id': list(range(1000000)),
        'email': [f'user{i}@b.com' for i in range(1000000)]
    })
    
    import time
    start = time.time()
    result = deduplicate_users(df)
    elapsed = time.time() - start
    
    assert elapsed < 5  # Должна выполниться за < 5 сек

Integration tests (ETL pipeline)

# tests/integration/etl/test_daily_pipeline.py
import pytest
from sqlalchemy import create_engine
import pandas as pd
from pipelines.daily_etl import run_daily_etl

@pytest.fixture
def test_db():
    """Создай тестовую БД"""
    engine = create_engine('sqlite:///:memory:')
    # Создай схему для теста
    engine.execute("""
        CREATE TABLE users (
            id INT PRIMARY KEY,
            email VARCHAR(255),
            created_at TIMESTAMP
        )
    """)
    yield engine
    # Очистка после теста

def test_daily_etl_loads_data(test_db):
    """Проверь, что ETL загрузил данные"""
    # Подготовка: создай источники данных
    
    # Выполнение
    run_daily_etl(source_db=test_db, target_db=test_db)
    
    # Проверка
    result = pd.read_sql("SELECT COUNT(*) FROM users", test_db)
    assert result.iloc[0, 0] > 0

def test_daily_etl_handles_errors(test_db):
    """Проверь, что ETL корректно обрабатывает ошибки"""
    # Создай источник с плохими данными
    bad_data = pd.DataFrame({
        'id': [1, 2, 'invalid'],  # Третий ID — не число
        'email': ['a@b.com', 'c@d.com', None]
    })
    
    # ETL должен обработать, не упав
    # (удалить инвалидные строки, залогировать ошибки)
    result = run_daily_etl(bad_data)
    
    assert len(result) == 2  # Только валидные
    assert result['id'].dtype == 'int64'  # Правильный тип

Data quality tests (Great Expectations)

# tests/data_quality/test_users_quality.py
from great_expectations.dataset import PandasDataset

def test_users_data_quality():
    """Проверка качества данных по пути в DWH"""
    df = pd.read_sql("SELECT * FROM dim_users", engine)
    gdf = PandasDataset(df)
    
    # user_id должен быть уникален и не NULL
    gdf.expect_column_values_to_be_unique("user_id")
    gdf.expect_column_values_to_not_be_null("user_id")
    
    # email должен быть валидным
    gdf.expect_column_values_to_match_regex(
        "email",
        r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
    )
    
    # age должен быть в диапазоне [0, 150]
    gdf.expect_column_values_to_be_between("age", 0, 150)
    
    # created_at должна быть в прошлом
    gdf.expect_column_values_to_be_between(
        "created_at",
        min_value=pd.Timestamp('2000-01-01'),
        max_value=pd.Timestamp.now()
    )
    
    result = gdf.validate()
    assert result['success']

3. Тестирование SQL queries

Плохо: просто скопипастить SQL из production

# ❌ Не тестируемо
def get_daily_revenue():
    query = """
        SELECT DATE(order_date), SUM(amount)
        FROM orders
        GROUP BY DATE(order_date)
    """
    return pd.read_sql(query, engine)

Хорошо: функция + unit test

# ✅ Тестируемо
from sqlalchemy import text

def get_daily_revenue(start_date: str, end_date: str) -> pd.DataFrame:
    query = text("""
    SELECT 
        DATE(order_date) as date,
        SUM(amount) as revenue
    FROM orders
    WHERE order_date >= :start_date AND order_date <= :end_date
    GROUP BY DATE(order_date)
    ORDER BY date
    """)
    return pd.read_sql(query, engine, params={'start_date': start_date, 'end_date': end_date})

# tests/unit/analytics/test_daily_revenue.py
def test_daily_revenue():
    """Проверка, что функция возвращает корректный результат"""
    result = get_daily_revenue('2024-03-20', '2024-03-26')
    
    # Проверки
    assert 'date' in result.columns
    assert 'revenue' in result.columns
    assert len(result) == 7  # 7 дней
    assert result['revenue'].dtype == 'float64'
    assert (result['revenue'] >= 0).all()  # Доход не может быть отрицательным

4. Тестирование Spark Job

# src/spark_jobs/user_aggregation.py
from pyspark.sql import SparkSession

def aggregate_user_metrics(
    input_path: str,
    output_path: str,
    spark: SparkSession
) -> None:
    """
    Агрегирует метрики по пользователям из Parquet
    """
    df = spark.read.parquet(input_path)
    
    result = (df
        .filter(df.status == 'active')
        .groupBy('user_id')
        .agg(
            F.count('*').alias('event_count'),
            F.avg('event_value').alias('avg_value')
        )
        .filter(F.col('event_count') > 0)
    )
    
    result.write.mode('overwrite').parquet(output_path)

# tests/spark/test_user_aggregation.py
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope='session')
def spark():
    return SparkSession.builder.appName('test').getOrCreate()

def test_user_aggregation_filters_inactive(spark, tmp_path):
    """Проверь, что неактивные пользователи отфильтрованы"""
    # Подготовка: создай тестовый датафрейм
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType
    
    schema = StructType([
        StructField('user_id', IntegerType()),
        StructField('status', StringType()),
        StructField('event_value', IntegerType())
    ])
    
    data = [
        (1, 'active', 100),
        (2, 'inactive', 50),
        (1, 'active', 200)
    ]
    
    df = spark.createDataFrame(data, schema)
    df.write.parquet(str(tmp_path / 'input'))
    
    # Выполнение
    aggregate_user_metrics(
        str(tmp_path / 'input'),
        str(tmp_path / 'output'),
        spark
    )
    
    # Проверка
    result = spark.read.parquet(str(tmp_path / 'output'))
    
    # Только user_id=1 (inactive отфильтрована)
    assert result.count() == 1
    assert result.filter(result.user_id == 1).select('event_count').collect()[0][0] == 2

5. Mocking при тестировании API запросов

# src/extractors/api_extractor.py
import requests

def fetch_users_from_api(api_url: str) -> list:
    """Получи пользователей из API"""
    response = requests.get(f"{api_url}/users")
    response.raise_for_status()
    return response.json()

# tests/unit/extractors/test_api_extractor.py
import pytest
from unittest.mock import patch, MagicMock

def test_fetch_users_success():
    """Успешное получение данных"""
    mock_data = [
        {'id': 1, 'email': 'a@b.com'},
        {'id': 2, 'email': 'c@d.com'}
    ]
    
    with patch('requests.get') as mock_get:
        mock_response = MagicMock()
        mock_response.json.return_value = mock_data
        mock_get.return_value = mock_response
        
        result = fetch_users_from_api('https://api.example.com')
        
        assert len(result) == 2
        assert result[0]['id'] == 1

def test_fetch_users_api_error():
    """Обработка API ошибок"""
    with patch('requests.get') as mock_get:
        mock_get.side_effect = requests.exceptions.ConnectionError()
        
        with pytest.raises(requests.exceptions.ConnectionError):
            fetch_users_from_api('https://api.example.com')

6. Pytest fixtures (переиспользуемые компоненты)

# tests/conftest.py (общие фикстуры для всех тестов)
import pytest
from sqlalchemy import create_engine
import pandas as pd

@pytest.fixture(scope='session')
def test_engine():
    """Тестовая база данных"""
    engine = create_engine('postgresql://test:test@localhost/test_db')
    yield engine
    # Очистка после тестов

@pytest.fixture
def sample_users_df():
    """Тестовый датафрейм пользователей"""
    return pd.DataFrame({
        'user_id': [1, 2, 3],
        'email': ['a@b.com', 'c@d.com', 'e@f.com'],
        'age': [25, 30, 35]
    })

# tests/unit/test_something.py
def test_transform(sample_users_df):
    """Используй фикстуру"""
    result = my_transform(sample_users_df)
    assert len(result) == 3

7. Coverage и CI/CD

# Запуск тестов с измерением покрытия
pytest --cov=src --cov-report=html

# Нужное покрытие для Data Engineering: 85%+
# (код с данными сложнее, чем веб-бизнес логика)

8. Типичный процесс тестирования

# 1. Напиши тест ПЕРВЫМ (TDD)
def test_user_age_validation():
    df = pd.DataFrame({'user_id': [1, 2], 'age': [25, -5]})  # Плохой возраст
    result = validate_users(df)
    assert len(result) == 1  # Только строка 1

# 2. Напиши минимальный код для прохождения теста
def validate_users(df):
    return df[df['age'] >= 0]

# 3. Запусти тест
pytest test_validation.py  # ✓ Passed

# 4. Рефакторь если нужно
def validate_users(df):
    """Валидирует пользователей"""
    assert 'age' in df.columns, "Column 'age' required"
    return df[(df['age'] >= 0) & (df['age'] <= 150)]

9. Тестирование миграций (Goose SQL)

# tests/integration/test_migrations.py
import subprocess

def test_migration_0001_creates_users_table():
    """Проверь, что миграция создала таблицу"""
    # Откати все миграции
    subprocess.run(['goose', '-dir', 'migrations', 'down-to', '0'])
    
    # Накати одну миграцию
    subprocess.run(['goose', '-dir', 'migrations', 'up'])
    
    # Проверь, что таблица существует
    result = pd.read_sql(
        "SELECT table_name FROM information_schema.tables WHERE table_name='users'",
        engine
    )
    assert len(result) == 1

10. Важные принципы

  • AAA pattern — Arrange (подготовка), Act (выполнение), Assert (проверка)
  • Одна ассерция на тест — каждый тест проверяет ОДНО
  • Моки и фикстуры — изолируй тесты от внешних зависимостей
  • Тестируй edge cases — NULL, пустые данные, большие объёмы
  • Performance тесты — убедись, что ETL не упадёт на миллионах строк

Заключение

Тестирование в Data Engineering — это про доверие к данным. Без тестов можно загрузить миллиарды плохих строк в production и никто не заметит неделю. Хорошие тесты — это инвестиция в надёжность и скорость разработки.