← Назад к вопросам

Как сделать декоратор принимающий асинхронную функцию в Python?

3.0 Senior🔥 171 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Декораторы для асинхронных функций в Python

Асинхронное программирование в Python требует особого подхода при создании декораторов. Декоратор должен сохранить асинхронную природу функции и правильно передать управление.

Простой асинхронный декоратор

Базовая структура для оборачивания асинхронной функции:

import asyncio
from functools import wraps

def async_decorator(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        print(f"Before {func.__name__}")
        result = await func(*args, **kwargs)
        print(f"After {func.__name__}")
        return result
    return wrapper

@async_decorator
async def fetch_data(url):
    await asyncio.sleep(1)
    return f"Data from {url}"

# Использование
result = asyncio.run(fetch_data("http://example.com"))
print(result)

Ключевые моменты:

  • Используйте @wraps из functools для сохранения метаданных
  • Оборачивающая функция должна быть async
  • Используйте await при вызове исходной функции

Декоратор с параметрами

Когда декоратор нужно настраивать:

from functools import wraps
import asyncio

def async_timer(timeout: int = 5):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                result = await asyncio.wait_for(
                    func(*args, **kwargs),
                    timeout=timeout
                )
                return result
            except asyncio.TimeoutError:
                print(f"Function {func.__name__} timed out after {timeout}s")
                return None
        return wrapper
    return decorator

@async_timer(timeout=2)
async def slow_function():
    await asyncio.sleep(3)
    return "Done"

# asyncio.run(slow_function())  # Will timeout

Логирование с асинхронностью

Практический пример с логированием и обработкой ошибок:

import asyncio
import logging
from functools import wraps
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def async_log_decorator(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start = datetime.now()
        func_name = func.__name__
        
        logger.info(f"Starting {func_name} with args={args}, kwargs={kwargs}")
        
        try:
            result = await func(*args, **kwargs)
            duration = (datetime.now() - start).total_seconds()
            logger.info(f"Completed {func_name} in {duration:.2f}s, result={result}")
            return result
        except Exception as e:
            duration = (datetime.now() - start).total_seconds()
            logger.error(f"Failed {func_name} in {duration:.2f}s: {str(e)}")
            raise
    
    return wrapper

@async_log_decorator
async def api_call(endpoint: str):
    await asyncio.sleep(0.5)
    return {"status": "success"}

Декоратор для retry логики

Повторные попытки при ошибках:

import asyncio
from functools import wraps

def async_retry(max_attempts: int = 3, delay: float = 1.0):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts:
                        raise
                    print(f"Attempt {attempt} failed: {e}. Retrying in {delay}s...")
                    await asyncio.sleep(delay)
        return wrapper
    return decorator

@async_retry(max_attempts=3, delay=0.5)
async def unreliable_api():
    import random
    if random.random() < 0.7:
        raise Exception("API error")
    return "Success"

# asyncio.run(unreliable_api())

Декоратор для кэширования

Сохранение результатов асинхронных функций:

from functools import wraps
import asyncio

def async_cache(ttl: int = 300):
    cache = {}
    
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            cache_key = (args, tuple(kwargs.items()))
            
            if cache_key in cache:
                result, timestamp = cache[cache_key]
                if time.time() - timestamp < ttl:
                    print(f"Cache hit for {cache_key}")
                    return result
            
            result = await func(*args, **kwargs)
            cache[cache_key] = (result, time.time())
            return result
        
        return wrapper
    return decorator

import time

@async_cache(ttl=10)
async def expensive_operation(x):
    await asyncio.sleep(2)
    return x ** 2

Декоратор для ограничения параллелизма

Контроль количества одновременных запросов:

from functools import wraps
import asyncio

def rate_limit(max_concurrent: int = 5):
    semaphore = asyncio.Semaphore(max_concurrent)
    
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            async with semaphore:
                return await func(*args, **kwargs)
        return wrapper
    return decorator

@rate_limit(max_concurrent=2)
async def api_request(url: str):
    await asyncio.sleep(1)
    return f"Response from {url}"

# Использование
async def main():
    tasks = [api_request(f"http://example.com/{i}") for i in range(5)]
    results = await asyncio.gather(*tasks)
    print(results)

# asyncio.run(main())

Комбинированный декоратор

Несколько функций в одном декораторе:

from functools import wraps
import asyncio
import logging

def async_decorator_full(timeout: int = 10, retries: int = 3):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(retries):
                try:
                    result = await asyncio.wait_for(
                        func(*args, **kwargs),
                        timeout=timeout
                    )
                    logging.info(f"Success: {func.__name__}")
                    return result
                except asyncio.TimeoutError:
                    logging.warning(f"Timeout on attempt {attempt + 1}")
                except Exception as e:
                    logging.error(f"Error: {e}")
                    if attempt == retries - 1:
                        raise
                    await asyncio.sleep(2 ** attempt)
            
        return wrapper
    return decorator

@async_decorator_full(timeout=5, retries=3)
async def robust_function():
    await asyncio.sleep(0.5)
    return "Done"

Проверка типов

Проверка что функция действительно асинхронная:

import asyncio
import inspect
from functools import wraps

def async_decorator_safe(func):
    if not asyncio.iscoroutinefunction(func):
        raise TypeError(f"{func.__name__} must be async")
    
    @wraps(func)
    async def wrapper(*args, **kwargs):
        return await func(*args, **kwargs)
    
    return wrapper

@async_decorator_safe
async def valid_async():
    pass

# @async_decorator_safe
# def invalid_sync():  # Will raise TypeError
#     pass

Лучшие практики

  • Всегда используйте @wraps из functools
  • Оборачивающая функция должна быть async если вы используете await
  • Проверяйте через asyncio.iscoroutinefunction что функция асинхронная
  • Не блокируйте асинхронное выполнение с использованием обычных вызовов
  • Используйте asyncio.gather для параллельного выполнения нескольких задач
  • Тестируйте асинхронный код с pytest-asyncio

Тестирование асинхронных декораторов

import pytest

@pytest.mark.asyncio
async def test_async_decorator():
    @async_decorator
    async def test_func():
        return "result"
    
    result = await test_func()
    assert result == "result"
Как сделать декоратор принимающий асинхронную функцию в Python? | PrepBro