Для чего нужны контекстные переменные в asyncio?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Контекстные переменные в asyncio: назначение и применение
Контекстные переменные в asyncio — это механизм для хранения данных, которые безопасны для асинхронного выполнения и автоматически распространяются между сопутствующими задачами. Они решают проблему thread-local хранилищ в асинхронном коде.
1. Основная проблема, которую решают контекстные переменные
В обычном синхронном коде можно использовать threading.local():
import threading
user_context = threading.local()
def process_request(user_id):
user_context.user_id = user_id # Привязано к потоку
do_something()
# Но в asyncio это не работает!
# Разные корутины выполняются в одном потоке
async def async_process(user_id):
user_context.user_id = user_id # ❌ Одна переменная на всех!
await some_async_op()
Все асинхронные функции в одном потоке конфликтуют за одну переменную. Вот где помогают контекстные переменные:
import asyncio
from contextvars import ContextVar
# Правильный способ
user_context: ContextVar[int] = ContextVar('user_context')
async def async_process(user_id):
token = user_context.set(user_id) # ✅ Каждой корутине свой контекст
try:
await some_async_op()
finally:
user_context.reset(token)
2. Синтаксис и базовое использование
from contextvars import ContextVar
import asyncio
# Создание контекстной переменной
request_id: ContextVar[str] = ContextVar('request_id')
user_id: ContextVar[int] = ContextVar('user_id')
current_user: ContextVar[dict] = ContextVar('current_user')
async def handler():
# Получить текущее значение
rid = request_id.get(None) # None - значение по умолчанию
# Установить значение
token = request_id.set('req-12345')
try:
await process_request()
finally:
# Восстановить предыдущее значение
request_id.reset(token)
async def process_request():
rid = request_id.get() # 'req-12345'
print(f"Processing {rid}")
3. Использование в логировании и трассировке
Одно из самых практичных применений — добавление request ID к логам:
import logging
from contextvars import ContextVar
request_id = ContextVar('request_id', default='no-id')
class ContextualFilter(logging.Filter):
def filter(self, record):
record.request_id = request_id.get()
return True
# Настройка логирования
logger = logging.getLogger(__name__)
logger.addFilter(ContextualFilter())
formatter = logging.Formatter(
'%(asctime)s [%(request_id)s] %(levelname)s %(message)s'
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
# Использование
async def handle_request(request_id_value):
token = request_id.set(request_id_value)
try:
logger.info("Request started") # [req-123] Request started
await process()
logger.info("Request completed") # [req-123] Request completed
finally:
request_id.reset(token)
4. Хранение информации о пользователе
Для FastAPI и других веб-фреймворков:
from contextvars import ContextVar
from typing import Optional
from pydantic import BaseModel
class UserInfo(BaseModel):
user_id: int
username: str
role: str
current_user: ContextVar[Optional[UserInfo]] = ContextVar(
'current_user',
default=None
)
async def get_current_user():
user = current_user.get()
if user is None:
raise HTTPException(status_code=401, detail="Unauthorized")
return user
# В middleware
class AuthMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
user_data = await authenticate(request)
if user_data:
token = current_user.set(UserInfo(**user_data))
try:
response = await call_next(request)
finally:
if user_data:
current_user.reset(token)
return response
# В handlers — автоматически доступно
async def get_profile():
user = current_user.get()
return {"username": user.username, "role": user.role}
5. Копирование контекста между задачами
Критически важно для параллельных операций:
import asyncio
from contextvars import copy_context
user_id = ContextVar('user_id')
async def child_task():
# Наследует контекст родителя
uid = user_id.get()
print(f"Child: {uid}")
await asyncio.sleep(1)
async def parent_task():
token = user_id.set(42)
try:
# ✅ Создаёт задачу с копией текущего контекста
task = asyncio.create_task(child_task())
await task # Child: 42
finally:
user_id.reset(token)
async def main():
await parent_task()
asyncio.run(main())
6. Работа с базой данных и транзакциями
Для отслеживания соединений и транзакций:
from contextvars import ContextVar
from sqlalchemy.ext.asyncio import AsyncSession
db_session: ContextVar[Optional[AsyncSession]] = ContextVar(
'db_session',
default=None
)
async def get_db_session():
session = db_session.get()
if session is None:
raise RuntimeError("No database session in context")
return session
async def handle_request(session: AsyncSession):
token = db_session.set(session)
try:
# Глубоко вложенные функции могут получить сессию
user = await get_user_by_id(user_id=1)
profile = await get_user_profile(user)
return {"user": user, "profile": profile}
finally:
db_session.reset(token)
async def get_user_by_id(user_id: int):
session = await get_db_session()
return await session.query(User).get(user_id)
async def get_user_profile(user):
session = await get_db_session()
return await session.query(Profile).filter_by(user_id=user.id).first()
7. Кеширование и синхронизация
Для отслеживания состояния операций:
from contextvars import ContextVar
operation_cache: ContextVar[dict] = ContextVar(
'operation_cache',
default=None
)
def get_cache():
cache = operation_cache.get()
if cache is None:
cache = {}
operation_cache.set(cache)
return cache
async def process_with_cache():
cache = get_cache()
# Проверить кеш
if 'result' in cache:
return cache['result']
# Вычислить
result = await expensive_operation()
cache['result'] = result
return result
Лучшие практики:
- Всегда используй
try/finallyдля восстановления значений - Создавай переменные на уровне модуля, а не внутри функций
- Используй
copy_context()при запуске независимых задач - Избегай глубокой вложенности контекстов (сложно отследить)
- Тестируй асинхронный код с разными значениями контекста
Контекстные переменные — это то, что делает асинхронный Python безопасным и предсказуемым при работе с данными, специфичными для запроса.