← Назад к вопросам

Как пользуешься понятием транзакция в контексте операций записи обновления?

3.0 Senior🔥 171 комментариев
#Базы данных (SQL)

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Транзакции в операциях записи и обновления

Транзакция — это атомарная последовательность операций с базой данных. Все операции либо выполняются полностью, либо откатываются при ошибке.

1. Основной принцип ACID

ACID — основа надёжных транзакций:

  • Atomicity — либо всё, либо ничего
  • Consistency — база остаётся в консистентном состоянии
  • Isolation — параллельные транзакции не мешают друг другу
  • Durability — сохранённые данные не теряются

2. SQLAlchemy с использованием транзакций

Простая транзакция:

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

engine = create_engine("postgresql://user:pass@localhost/db")

with Session(engine) as session:
    try:
        # Операции в транзакции
        user = session.query(User).filter(User.id == 1).first()
        user.balance -= 100
        
        another_user = session.query(User).filter(User.id == 2).first()
        another_user.balance += 100
        
        session.commit()  # Сохраняем
    except Exception as e:
        session.rollback()  # Откатываем при ошибке
        print(f"Ошибка: {e}")

Автоматический rollback:

with Session(engine) as session:
    user = session.query(User).filter(User.id == 1).first()
    user.balance -= 100
    # При выходе из блока автоматически вызывается commit() или rollback()

3. FastAPI с транзакциями

Контекстный менеджер:

from fastapi import FastAPI
from sqlalchemy.orm import Session
from contextlib import contextmanager

app = FastAPI()

@contextmanager
def get_transaction():
    session = Session(engine)
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

@app.post("/transfer")
async def transfer_money(from_user_id: int, to_user_id: int, amount: float):
    with get_transaction() as session:
        # Эта транзакция либо успеется полностью, либо откатится
        from_user = session.query(User).filter(User.id == from_user_id).first()
        to_user = session.query(User).filter(User.id == to_user_id).first()
        
        if from_user.balance < amount:
            raise ValueError("Недостаточно средств")
        
        from_user.balance -= amount
        to_user.balance += amount
        
        # Создаём лог транзакции
        log = TransactionLog(
            from_id=from_user_id,
            to_id=to_user_id,
            amount=amount
        )
        session.add(log)
        
        # Если здесь будет исключение, всё откатится
    
    return {"status": "success"}

4. Уровни изоляции транзакций

PostgreSQL поддерживает 4 уровня:

from sqlalchemy import text

# 1. READ UNCOMMITTED (самый слабый)
session.execute(text("SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED"))

# 2. READ COMMITTED (по умолчанию в PostgreSQL)
session.execute(text("SET TRANSACTION ISOLATION LEVEL READ COMMITTED"))

# 3. REPEATABLE READ
session.execute(text("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ"))

# 4. SERIALIZABLE (самый сильный)
session.execute(text("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"))

Проблемы без правильного уровня:

# Dirty Read (чтение грязных данных)
# Транзакция 1 не коммитилась, но Транзакция 2 видит её данные

# Non-repeatable Read
# Транзакция 1 читает строку дважды, между чтениями она изменилась

# Phantom Read
# Транзакция 1 выбирает набор строк, потом появляются новые строки

5. Блокировки и SELECT FOR UPDATE

Пессимистическое блокирование (для критичных операций):

@app.post("/checkout")
async def checkout(user_id: int, items: List[int]):
    with get_transaction() as session:
        # Блокируем пользователя для исключения race condition
        user = session.query(User).filter(
            User.id == user_id
        ).with_for_update().first()  # SELECT FOR UPDATE
        
        total_price = 0
        for item_id in items:
            item = session.query(Item).filter(
                Item.id == item_id
            ).with_for_update().first()
            
            if item.stock <= 0:
                raise ValueError(f"Item {item_id} нет в наличии")
            
            total_price += item.price
            item.stock -= 1
        
        if user.balance < total_price:
            raise ValueError("Недостаточно средств")
        
        user.balance -= total_price
        
        order = Order(user_id=user_id, items=items, total=total_price)
        session.add(order)
        
        # При выходе все блокировки снимаются
    
    return {"order_id": order.id}

6. Оптимистическое блокирование (версионирование)

Для high-concurrency систем:

from sqlalchemy import Column, Integer, String, DateTime
from datetime import datetime

class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True)
    name = Column(String)
    balance = Column(Float)
    version = Column(Integer, default=0)  # Версия для оптимистических блокировок
    updated_at = Column(DateTime, default=datetime.utcnow)

def update_user_balance(user_id: int, new_balance: float, expected_version: int):
    with get_transaction() as session:
        user = session.query(User).filter(User.id == user_id).first()
        
        # Проверяем версию
        if user.version != expected_version:
            raise ValueError("Данные были изменены другим процессом")
        
        user.balance = new_balance
        user.version += 1  # Увеличиваем версию
        user.updated_at = datetime.utcnow()

7. Savepoints (точки сохранения)

Откат части транзакции:

with get_transaction() as session:
    user = session.query(User).filter(User.id == 1).first()
    user.balance -= 100
    
    # Создаём точку сохранения
    savepoint = session.begin_nested()
    
    try:
        # Операция которая может упасть
        send_email_notification(user.email)
    except EmailError:
        # Откатываем только эту операцию
        savepoint.rollback()
        # Но деньги уже вычтены (это может быть нужно)

8. Обработка ошибок и конфликты

Retry логика для конфликтующих транзакций:

from sqlalchemy.exc import IntegrityError
import time

def execute_with_retry(func, max_retries=3, backoff=0.5):
    for attempt in range(max_retries):
        try:
            return func()
        except IntegrityError as e:
            if attempt == max_retries - 1:
                raise
            time.sleep(backoff * (2 ** attempt))  # Exponential backoff

def transfer_money_with_retry(from_id: int, to_id: int, amount: float):
    def do_transfer():
        with get_transaction() as session:
            from_user = session.query(User).filter(User.id == from_id).first()
            to_user = session.query(User).filter(User.id == to_id).first()
            
            from_user.balance -= amount
            to_user.balance += amount
    
    execute_with_retry(do_transfer)

9. Неявные транзакции (autocommit=False)

engine = create_engine(
    "postgresql://user:pass@localhost/db",
    pool_pre_ping=True,  # Проверяем соединение перед использованием
)

# При autocommit=False (по умолчанию) нужно явно вызывать commit()
session = Session(engine, autocommit=False)

try:
    user = session.query(User).filter(User.id == 1).first()
    user.balance -= 100
    session.commit()  # Явный commit
except Exception:
    session.rollback()  # Явный rollback
finally:
    session.close()

10. Batch операции

Оптимизация для большого объёма:

@app.post("/bulk-update")
async def bulk_update(updates: List[UserUpdate]):
    with get_transaction() as session:
        # Более эффективно чем UPDATE в цикле
        session.bulk_update_mappings(
            User,
            [
                {"id": u.id, "balance": u.balance}
                for u in updates
            ]
        )
        # Одна транзакция вместо N

11. Deadlock обработка

Когда две транзакции ждут друг друга:

from sqlalchemy.exc import OperationalError

def transfer_safe(from_id: int, to_id: int, amount: float):
    # Всегда блокируем в одинаковом порядке
    # чтобы избежать deadlock
    id1, id2 = min(from_id, to_id), max(from_id, to_id)
    
    with get_transaction() as session:
        user1 = session.query(User).filter(User.id == id1).with_for_update().first()
        user2 = session.query(User).filter(User.id == id2).with_for_update().first()
        
        if from_id == id1:
            user1.balance -= amount
            user2.balance += amount
        else:
            user2.balance -= amount
            user1.balance += amount

Чеклист правильного использования транзакций

  • Всегда используешь try-finally или контекстный менеджер
  • Закрываешь сессию после работы
  • Используешь правильный уровень изоляции
  • Блокируешь в одинаковом порядке (избегаешь deadlock)
  • Логируешь критические операции
  • Обрабатываешь IntegrityError и конфликты
  • Минимизируешь время блокировки

Вывод

Транзакции — это основа надёжности приложения:

  1. По умолчанию используй контекстный менеджер
  2. Критичные операции — SELECT FOR UPDATE
  3. High-concurrency — оптимистическое блокирование
  4. Race conditions — правильный уровень изоляции
  5. Всегда обрабатывай ошибки и откатывай при проблемах
Как пользуешься понятием транзакция в контексте операций записи обновления? | PrepBro