← Назад к вопросам

Когда лучше использовать декоратор?

2.0 Middle🔥 111 комментариев
#Python#Инструменты разработки

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI21 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# Когда использовать декоратор в Python (Data Engineering контекст)

Определение

Декоратор — это функция, которая принимает другую функцию и возвращает изменённую версию. Паттерн оборачивания (wrapping) для добавления функциональности без изменения оригинального кода.

Синтаксис

# Базовый декоратор
def my_decorator(func):
    def wrapper(*args, **kwargs):
        # код ДО вызова функции
        print(f"Вызываю {func.__name__}")
        result = func(*args, **kwargs)
        # код ПОСЛЕ вызова функции
        print(f"Закончил {func.__name__}")
        return result
    return wrapper

# Применение
@my_decorator
def hello(name):
    return f"Hello, {name}"

# Равносильно:
# hello = my_decorator(hello)

print(hello("Alice"))  # Prints Before/After

Когда использовать декораторы в Data Engineering

1. Логирование и мониторинг

Проблема: Каждый Spark job нужно логировать (start time, end time, status)

import logging
import time
from functools import wraps

def log_job(func):
    @wraps(func)  # сохраняет оригинальное имя функции
    def wrapper(*args, **kwargs):
        job_name = func.__name__
        logger = logging.getLogger(__name__)
        
        start_time = time.time()
        logger.info(f"JOB_START: {job_name}")
        
        try:
            result = func(*args, **kwargs)
            duration = time.time() - start_time
            logger.info(f"JOB_SUCCESS: {job_name} ({duration:.2f}s)")
            return result
        except Exception as e:
            duration = time.time() - start_time
            logger.error(f"JOB_FAIL: {job_name} ({duration:.2f}s) - {str(e)}")
            raise
    return wrapper

@log_job
def process_users_data(spark, date):
    df = spark.read.parquet(f"s3://data/users/{date}")
    return df.filter(df.status == 'active')

# Логирует автоматически

2. Обработка ошибок и retry логика

Проблема: API вызовы в ETL часто падают временно (timeout, rate limit)

import time
from functools import wraps

def retry(max_attempts=3, delay=1, backoff=2):
    """Декоратор для повторных попыток с exponential backoff"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 1
            current_delay = delay
            
            while attempt <= max_attempts:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts:
                        logger.error(f"{func.__name__} failed after {max_attempts} attempts")
                        raise
                    
                    logger.warning(f"Attempt {attempt} failed: {str(e)}. Retrying in {current_delay}s...")
                    time.sleep(current_delay)
                    current_delay *= backoff
                    attempt += 1
        return wrapper
    return decorator

@retry(max_attempts=3, delay=1, backoff=2)
def fetch_data_from_api(url):
    response = requests.get(url, timeout=10)
    return response.json()

# Попытается 3 раза с exponential backoff (1s, 2s, 4s)

3. Data Quality проверки

Проблема: Каждая трансформация должна валидировать результат

from typing import Callable
from pyspark.sql import DataFrame

def validate_output(expectations: dict):
    """Декоратор для проверки качества выходных данных"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs) -> DataFrame:
            df = func(*args, **kwargs)
            
            # Проверяем expectations
            for col, expectation in expectations.items():
                if expectation == "not_null":
                    null_count = df.filter(df[col].isNull()).count()
                    if null_count > 0:
                        raise ValueError(f"Column {col} has {null_count} NULL values")
                
                elif isinstance(expectation, tuple):  # (min, max)
                    min_val, max_val = expectation
                    invalid = df.filter((df[col] < min_val) | (df[col] > max_val)).count()
                    if invalid > 0:
                        raise ValueError(f"Column {col} has {invalid} values outside [{min_val}, {max_val}]")
            
            logger.info(f"{func.__name__} output passed all validations")
            return df
        return wrapper
    return decorator

@validate_output({
    'user_id': 'not_null',
    'amount': (0, 1000000),
    'timestamp': 'not_null'
})
def transform_orders(spark):
    df = spark.read.csv("s3://raw/orders.csv")
    return df.filter(df.amount > 0)

# Валидирует автоматически

4. Measure Performance

Проблема: Какие шаги ETL медленные?

import time
from functools import wraps

def measure_time(func):
    """Измеряет время выполнения функции"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        duration = time.time() - start
        
        # Отправляем метрику в Prometheus/CloudWatch
        metrics.histogram(f"{func.__name__}_duration_seconds", duration)
        logger.info(f"{func.__name__} took {duration:.2f}s")
        
        return result
    return wrapper

@measure_time
def load_from_database(connection_string):
    # code
    pass

@measure_time
def transform_data(df):
    # code
    pass

# Видим, что load медленнее, нужно оптимизировать

5. Кэширование результатов

Проблема: Дорогостоящие операции вызываются несколько раз

from functools import wraps, lru_cache
import pickle

def cache_result(ttl_seconds=3600):
    """Кэширует результат функции на диск"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Генерируем ключ кэша
            cache_key = f"{func.__name__}_{str(args)}_{str(kwargs)}"
            cache_file = f"/tmp/{cache_key}.pkl"
            
            # Проверяем кэш
            import os
            if os.path.exists(cache_file):
                mod_time = os.path.getmtime(cache_file)
                if time.time() - mod_time < ttl_seconds:
                    with open(cache_file, 'rb') as f:
                        logger.info(f"Cache hit for {func.__name__}")
                        return pickle.load(f)
            
            # Вычисляем и кэшируем
            result = func(*args, **kwargs)
            with open(cache_file, 'wb') as f:
                pickle.dump(result, f)
            
            return result
        return wrapper
    return decorator

@cache_result(ttl_seconds=3600)
def get_user_mapping():
    # дорогой запрос к БД
    return large_dataframe

# Первый вызов: долго, сохраняет в кэш
# Последующие вызовы в течение часа: быстро из кэша

6. Параметризация Spark jobs (Airflow контекст)

Проблема: DAG нужны параметры execution_date, но это нужно пробрасывать везде

from airflow.models import Variable
from functools import wraps

def inject_airflow_context(func):
    """Автоматически передаёт Airflow контекст"""
    @wraps(func)
    def wrapper(**kwargs):
        # kwargs содержит execution_date и другой контекст из Airflow
        execution_date = kwargs['execution_date']
        return func(execution_date=execution_date, **kwargs)
    return wrapper

@inject_airflow_context
def load_data(execution_date):
    date_str = execution_date.strftime('%Y-%m-%d')
    return spark.read.parquet(f"s3://data/{date_str}")

# В DAG просто передаём функцию, контекст передаётся автоматически

7. Разделение логики (Dependency Injection)

Проблема: Функция должна использовать разные конфигурации в разных окружениях

def with_config(config_source='prod'):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if config_source == 'prod':
                config = load_prod_config()
            elif config_source == 'dev':
                config = load_dev_config()
            
            return func(*args, config=config, **kwargs)
        return wrapper
    return decorator

@with_config(config_source='prod')
def run_pipeline(config):
    spark = SparkSession.builder \
        .config("spark.executor.memory", config['executor_memory']) \
        .getOrCreate()
    # code

# Конфиг внедряется автоматически

Когда НЕ использовать декораторы

# ❌ Плохо: слишком сложная логика
@my_decorator
def complex_function():
    # Если логика внутри декоратора сложнее чем оригинальная функция
    # лучше явно использовать helper функцию
    pass

# ✅ Хорошо: простой cross-cutting concern
@log_job
@measure_time
@retry(max_attempts=3)
def simple_function():
    pass

Стек декораторов

@log_job          # Внешний слой (логирует всё)
@measure_time     # Средний слой (измеряет время)
@retry(3)         # Внутренний слой (повторяет при ошибке)
def fetch_and_process(url):
    # Порядок: retry → measure_time → log_job
    pass

Итого

Используй декораторы для:

  • Логирование и мониторинг
  • Retry и обработка ошибок
  • Валидация данных
  • Performance measurement
  • Кэширование
  • Dependency injection

Избегай для:

  • Сложной бизнес-логики
  • Когда код становится менее читаемым
  • В критичном пути, где нужна максимальная производительность (они добавляют overhead)
Когда лучше использовать декоратор? | PrepBro