← Назад к вопросам
Когда лучше использовать декоратор?
2.0 Middle🔥 111 комментариев
#Python#Инструменты разработки
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI21 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
# Когда использовать декоратор в Python (Data Engineering контекст)
Определение
Декоратор — это функция, которая принимает другую функцию и возвращает изменённую версию. Паттерн оборачивания (wrapping) для добавления функциональности без изменения оригинального кода.
Синтаксис
# Базовый декоратор
def my_decorator(func):
def wrapper(*args, **kwargs):
# код ДО вызова функции
print(f"Вызываю {func.__name__}")
result = func(*args, **kwargs)
# код ПОСЛЕ вызова функции
print(f"Закончил {func.__name__}")
return result
return wrapper
# Применение
@my_decorator
def hello(name):
return f"Hello, {name}"
# Равносильно:
# hello = my_decorator(hello)
print(hello("Alice")) # Prints Before/After
Когда использовать декораторы в Data Engineering
1. Логирование и мониторинг
Проблема: Каждый Spark job нужно логировать (start time, end time, status)
import logging
import time
from functools import wraps
def log_job(func):
@wraps(func) # сохраняет оригинальное имя функции
def wrapper(*args, **kwargs):
job_name = func.__name__
logger = logging.getLogger(__name__)
start_time = time.time()
logger.info(f"JOB_START: {job_name}")
try:
result = func(*args, **kwargs)
duration = time.time() - start_time
logger.info(f"JOB_SUCCESS: {job_name} ({duration:.2f}s)")
return result
except Exception as e:
duration = time.time() - start_time
logger.error(f"JOB_FAIL: {job_name} ({duration:.2f}s) - {str(e)}")
raise
return wrapper
@log_job
def process_users_data(spark, date):
df = spark.read.parquet(f"s3://data/users/{date}")
return df.filter(df.status == 'active')
# Логирует автоматически
2. Обработка ошибок и retry логика
Проблема: API вызовы в ETL часто падают временно (timeout, rate limit)
import time
from functools import wraps
def retry(max_attempts=3, delay=1, backoff=2):
"""Декоратор для повторных попыток с exponential backoff"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
attempt = 1
current_delay = delay
while attempt <= max_attempts:
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_attempts:
logger.error(f"{func.__name__} failed after {max_attempts} attempts")
raise
logger.warning(f"Attempt {attempt} failed: {str(e)}. Retrying in {current_delay}s...")
time.sleep(current_delay)
current_delay *= backoff
attempt += 1
return wrapper
return decorator
@retry(max_attempts=3, delay=1, backoff=2)
def fetch_data_from_api(url):
response = requests.get(url, timeout=10)
return response.json()
# Попытается 3 раза с exponential backoff (1s, 2s, 4s)
3. Data Quality проверки
Проблема: Каждая трансформация должна валидировать результат
from typing import Callable
from pyspark.sql import DataFrame
def validate_output(expectations: dict):
"""Декоратор для проверки качества выходных данных"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs) -> DataFrame:
df = func(*args, **kwargs)
# Проверяем expectations
for col, expectation in expectations.items():
if expectation == "not_null":
null_count = df.filter(df[col].isNull()).count()
if null_count > 0:
raise ValueError(f"Column {col} has {null_count} NULL values")
elif isinstance(expectation, tuple): # (min, max)
min_val, max_val = expectation
invalid = df.filter((df[col] < min_val) | (df[col] > max_val)).count()
if invalid > 0:
raise ValueError(f"Column {col} has {invalid} values outside [{min_val}, {max_val}]")
logger.info(f"{func.__name__} output passed all validations")
return df
return wrapper
return decorator
@validate_output({
'user_id': 'not_null',
'amount': (0, 1000000),
'timestamp': 'not_null'
})
def transform_orders(spark):
df = spark.read.csv("s3://raw/orders.csv")
return df.filter(df.amount > 0)
# Валидирует автоматически
4. Measure Performance
Проблема: Какие шаги ETL медленные?
import time
from functools import wraps
def measure_time(func):
"""Измеряет время выполнения функции"""
@wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
duration = time.time() - start
# Отправляем метрику в Prometheus/CloudWatch
metrics.histogram(f"{func.__name__}_duration_seconds", duration)
logger.info(f"{func.__name__} took {duration:.2f}s")
return result
return wrapper
@measure_time
def load_from_database(connection_string):
# code
pass
@measure_time
def transform_data(df):
# code
pass
# Видим, что load медленнее, нужно оптимизировать
5. Кэширование результатов
Проблема: Дорогостоящие операции вызываются несколько раз
from functools import wraps, lru_cache
import pickle
def cache_result(ttl_seconds=3600):
"""Кэширует результат функции на диск"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Генерируем ключ кэша
cache_key = f"{func.__name__}_{str(args)}_{str(kwargs)}"
cache_file = f"/tmp/{cache_key}.pkl"
# Проверяем кэш
import os
if os.path.exists(cache_file):
mod_time = os.path.getmtime(cache_file)
if time.time() - mod_time < ttl_seconds:
with open(cache_file, 'rb') as f:
logger.info(f"Cache hit for {func.__name__}")
return pickle.load(f)
# Вычисляем и кэшируем
result = func(*args, **kwargs)
with open(cache_file, 'wb') as f:
pickle.dump(result, f)
return result
return wrapper
return decorator
@cache_result(ttl_seconds=3600)
def get_user_mapping():
# дорогой запрос к БД
return large_dataframe
# Первый вызов: долго, сохраняет в кэш
# Последующие вызовы в течение часа: быстро из кэша
6. Параметризация Spark jobs (Airflow контекст)
Проблема: DAG нужны параметры execution_date, но это нужно пробрасывать везде
from airflow.models import Variable
from functools import wraps
def inject_airflow_context(func):
"""Автоматически передаёт Airflow контекст"""
@wraps(func)
def wrapper(**kwargs):
# kwargs содержит execution_date и другой контекст из Airflow
execution_date = kwargs['execution_date']
return func(execution_date=execution_date, **kwargs)
return wrapper
@inject_airflow_context
def load_data(execution_date):
date_str = execution_date.strftime('%Y-%m-%d')
return spark.read.parquet(f"s3://data/{date_str}")
# В DAG просто передаём функцию, контекст передаётся автоматически
7. Разделение логики (Dependency Injection)
Проблема: Функция должна использовать разные конфигурации в разных окружениях
def with_config(config_source='prod'):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
if config_source == 'prod':
config = load_prod_config()
elif config_source == 'dev':
config = load_dev_config()
return func(*args, config=config, **kwargs)
return wrapper
return decorator
@with_config(config_source='prod')
def run_pipeline(config):
spark = SparkSession.builder \
.config("spark.executor.memory", config['executor_memory']) \
.getOrCreate()
# code
# Конфиг внедряется автоматически
Когда НЕ использовать декораторы
# ❌ Плохо: слишком сложная логика
@my_decorator
def complex_function():
# Если логика внутри декоратора сложнее чем оригинальная функция
# лучше явно использовать helper функцию
pass
# ✅ Хорошо: простой cross-cutting concern
@log_job
@measure_time
@retry(max_attempts=3)
def simple_function():
pass
Стек декораторов
@log_job # Внешний слой (логирует всё)
@measure_time # Средний слой (измеряет время)
@retry(3) # Внутренний слой (повторяет при ошибке)
def fetch_and_process(url):
# Порядок: retry → measure_time → log_job
pass
Итого
Используй декораторы для:
- Логирование и мониторинг
- Retry и обработка ошибок
- Валидация данных
- Performance measurement
- Кэширование
- Dependency injection
Избегай для:
- Сложной бизнес-логики
- Когда код становится менее читаемым
- В критичном пути, где нужна максимальная производительность (они добавляют overhead)