Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Unit of Work (UnitOfWork)
Unit of Work — это архитектурный паттерн, который инкапсулирует логику отслеживания изменений объектов и управления их персистентностью в БД. Паттерн обеспечивает, что все операции над объектами выполняются как одна атомарная транзакция: либо все изменения сохраняются в БД, либо ничего не сохраняется (ACID-гарантии).
Проблема, которую решает Unit of Work
В приложении без Unit of Work возникают проблемы:
# ❌ Без Unit of Work - хаос
def transfer_money(user_id, to_user_id, amount):
user = db.session.query(User).get(user_id)
to_user = db.session.query(User).get(to_user_id)
user.balance -= amount
to_user.balance += amount
db.session.commit() # Что если произойдёт ошибка между операциями?
# Если перевод не удался, данные в несогласованном состоянии
Проблемы:
- Множественные commit'ы разбросаны по коду
- Нет единой точки контроля
- Сложно откатить все изменения при ошибке
- Трудно отследить, какие объекты были изменены
Концепция Unit of Work
Unit of Work управляет:
- Отслеживанием изменений (tracking) — какие объекты были добавлены, изменены, удалены
- Буферизацией операций — сохранение всех операций в памяти
- Исполнением — отправка всех операций в БД как одна транзакция
- Откатом — откат всех изменений при ошибке
Реализация Unit of Work с SQLAlchemy
Вариант 1: Базовая реализация с контекстным менеджером
from typing import List, Any
from sqlalchemy.orm import Session
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
class UnitOfWork:
"""Инкапсулирует работу с транзакциями"""
def __init__(self, session: Session):
self.session = session
self._new_objects: List[Any] = []
self._dirty_objects: List[Any] = []
self._removed_objects: List[Any] = []
def register_new(self, obj):
"""Зарегистрировать новый объект"""
self._new_objects.append(obj)
def register_dirty(self, obj):
"""Зарегистрировать изменённый объект"""
if obj not in self._dirty_objects:
self._dirty_objects.append(obj)
def register_removed(self, obj):
"""Зарегистрировать удаляемый объект"""
self._removed_objects.append(obj)
def commit(self):
"""Сохранить все изменения в БД"""
try:
# Добавить новые объекты
for obj in self._new_objects:
self.session.add(obj)
# Если объекты добавлены через session.add(),
# они уже в session.new
# Грязные объекты автоматически отслеживаются
# Удалить объекты
for obj in self._removed_objects:
self.session.delete(obj)
# Коммитим всю транзакцию
self.session.commit()
self._clear()
except Exception as e:
self.session.rollback()
self._clear()
raise
def rollback(self):
"""Откатить все изменения"""
self.session.rollback()
self._clear()
def _clear(self):
"""Очистить буферы"""
self._new_objects.clear()
self._dirty_objects.clear()
self._removed_objects.clear()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
self.commit()
else:
self.rollback()
Использование
engine = create_engine('postgresql://user:password@localhost/db')
SessionLocal = sessionmaker(bind=engine)
# Пример 1: Успешная транзакция
def transfer_money(user_id: int, to_user_id: int, amount: float):
session = SessionLocal()
uow = UnitOfWork(session)
with uow:
user = session.query(User).get(user_id)
to_user = session.query(User).get(to_user_id)
if user.balance < amount:
raise ValueError("Insufficient funds")
user.balance -= amount
to_user.balance += amount
uow.register_dirty(user)
uow.register_dirty(to_user)
# При выходе из with: автоматический commit
# Пример 2: Откат при ошибке
def process_payment(order_id: int):
session = SessionLocal()
uow = UnitOfWork(session)
try:
with uow:
order = session.query(Order).get(order_id)
order.status = "processing"
uow.register_dirty(order)
# Вызов внешнего API может привести к ошибке
payment_result = process_payment_api(order)
if not payment_result.success:
raise Exception("Payment failed")
order.status = "paid"
# Если здесь возникнет ошибка, всё откатится
except Exception as e:
print(f"Transaction rolled back: {e}")
raise
Вариант 2: Unit of Work с репозиториями
from abc import ABC, abstractmethod
from typing import Generic, TypeVar
T = TypeVar('T')
class Repository(ABC, Generic[T]):
"""Интерфейс репозитория"""
@abstractmethod
def add(self, entity: T) -> None:
pass
@abstractmethod
def remove(self, entity: T) -> None:
pass
@abstractmethod
def get_by_id(self, entity_id: int) -> T:
pass
class UserRepository(Repository[User]):
def __init__(self, session: Session):
self.session = session
def add(self, user: User) -> None:
self.session.add(user)
def remove(self, user: User) -> None:
self.session.delete(user)
def get_by_id(self, user_id: int) -> User:
return self.session.query(User).get(user_id)
class UnitOfWork:
"""Unit of Work со множеством репозиториев"""
def __init__(self, session: Session):
self.session = session
self.users = UserRepository(session)
self.orders = OrderRepository(session)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
self.commit()
else:
self.rollback()
def commit(self):
try:
self.session.commit()
except Exception:
self.session.rollback()
raise
def rollback(self):
self.session.rollback()
# Использование
def create_user_with_order(user_data: dict, order_data: dict):
session = SessionLocal()
with UnitOfWork(session) as uow:
# Создаём пользователя
user = User(**user_data)
uow.users.add(user)
# Создаём заказ
order = Order(**order_data, user_id=user.id)
uow.orders.add(order)
# При выходе из with: автоматический commit
# Если произойдёт ошибка, всё откатится
Вариант 3: Unit of Work с аудитом изменений
from datetime import datetime
class AuditedUnitOfWork(UnitOfWork):
"""Unit of Work с отслеживанием всех изменений"""
def __init__(self, session: Session, user_id: int):
super().__init__(session)
self.user_id = user_id
self.audit_logs: List[AuditLog] = []
def register_dirty(self, obj):
super().register_dirty(obj)
# Логируем изменение
audit_log = AuditLog(
user_id=self.user_id,
entity_type=type(obj).__name__,
entity_id=obj.id,
action="UPDATE",
timestamp=datetime.now(),
old_values=self._get_old_values(obj),
new_values=self._get_new_values(obj)
)
self.audit_logs.append(audit_log)
def commit(self):
# Добавляем логи аудита
for log in self.audit_logs:
self.session.add(log)
super().commit()
def _get_old_values(self, obj):
# Получить старые значения из session.history
return dict()
def _get_new_values(self, obj):
# Получить новые значения
return dict(obj.__dict__)
Паттерн Unit of Work в FastAPI
from fastapi import FastAPI, Depends
app = FastAPI()
def get_uow() -> UnitOfWork:
session = SessionLocal()
try:
yield UnitOfWork(session)
finally:
session.close()
@app.post("/transfer")
def transfer_funds(
from_user_id: int,
to_user_id: int,
amount: float,
uow: UnitOfWork = Depends(get_uow)
):
"""Перевод денежных средств"""
with uow:
from_user = uow.users.get_by_id(from_user_id)
to_user = uow.users.get_by_id(to_user_id)
if from_user.balance < amount:
raise ValueError("Insufficient funds")
from_user.balance -= amount
to_user.balance += amount
uow.users.add(from_user)
uow.users.add(to_user)
# Автоматический commit при успехе
return {"status": "success"}
Преимущества Unit of Work
- ACID-гарантии — все операции либо выполняются, либо откатываются
- Единая точка контроля — все БД-операции в одном месте
- Переиспользуемость — одна UoW логика для разных use case'ов
- Тестируемость — можно мокировать UoW в тестах
- Откат при ошибке — автоматический rollback
Когда использовать Unit of Work
# ОБЯЗАТЕЛЬНО использовать Unit of Work
- Транзакции, затрагивающие несколько сущностей
- Операции, где важна целостность данных
- Интеграция с внешними API
- Сложная бизнес-логика с множественными шагами
Unit of Work — мощный паттерн для управления сложными транзакциями в приложениях. Он обеспечивает надёжность, читаемость кода и централизованное управление состоянием базы данных.