← Назад к вопросам

Что такое UnitOfWork?

1.7 Middle🔥 141 комментариев
#Python Core

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Unit of Work (UnitOfWork)

Unit of Work — это архитектурный паттерн, который инкапсулирует логику отслеживания изменений объектов и управления их персистентностью в БД. Паттерн обеспечивает, что все операции над объектами выполняются как одна атомарная транзакция: либо все изменения сохраняются в БД, либо ничего не сохраняется (ACID-гарантии).

Проблема, которую решает Unit of Work

В приложении без Unit of Work возникают проблемы:

# ❌ Без Unit of Work - хаос
def transfer_money(user_id, to_user_id, amount):
    user = db.session.query(User).get(user_id)
    to_user = db.session.query(User).get(to_user_id)
    
    user.balance -= amount
    to_user.balance += amount
    
    db.session.commit()  # Что если произойдёт ошибка между операциями?
    # Если перевод не удался, данные в несогласованном состоянии

Проблемы:

  • Множественные commit'ы разбросаны по коду
  • Нет единой точки контроля
  • Сложно откатить все изменения при ошибке
  • Трудно отследить, какие объекты были изменены

Концепция Unit of Work

Unit of Work управляет:

  1. Отслеживанием изменений (tracking) — какие объекты были добавлены, изменены, удалены
  2. Буферизацией операций — сохранение всех операций в памяти
  3. Исполнением — отправка всех операций в БД как одна транзакция
  4. Откатом — откат всех изменений при ошибке

Реализация Unit of Work с SQLAlchemy

Вариант 1: Базовая реализация с контекстным менеджером

from typing import List, Any
from sqlalchemy.orm import Session
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

class UnitOfWork:
    """Инкапсулирует работу с транзакциями"""
    def __init__(self, session: Session):
        self.session = session
        self._new_objects: List[Any] = []
        self._dirty_objects: List[Any] = []
        self._removed_objects: List[Any] = []
    
    def register_new(self, obj):
        """Зарегистрировать новый объект"""
        self._new_objects.append(obj)
    
    def register_dirty(self, obj):
        """Зарегистрировать изменённый объект"""
        if obj not in self._dirty_objects:
            self._dirty_objects.append(obj)
    
    def register_removed(self, obj):
        """Зарегистрировать удаляемый объект"""
        self._removed_objects.append(obj)
    
    def commit(self):
        """Сохранить все изменения в БД"""
        try:
            # Добавить новые объекты
            for obj in self._new_objects:
                self.session.add(obj)
            
            # Если объекты добавлены через session.add(),
            # они уже в session.new
            # Грязные объекты автоматически отслеживаются
            
            # Удалить объекты
            for obj in self._removed_objects:
                self.session.delete(obj)
            
            # Коммитим всю транзакцию
            self.session.commit()
            self._clear()
        except Exception as e:
            self.session.rollback()
            self._clear()
            raise
    
    def rollback(self):
        """Откатить все изменения"""
        self.session.rollback()
        self._clear()
    
    def _clear(self):
        """Очистить буферы"""
        self._new_objects.clear()
        self._dirty_objects.clear()
        self._removed_objects.clear()
    
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            self.commit()
        else:
            self.rollback()

Использование

engine = create_engine('postgresql://user:password@localhost/db')
SessionLocal = sessionmaker(bind=engine)

# Пример 1: Успешная транзакция
def transfer_money(user_id: int, to_user_id: int, amount: float):
    session = SessionLocal()
    uow = UnitOfWork(session)
    
    with uow:
        user = session.query(User).get(user_id)
        to_user = session.query(User).get(to_user_id)
        
        if user.balance < amount:
            raise ValueError("Insufficient funds")
        
        user.balance -= amount
        to_user.balance += amount
        
        uow.register_dirty(user)
        uow.register_dirty(to_user)
        # При выходе из with: автоматический commit

# Пример 2: Откат при ошибке
def process_payment(order_id: int):
    session = SessionLocal()
    uow = UnitOfWork(session)
    
    try:
        with uow:
            order = session.query(Order).get(order_id)
            order.status = "processing"
            uow.register_dirty(order)
            
            # Вызов внешнего API может привести к ошибке
            payment_result = process_payment_api(order)
            
            if not payment_result.success:
                raise Exception("Payment failed")
            
            order.status = "paid"
            # Если здесь возникнет ошибка, всё откатится
    except Exception as e:
        print(f"Transaction rolled back: {e}")
        raise

Вариант 2: Unit of Work с репозиториями

from abc import ABC, abstractmethod
from typing import Generic, TypeVar

T = TypeVar('T')

class Repository(ABC, Generic[T]):
    """Интерфейс репозитория"""
    @abstractmethod
    def add(self, entity: T) -> None:
        pass
    
    @abstractmethod
    def remove(self, entity: T) -> None:
        pass
    
    @abstractmethod
    def get_by_id(self, entity_id: int) -> T:
        pass

class UserRepository(Repository[User]):
    def __init__(self, session: Session):
        self.session = session
    
    def add(self, user: User) -> None:
        self.session.add(user)
    
    def remove(self, user: User) -> None:
        self.session.delete(user)
    
    def get_by_id(self, user_id: int) -> User:
        return self.session.query(User).get(user_id)

class UnitOfWork:
    """Unit of Work со множеством репозиториев"""
    def __init__(self, session: Session):
        self.session = session
        self.users = UserRepository(session)
        self.orders = OrderRepository(session)
    
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            self.commit()
        else:
            self.rollback()
    
    def commit(self):
        try:
            self.session.commit()
        except Exception:
            self.session.rollback()
            raise
    
    def rollback(self):
        self.session.rollback()

# Использование
def create_user_with_order(user_data: dict, order_data: dict):
    session = SessionLocal()
    
    with UnitOfWork(session) as uow:
        # Создаём пользователя
        user = User(**user_data)
        uow.users.add(user)
        
        # Создаём заказ
        order = Order(**order_data, user_id=user.id)
        uow.orders.add(order)
        
        # При выходе из with: автоматический commit
        # Если произойдёт ошибка, всё откатится

Вариант 3: Unit of Work с аудитом изменений

from datetime import datetime

class AuditedUnitOfWork(UnitOfWork):
    """Unit of Work с отслеживанием всех изменений"""
    def __init__(self, session: Session, user_id: int):
        super().__init__(session)
        self.user_id = user_id
        self.audit_logs: List[AuditLog] = []
    
    def register_dirty(self, obj):
        super().register_dirty(obj)
        
        # Логируем изменение
        audit_log = AuditLog(
            user_id=self.user_id,
            entity_type=type(obj).__name__,
            entity_id=obj.id,
            action="UPDATE",
            timestamp=datetime.now(),
            old_values=self._get_old_values(obj),
            new_values=self._get_new_values(obj)
        )
        self.audit_logs.append(audit_log)
    
    def commit(self):
        # Добавляем логи аудита
        for log in self.audit_logs:
            self.session.add(log)
        
        super().commit()
    
    def _get_old_values(self, obj):
        # Получить старые значения из session.history
        return dict()
    
    def _get_new_values(self, obj):
        # Получить новые значения
        return dict(obj.__dict__)

Паттерн Unit of Work в FastAPI

from fastapi import FastAPI, Depends

app = FastAPI()

def get_uow() -> UnitOfWork:
    session = SessionLocal()
    try:
        yield UnitOfWork(session)
    finally:
        session.close()

@app.post("/transfer")
def transfer_funds(
    from_user_id: int,
    to_user_id: int,
    amount: float,
    uow: UnitOfWork = Depends(get_uow)
):
    """Перевод денежных средств"""
    with uow:
        from_user = uow.users.get_by_id(from_user_id)
        to_user = uow.users.get_by_id(to_user_id)
        
        if from_user.balance < amount:
            raise ValueError("Insufficient funds")
        
        from_user.balance -= amount
        to_user.balance += amount
        
        uow.users.add(from_user)
        uow.users.add(to_user)
        # Автоматический commit при успехе
    
    return {"status": "success"}

Преимущества Unit of Work

  1. ACID-гарантии — все операции либо выполняются, либо откатываются
  2. Единая точка контроля — все БД-операции в одном месте
  3. Переиспользуемость — одна UoW логика для разных use case'ов
  4. Тестируемость — можно мокировать UoW в тестах
  5. Откат при ошибке — автоматический rollback

Когда использовать Unit of Work

# ОБЯЗАТЕЛЬНО использовать Unit of Work
- Транзакции, затрагивающие несколько сущностей
- Операции, где важна целостность данных
- Интеграция с внешними API
- Сложная бизнес-логика с множественными шагами

Unit of Work — мощный паттерн для управления сложными транзакциями в приложениях. Он обеспечивает надёжность, читаемость кода и централизованное управление состоянием базы данных.