← Назад к вопросам

Какие знаешь проблемы с асинхронностью в SQLAlchemy?

2.0 Middle🔥 131 комментариев
#Базы данных (SQL)#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Основные проблемы асинхронности в SQLAlchemy

СQLAlchemy имеет несколько критических проблем при работе с асинхронным кодом. Это один из самых сложных аспектов современной разработки на Python.

1. Синхронная библиотека в асинхронном мире

СQLAlchemy долгое время была полностью синхронной библиотекой. Вызов блокирует event loop, что уничтожает все преимущества async/await:

# ❌ Неправильно — блокирует event loop
async def get_user(user_id: int):
    session = Session()
    user = session.query(User).filter_by(id=user_id).first()  # БЛОКИРУЕТ!
    return user

Даже с async/await функция ждёт ответа БД синхронно, отчего остальные корутины не могут выполняться параллельно.

2. Асинхронный движок и сессии (SQLAlchemy 1.4+)

В версии 1.4+ добавлена поддержка асинхронного режима через create_async_engine:

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

# Правильно
engine = create_async_engine(
    "postgresql+asyncpg://user:password@localhost/db",
    echo=False,
)

async_session = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

async def get_user(user_id: int):
    async with async_session() as session:
        result = await session.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalars().first()

Однако это требует использования select() вместо query().

3. Несовместимость с синхронными драйверами

Многие драйверы БД были синхронными и блокирующими:

# ❌ psycopg2 — синхронный
engine = create_engine("postgresql://...")

# ✅ asyncpg — асинхронный
engine = create_async_engine("postgresql+asyncpg://...")

# ✅ aiomysql — асинхронный
engine = create_async_engine("mysql+aiomysql://...")

Нужно выбирать асинхронные драйверы: asyncpg для PostgreSQL, aiomysql для MySQL.

4. Проблема с отложенными загрузками (Lazy Loading)

Отношения не загружаются автоматически в асинхронном коде:

# ❌ Ошибка! Lazy loading не работает в async
async def get_user_posts(user_id: int):
    async with async_session() as session:
        user = await session.get(User, user_id)
        return user.posts  # DetachedInstanceError!

Решения:

# Вариант 1: selectinload (eager loading)
from sqlalchemy.orm import selectinload

result = await session.execute(
    select(User)
    .where(User.id == user_id)
    .options(selectinload(User.posts))
)
user = result.scalars().first()
print(user.posts)  # ✅ Работает

# Вариант 2: joinedload
from sqlalchemy.orm import joinedload

result = await session.execute(
    select(User)
    .where(User.id == user_id)
    .options(joinedload(User.posts))
)

# Вариант 3: Явная загрузка
await session.refresh(user, ["posts"])

5. Обработка исключений и транзакции

Транзакции в async требуют дополнительного внимания:

# ❌ Неправильно
async def create_user(name: str):
    async with async_session() as session:
        user = User(name=name)
        session.add(user)
        await session.commit()  # Может быть отменено!
        return user  # Связь разорвана

# ✅ Правильно
async def create_user(name: str):
    async with async_session() as session:
        async with session.begin():
            user = User(name=name)
            session.add(user)
            await session.flush()  # Flush перед возвратом
        # Commit происходит при выходе из блока
        return user

6. Race conditions с SELECT FOR UPDATE

В асинхронном коде легко создать условия гонки:

# ❌ Проблема: два запроса одновременно могут прочитать старое значение
async def transfer_money(from_id, to_id, amount):
    async with async_session() as session:
        from_account = await session.get(Account, from_id)
        to_account = await session.get(Account, to_id)
        
        # Между этими двумя запросами другой процесс может изменить данные!
        if from_account.balance >= amount:
            from_account.balance -= amount
            to_account.balance += amount

# ✅ Решение: SELECT FOR UPDATE
from sqlalchemy import select, text

async def transfer_money(from_id, to_id, amount):
    async with async_session() as session:
        async with session.begin():
            result = await session.execute(
                select(Account)
                .where(Account.id == from_id)
                .with_for_update()
            )
            from_account = result.scalars().first()
            
            if from_account.balance >= amount:
                from_account.balance -= amount
                # Другой аккаунт заблокирован, пока мы обновляем
                result = await session.execute(
                    select(Account)
                    .where(Account.id == to_id)
                    .with_for_update()
                )
                to_account = result.scalars().first()
                to_account.balance += amount

7. Утечки соединений

Неправильное управление сессиями приводит к утечкам:

# ❌ Утечка соединения
async def bad_query():
    session = async_session()
    user = await session.execute(select(User))
    # Сессия не закрыта!

# ✅ Правильно
async def good_query():
    async with async_session() as session:
        result = await session.execute(select(User))
        return result.scalars().all()

Итоги

Асинхронность в SQLAlchemy требует:

  • Использования create_async_engine с асинхронными драйверами
  • Явной загрузки отношений (selectinload, joinedload)
  • Правильного управления сессиями через контекстные менеджеры
  • Защиты от race conditions с with_for_update()
  • Правильной обработки транзакций

Это сложная тема, которая часто упускается разработчиками.