Какие знаешь проблемы с асинхронностью в SQLAlchemy?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Основные проблемы асинхронности в SQLAlchemy
СQLAlchemy имеет несколько критических проблем при работе с асинхронным кодом. Это один из самых сложных аспектов современной разработки на Python.
1. Синхронная библиотека в асинхронном мире
СQLAlchemy долгое время была полностью синхронной библиотекой. Вызов блокирует event loop, что уничтожает все преимущества async/await:
# ❌ Неправильно — блокирует event loop
async def get_user(user_id: int):
session = Session()
user = session.query(User).filter_by(id=user_id).first() # БЛОКИРУЕТ!
return user
Даже с async/await функция ждёт ответа БД синхронно, отчего остальные корутины не могут выполняться параллельно.
2. Асинхронный движок и сессии (SQLAlchemy 1.4+)
В версии 1.4+ добавлена поддержка асинхронного режима через create_async_engine:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
# Правильно
engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost/db",
echo=False,
)
async_session = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
async def get_user(user_id: int):
async with async_session() as session:
result = await session.execute(
select(User).where(User.id == user_id)
)
return result.scalars().first()
Однако это требует использования select() вместо query().
3. Несовместимость с синхронными драйверами
Многие драйверы БД были синхронными и блокирующими:
# ❌ psycopg2 — синхронный
engine = create_engine("postgresql://...")
# ✅ asyncpg — асинхронный
engine = create_async_engine("postgresql+asyncpg://...")
# ✅ aiomysql — асинхронный
engine = create_async_engine("mysql+aiomysql://...")
Нужно выбирать асинхронные драйверы: asyncpg для PostgreSQL, aiomysql для MySQL.
4. Проблема с отложенными загрузками (Lazy Loading)
Отношения не загружаются автоматически в асинхронном коде:
# ❌ Ошибка! Lazy loading не работает в async
async def get_user_posts(user_id: int):
async with async_session() as session:
user = await session.get(User, user_id)
return user.posts # DetachedInstanceError!
Решения:
# Вариант 1: selectinload (eager loading)
from sqlalchemy.orm import selectinload
result = await session.execute(
select(User)
.where(User.id == user_id)
.options(selectinload(User.posts))
)
user = result.scalars().first()
print(user.posts) # ✅ Работает
# Вариант 2: joinedload
from sqlalchemy.orm import joinedload
result = await session.execute(
select(User)
.where(User.id == user_id)
.options(joinedload(User.posts))
)
# Вариант 3: Явная загрузка
await session.refresh(user, ["posts"])
5. Обработка исключений и транзакции
Транзакции в async требуют дополнительного внимания:
# ❌ Неправильно
async def create_user(name: str):
async with async_session() as session:
user = User(name=name)
session.add(user)
await session.commit() # Может быть отменено!
return user # Связь разорвана
# ✅ Правильно
async def create_user(name: str):
async with async_session() as session:
async with session.begin():
user = User(name=name)
session.add(user)
await session.flush() # Flush перед возвратом
# Commit происходит при выходе из блока
return user
6. Race conditions с SELECT FOR UPDATE
В асинхронном коде легко создать условия гонки:
# ❌ Проблема: два запроса одновременно могут прочитать старое значение
async def transfer_money(from_id, to_id, amount):
async with async_session() as session:
from_account = await session.get(Account, from_id)
to_account = await session.get(Account, to_id)
# Между этими двумя запросами другой процесс может изменить данные!
if from_account.balance >= amount:
from_account.balance -= amount
to_account.balance += amount
# ✅ Решение: SELECT FOR UPDATE
from sqlalchemy import select, text
async def transfer_money(from_id, to_id, amount):
async with async_session() as session:
async with session.begin():
result = await session.execute(
select(Account)
.where(Account.id == from_id)
.with_for_update()
)
from_account = result.scalars().first()
if from_account.balance >= amount:
from_account.balance -= amount
# Другой аккаунт заблокирован, пока мы обновляем
result = await session.execute(
select(Account)
.where(Account.id == to_id)
.with_for_update()
)
to_account = result.scalars().first()
to_account.balance += amount
7. Утечки соединений
Неправильное управление сессиями приводит к утечкам:
# ❌ Утечка соединения
async def bad_query():
session = async_session()
user = await session.execute(select(User))
# Сессия не закрыта!
# ✅ Правильно
async def good_query():
async with async_session() as session:
result = await session.execute(select(User))
return result.scalars().all()
Итоги
Асинхронность в SQLAlchemy требует:
- Использования
create_async_engineс асинхронными драйверами - Явной загрузки отношений (selectinload, joinedload)
- Правильного управления сессиями через контекстные менеджеры
- Защиты от race conditions с
with_for_update() - Правильной обработки транзакций
Это сложная тема, которая часто упускается разработчиками.