← Назад к вопросам

Для чего нужны контекстные переменные в asyncio?

2.7 Senior🔥 71 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Контекстные переменные в asyncio: назначение и применение

Контекстные переменные в asyncio — это механизм для хранения данных, которые безопасны для асинхронного выполнения и автоматически распространяются между сопутствующими задачами. Они решают проблему thread-local хранилищ в асинхронном коде.

1. Основная проблема, которую решают контекстные переменные

В обычном синхронном коде можно использовать threading.local():

import threading

user_context = threading.local()

def process_request(user_id):
    user_context.user_id = user_id  # Привязано к потоку
    do_something()

# Но в asyncio это не работает!
# Разные корутины выполняются в одном потоке
async def async_process(user_id):
    user_context.user_id = user_id  # ❌ Одна переменная на всех!
    await some_async_op()

Все асинхронные функции в одном потоке конфликтуют за одну переменную. Вот где помогают контекстные переменные:

import asyncio
from contextvars import ContextVar

# Правильный способ
user_context: ContextVar[int] = ContextVar('user_context')

async def async_process(user_id):
    token = user_context.set(user_id)  # ✅ Каждой корутине свой контекст
    try:
        await some_async_op()
    finally:
        user_context.reset(token)

2. Синтаксис и базовое использование

from contextvars import ContextVar
import asyncio

# Создание контекстной переменной
request_id: ContextVar[str] = ContextVar('request_id')
user_id: ContextVar[int] = ContextVar('user_id')
current_user: ContextVar[dict] = ContextVar('current_user')

async def handler():
    # Получить текущее значение
    rid = request_id.get(None)  # None - значение по умолчанию
    
    # Установить значение
    token = request_id.set('req-12345')
    
    try:
        await process_request()
    finally:
        # Восстановить предыдущее значение
        request_id.reset(token)

async def process_request():
    rid = request_id.get()  # 'req-12345'
    print(f"Processing {rid}")

3. Использование в логировании и трассировке

Одно из самых практичных применений — добавление request ID к логам:

import logging
from contextvars import ContextVar

request_id = ContextVar('request_id', default='no-id')

class ContextualFilter(logging.Filter):
    def filter(self, record):
        record.request_id = request_id.get()
        return True

# Настройка логирования
logger = logging.getLogger(__name__)
logger.addFilter(ContextualFilter())

formatter = logging.Formatter(
    '%(asctime)s [%(request_id)s] %(levelname)s %(message)s'
)

handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)

# Использование
async def handle_request(request_id_value):
    token = request_id.set(request_id_value)
    try:
        logger.info("Request started")  # [req-123] Request started
        await process()
        logger.info("Request completed")  # [req-123] Request completed
    finally:
        request_id.reset(token)

4. Хранение информации о пользователе

Для FastAPI и других веб-фреймворков:

from contextvars import ContextVar
from typing import Optional
from pydantic import BaseModel

class UserInfo(BaseModel):
    user_id: int
    username: str
    role: str

current_user: ContextVar[Optional[UserInfo]] = ContextVar(
    'current_user',
    default=None
)

async def get_current_user():
    user = current_user.get()
    if user is None:
        raise HTTPException(status_code=401, detail="Unauthorized")
    return user

# В middleware
class AuthMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        user_data = await authenticate(request)
        
        if user_data:
            token = current_user.set(UserInfo(**user_data))
        
        try:
            response = await call_next(request)
        finally:
            if user_data:
                current_user.reset(token)
        
        return response

# В handlers — автоматически доступно
async def get_profile():
    user = current_user.get()
    return {"username": user.username, "role": user.role}

5. Копирование контекста между задачами

Критически важно для параллельных операций:

import asyncio
from contextvars import copy_context

user_id = ContextVar('user_id')

async def child_task():
    # Наследует контекст родителя
    uid = user_id.get()
    print(f"Child: {uid}")
    await asyncio.sleep(1)

async def parent_task():
    token = user_id.set(42)
    try:
        # ✅ Создаёт задачу с копией текущего контекста
        task = asyncio.create_task(child_task())
        await task  # Child: 42
    finally:
        user_id.reset(token)

async def main():
    await parent_task()

asyncio.run(main())

6. Работа с базой данных и транзакциями

Для отслеживания соединений и транзакций:

from contextvars import ContextVar
from sqlalchemy.ext.asyncio import AsyncSession

db_session: ContextVar[Optional[AsyncSession]] = ContextVar(
    'db_session',
    default=None
)

async def get_db_session():
    session = db_session.get()
    if session is None:
        raise RuntimeError("No database session in context")
    return session

async def handle_request(session: AsyncSession):
    token = db_session.set(session)
    try:
        # Глубоко вложенные функции могут получить сессию
        user = await get_user_by_id(user_id=1)
        profile = await get_user_profile(user)
        return {"user": user, "profile": profile}
    finally:
        db_session.reset(token)

async def get_user_by_id(user_id: int):
    session = await get_db_session()
    return await session.query(User).get(user_id)

async def get_user_profile(user):
    session = await get_db_session()
    return await session.query(Profile).filter_by(user_id=user.id).first()

7. Кеширование и синхронизация

Для отслеживания состояния операций:

from contextvars import ContextVar

operation_cache: ContextVar[dict] = ContextVar(
    'operation_cache',
    default=None
)

def get_cache():
    cache = operation_cache.get()
    if cache is None:
        cache = {}
        operation_cache.set(cache)
    return cache

async def process_with_cache():
    cache = get_cache()
    
    # Проверить кеш
    if 'result' in cache:
        return cache['result']
    
    # Вычислить
    result = await expensive_operation()
    cache['result'] = result
    
    return result

Лучшие практики:

  • Всегда используй try/finally для восстановления значений
  • Создавай переменные на уровне модуля, а не внутри функций
  • Используй copy_context() при запуске независимых задач
  • Избегай глубокой вложенности контекстов (сложно отследить)
  • Тестируй асинхронный код с разными значениями контекста

Контекстные переменные — это то, что делает асинхронный Python безопасным и предсказуемым при работе с данными, специфичными для запроса.