← Назад к вопросам
Как создать асинхронный контекстный менеджер?
2.0 Middle🔥 191 комментариев
#Python Core#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Создание асинхронного контекстного менеджера в Python
Асинхронные контекстные менеджеры позволяют работать с ресурсами (соединения БД, файлы, блокировки) в асинхронном коде с гарантией корректной очистки.
1. Основы: async with и методы
Асинхронный контекстный менеджер реализует два метода:
__aenter__()— вход в контекст (выполняется с await)__aexit__()— выход из контекста (выполняется с await)
import asyncio
from typing import Optional
class AsyncDatabaseConnection:
"""Асинхронный контекстный менеджер для БД"""
def __init__(self, connection_string: str):
self.connection_string = connection_string
self.is_connected = False
async def __aenter__(self):
"""Вход в контекст (открытие соединения)"""
print(f"Connecting to {self.connection_string}...")
await asyncio.sleep(0.5) # Симуляция подключения
self.is_connected = True
print("Connected!")
return self # Возвращаем объект для использования в блоке with
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Выход из контекста (закрытие соединения)"""
print("Closing connection...")
await asyncio.sleep(0.2) # Симуляция закрытия
self.is_connected = False
print("Connection closed!")
return False # Пробросить исключение, если оно было
async def execute_query(self, query: str):
"""Выполнить запрос"""
if not self.is_connected:
raise RuntimeError("Not connected")
print(f"Executing: {query}")
await asyncio.sleep(0.3) # Симуляция запроса
return [{"id": 1, "name": "Alice"}]
# Использование
async def main():
async with AsyncDatabaseConnection("postgres://localhost") as db:
result = await db.execute_query("SELECT * FROM users")
print(f"Result: {result}")
# Соединение автоматически закрыто здесь
asyncio.run(main())
# Вывод:
# Connecting to postgres://localhost...
# Connected!
# Executing: SELECT * FROM users
# Result: [{'id': 1, 'name': 'Alice'}]
# Closing connection...
# Connection closed!
2. Параметры __aexit__
Метод __aexit__ получает информацию об исключении:
class AsyncContextManager:
async def __aenter__(self):
print("Entering context")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# exc_type: тип исключения (None если ошибки не было)
# exc_val: значение исключения
# exc_tb: traceback
if exc_type is not None:
print(f"Exception occurred: {exc_type.__name__}: {exc_val}")
return False # Пробросить исключение
else:
print("Exiting normally")
return True # Подавить исключение (если было)
# Тест с исключением
async def test_with_exception():
try:
async with AsyncContextManager() as ctx:
raise ValueError("Something went wrong!")
except ValueError:
print("Exception was raised")
asyncio.run(test_with_exception())
3. Использование contextlib.asynccontextmanager
Для простых случаев лучше использовать декоратор:
from contextlib import asynccontextmanager
@asynccontextmanager
async def async_database_connection(connection_string: str):
"""Контекстный менеджер как генератор"""
print(f"Connecting to {connection_string}...")
await asyncio.sleep(0.5)
print("Connected!")
try:
yield # Сюда передаём объект ресурса (можно опустить)
finally:
print("Closing connection...")
await asyncio.sleep(0.2)
print("Connection closed!")
# Использование
async def main():
async with async_database_connection("postgres://localhost") as _:
print("Inside context")
asyncio.run(main())
С возвращаемым значением:
@asynccontextmanager
async def async_database(connection_string: str):
print(f"Connecting to {connection_string}...")
class DB:
async def query(self, sql):
return [{"result": 1}]
db = DB()
try:
yield db # Возвращаем объект БД
finally:
print("Cleaning up...")
async def main():
async with async_database("postgres://localhost") as db:
result = await db.query("SELECT 1")
print(f"Result: {result}")
asyncio.run(main())
4. Обработка исключений
@asynccontextmanager
async def async_resource_safe():
print("Setting up resource")
try:
yield "resource_value"
except ValueError as e:
print(f"Caught ValueError: {e}")
# Подавляем исключение (не пробросим дальше)
# raise # раскомментируй для проброса
except Exception as e:
print(f"Caught other exception: {e}")
raise # Пробросим это исключение
finally:
print("Cleaning up resource")
async def main():
try:
async with async_resource_safe() as res:
print(f"Using: {res}")
raise ValueError("Test error")
except ValueError:
print("Outside context: ValueError was handled")
asyncio.run(main())
5. Практический пример: асинхронная блокировка
import asyncio
from contextlib import asynccontextmanager
class AsyncLock:
"""Простая асинхронная блокировка"""
def __init__(self):
self._lock = asyncio.Lock()
async def __aenter__(self):
await self._lock.acquire()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self._lock.release()
# Использование
shared_resource = AsyncLock()
async def worker(name):
async with shared_resource:
print(f"{name}: Acquired lock")
await asyncio.sleep(1)
print(f"{name}: Releasing lock")
async def main():
# Задачи будут исполняться последовательно благодаря lock
await asyncio.gather(
worker("Worker 1"),
worker("Worker 2"),
worker("Worker 3")
)
asyncio.run(main())
6. Асинхронный менеджер для файловых операций
from contextlib import asynccontextmanager
import aiofiles
@asynccontextmanager
async def async_file_reader(filename: str):
"""Асинхронное чтение файла"""
file = await aiofiles.open(filename, 'r')
try:
yield file
finally:
await file.close()
# Использование
async def main():
async with async_file_reader('data.txt') as file:
content = await file.read()
print(content)
# Или используй встроенный async with:
async def main2():
async with aiofiles.open('data.txt', 'r') as file:
content = await file.read()
print(content)
7. Вложенные асинхронные контексты
@asynccontextmanager
async def resource_a():
print("Acquiring resource A")
yield "A"
print("Releasing resource A")
@asynccontextmanager
async def resource_b():
print("Acquiring resource B")
yield "B"
print("Releasing resource B")
async def main():
# Несколько контекстов
async with resource_a() as a, resource_b() as b:
print(f"Using {a} and {b}")
# Или вложенные
async with resource_a() as a:
async with resource_b() as b:
print(f"Nested: {a}, {b}")
asyncio.run(main())
# Вывод:
# Acquiring resource A
# Acquiring resource B
# Using A and B
# Releasing resource B
# Releasing resource A
8. Пример с asyncpg (PostgreSQL)
import asyncpg
from contextlib import asynccontextmanager
@asynccontextmanager
async def get_database_pool(dsn: str):
"""Создание пула подключений"""
pool = await asyncpg.create_pool(dsn)
try:
yield pool
finally:
await pool.close()
@asynccontextmanager
async def get_connection(pool):
"""Получение соединения из пула"""
async with pool.acquire() as connection:
yield connection
async def main():
async with get_database_pool('postgresql://user:pass@localhost/db') as pool:
async with get_connection(pool) as conn:
result = await conn.fetch('SELECT 1')
print(result)
# asyncio.run(main())
9. Контекстный менеджер с timeout
@asynccontextmanager
async def async_operation_with_timeout(timeout_seconds: float):
"""Выполнить операцию с таймаутом"""
try:
async with asyncio.timeout(timeout_seconds): # Python 3.11+
yield
except asyncio.TimeoutError:
print(f"Operation timed out after {timeout_seconds}s")
raise
async def slow_operation():
await asyncio.sleep(5)
async def main():
try:
async with async_operation_with_timeout(2):
await slow_operation()
except asyncio.TimeoutError:
print("Caught timeout")
# asyncio.run(main())
10. Класс с методом aclose() для явного закрытия
class AsyncResource:
"""Асинхронный ресурс с явным закрытием"""
async def __aenter__(self):
print("Resource acquired")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.aclose()
async def aclose(self):
"""Явное закрытие ресурса"""
print("Resource released")
# Использование
async def main():
resource = AsyncResource()
async with resource:
print("Using resource")
# Или явно
resource2 = AsyncResource()
async with resource2:
pass
asyncio.run(main())
Резюме
Асинхронный контекстный менеджер:
# Способ 1: Класс
class AsyncContextManager:
async def __aenter__(self):
# Setup
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# Cleanup
return False
# Способ 2: Декоратор (проще)
from contextlib import asynccontextmanager
@asynccontextmanager
async def async_context():
# Setup
try:
yield resource
finally:
# Cleanup
pass
# Использование
async with async_context() as res:
# работа с ресурсом
pass
Когда использовать:
- Управление соединениями БД
- Файловые операции
- Блокировки и синхронизация
- Тайм-ауты
- Пулы ресурсов
- Любые ресурсы требующие очистки