← Назад к вопросам

Как создать асинхронный контекстный менеджер?

2.0 Middle🔥 191 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Создание асинхронного контекстного менеджера в Python

Асинхронные контекстные менеджеры позволяют работать с ресурсами (соединения БД, файлы, блокировки) в асинхронном коде с гарантией корректной очистки.

1. Основы: async with и методы

Асинхронный контекстный менеджер реализует два метода:

  • __aenter__() — вход в контекст (выполняется с await)
  • __aexit__() — выход из контекста (выполняется с await)
import asyncio
from typing import Optional

class AsyncDatabaseConnection:
    """Асинхронный контекстный менеджер для БД"""
    
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.is_connected = False
    
    async def __aenter__(self):
        """Вход в контекст (открытие соединения)"""
        print(f"Connecting to {self.connection_string}...")
        await asyncio.sleep(0.5)  # Симуляция подключения
        self.is_connected = True
        print("Connected!")
        return self  # Возвращаем объект для использования в блоке with
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Выход из контекста (закрытие соединения)"""
        print("Closing connection...")
        await asyncio.sleep(0.2)  # Симуляция закрытия
        self.is_connected = False
        print("Connection closed!")
        return False  # Пробросить исключение, если оно было
    
    async def execute_query(self, query: str):
        """Выполнить запрос"""
        if not self.is_connected:
            raise RuntimeError("Not connected")
        print(f"Executing: {query}")
        await asyncio.sleep(0.3)  # Симуляция запроса
        return [{"id": 1, "name": "Alice"}]

# Использование
async def main():
    async with AsyncDatabaseConnection("postgres://localhost") as db:
        result = await db.execute_query("SELECT * FROM users")
        print(f"Result: {result}")
    # Соединение автоматически закрыто здесь

asyncio.run(main())

# Вывод:
# Connecting to postgres://localhost...
# Connected!
# Executing: SELECT * FROM users
# Result: [{'id': 1, 'name': 'Alice'}]
# Closing connection...
# Connection closed!

2. Параметры __aexit__

Метод __aexit__ получает информацию об исключении:

class AsyncContextManager:
    async def __aenter__(self):
        print("Entering context")
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # exc_type: тип исключения (None если ошибки не было)
        # exc_val: значение исключения
        # exc_tb: traceback
        
        if exc_type is not None:
            print(f"Exception occurred: {exc_type.__name__}: {exc_val}")
            return False  # Пробросить исключение
        else:
            print("Exiting normally")
            return True  # Подавить исключение (если было)

# Тест с исключением
async def test_with_exception():
    try:
        async with AsyncContextManager() as ctx:
            raise ValueError("Something went wrong!")
    except ValueError:
        print("Exception was raised")

asyncio.run(test_with_exception())

3. Использование contextlib.asynccontextmanager

Для простых случаев лучше использовать декоратор:

from contextlib import asynccontextmanager

@asynccontextmanager
async def async_database_connection(connection_string: str):
    """Контекстный менеджер как генератор"""
    print(f"Connecting to {connection_string}...")
    await asyncio.sleep(0.5)
    print("Connected!")
    
    try:
        yield  # Сюда передаём объект ресурса (можно опустить)
    finally:
        print("Closing connection...")
        await asyncio.sleep(0.2)
        print("Connection closed!")

# Использование
async def main():
    async with async_database_connection("postgres://localhost") as _:
        print("Inside context")

asyncio.run(main())

С возвращаемым значением:

@asynccontextmanager
async def async_database(connection_string: str):
    print(f"Connecting to {connection_string}...")
    
    class DB:
        async def query(self, sql):
            return [{"result": 1}]
    
    db = DB()
    
    try:
        yield db  # Возвращаем объект БД
    finally:
        print("Cleaning up...")

async def main():
    async with async_database("postgres://localhost") as db:
        result = await db.query("SELECT 1")
        print(f"Result: {result}")

asyncio.run(main())

4. Обработка исключений

@asynccontextmanager
async def async_resource_safe():
    print("Setting up resource")
    
    try:
        yield "resource_value"
    except ValueError as e:
        print(f"Caught ValueError: {e}")
        # Подавляем исключение (не пробросим дальше)
        # raise  # раскомментируй для проброса
    except Exception as e:
        print(f"Caught other exception: {e}")
        raise  # Пробросим это исключение
    finally:
        print("Cleaning up resource")

async def main():
    try:
        async with async_resource_safe() as res:
            print(f"Using: {res}")
            raise ValueError("Test error")
    except ValueError:
        print("Outside context: ValueError was handled")

asyncio.run(main())

5. Практический пример: асинхронная блокировка

import asyncio
from contextlib import asynccontextmanager

class AsyncLock:
    """Простая асинхронная блокировка"""
    
    def __init__(self):
        self._lock = asyncio.Lock()
    
    async def __aenter__(self):
        await self._lock.acquire()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self._lock.release()

# Использование
shared_resource = AsyncLock()

async def worker(name):
    async with shared_resource:
        print(f"{name}: Acquired lock")
        await asyncio.sleep(1)
        print(f"{name}: Releasing lock")

async def main():
    # Задачи будут исполняться последовательно благодаря lock
    await asyncio.gather(
        worker("Worker 1"),
        worker("Worker 2"),
        worker("Worker 3")
    )

asyncio.run(main())

6. Асинхронный менеджер для файловых операций

from contextlib import asynccontextmanager
import aiofiles

@asynccontextmanager
async def async_file_reader(filename: str):
    """Асинхронное чтение файла"""
    file = await aiofiles.open(filename, 'r')
    try:
        yield file
    finally:
        await file.close()

# Использование
async def main():
    async with async_file_reader('data.txt') as file:
        content = await file.read()
        print(content)

# Или используй встроенный async with:
async def main2():
    async with aiofiles.open('data.txt', 'r') as file:
        content = await file.read()
        print(content)

7. Вложенные асинхронные контексты

@asynccontextmanager
async def resource_a():
    print("Acquiring resource A")
    yield "A"
    print("Releasing resource A")

@asynccontextmanager
async def resource_b():
    print("Acquiring resource B")
    yield "B"
    print("Releasing resource B")

async def main():
    # Несколько контекстов
    async with resource_a() as a, resource_b() as b:
        print(f"Using {a} and {b}")
    
    # Или вложенные
    async with resource_a() as a:
        async with resource_b() as b:
            print(f"Nested: {a}, {b}")

asyncio.run(main())

# Вывод:
# Acquiring resource A
# Acquiring resource B
# Using A and B
# Releasing resource B
# Releasing resource A

8. Пример с asyncpg (PostgreSQL)

import asyncpg
from contextlib import asynccontextmanager

@asynccontextmanager
async def get_database_pool(dsn: str):
    """Создание пула подключений"""
    pool = await asyncpg.create_pool(dsn)
    try:
        yield pool
    finally:
        await pool.close()

@asynccontextmanager
async def get_connection(pool):
    """Получение соединения из пула"""
    async with pool.acquire() as connection:
        yield connection

async def main():
    async with get_database_pool('postgresql://user:pass@localhost/db') as pool:
        async with get_connection(pool) as conn:
            result = await conn.fetch('SELECT 1')
            print(result)

# asyncio.run(main())

9. Контекстный менеджер с timeout

@asynccontextmanager
async def async_operation_with_timeout(timeout_seconds: float):
    """Выполнить операцию с таймаутом"""
    try:
        async with asyncio.timeout(timeout_seconds):  # Python 3.11+
            yield
    except asyncio.TimeoutError:
        print(f"Operation timed out after {timeout_seconds}s")
        raise

async def slow_operation():
    await asyncio.sleep(5)

async def main():
    try:
        async with async_operation_with_timeout(2):
            await slow_operation()
    except asyncio.TimeoutError:
        print("Caught timeout")

# asyncio.run(main())

10. Класс с методом aclose() для явного закрытия

class AsyncResource:
    """Асинхронный ресурс с явным закрытием"""
    
    async def __aenter__(self):
        print("Resource acquired")
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.aclose()
    
    async def aclose(self):
        """Явное закрытие ресурса"""
        print("Resource released")

# Использование
async def main():
    resource = AsyncResource()
    async with resource:
        print("Using resource")
    
    # Или явно
    resource2 = AsyncResource()
    async with resource2:
        pass

asyncio.run(main())

Резюме

Асинхронный контекстный менеджер:

# Способ 1: Класс
class AsyncContextManager:
    async def __aenter__(self):
        # Setup
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Cleanup
        return False

# Способ 2: Декоратор (проще)
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_context():
    # Setup
    try:
        yield resource
    finally:
        # Cleanup
        pass

# Использование
async with async_context() as res:
    # работа с ресурсом
    pass

Когда использовать:

  • Управление соединениями БД
  • Файловые операции
  • Блокировки и синхронизация
  • Тайм-ауты
  • Пулы ресурсов
  • Любые ресурсы требующие очистки
Как создать асинхронный контекстный менеджер? | PrepBro