← Назад к вопросам
Как пользуешься понятием транзакция в контексте операций записи обновления?
3.0 Senior🔥 171 комментариев
#Базы данных (SQL)
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Транзакции в операциях записи и обновления
Транзакция — это атомарная последовательность операций с базой данных. Все операции либо выполняются полностью, либо откатываются при ошибке.
1. Основной принцип ACID
ACID — основа надёжных транзакций:
- Atomicity — либо всё, либо ничего
- Consistency — база остаётся в консистентном состоянии
- Isolation — параллельные транзакции не мешают друг другу
- Durability — сохранённые данные не теряются
2. SQLAlchemy с использованием транзакций
Простая транзакция:
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
engine = create_engine("postgresql://user:pass@localhost/db")
with Session(engine) as session:
try:
# Операции в транзакции
user = session.query(User).filter(User.id == 1).first()
user.balance -= 100
another_user = session.query(User).filter(User.id == 2).first()
another_user.balance += 100
session.commit() # Сохраняем
except Exception as e:
session.rollback() # Откатываем при ошибке
print(f"Ошибка: {e}")
Автоматический rollback:
with Session(engine) as session:
user = session.query(User).filter(User.id == 1).first()
user.balance -= 100
# При выходе из блока автоматически вызывается commit() или rollback()
3. FastAPI с транзакциями
Контекстный менеджер:
from fastapi import FastAPI
from sqlalchemy.orm import Session
from contextlib import contextmanager
app = FastAPI()
@contextmanager
def get_transaction():
session = Session(engine)
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
@app.post("/transfer")
async def transfer_money(from_user_id: int, to_user_id: int, amount: float):
with get_transaction() as session:
# Эта транзакция либо успеется полностью, либо откатится
from_user = session.query(User).filter(User.id == from_user_id).first()
to_user = session.query(User).filter(User.id == to_user_id).first()
if from_user.balance < amount:
raise ValueError("Недостаточно средств")
from_user.balance -= amount
to_user.balance += amount
# Создаём лог транзакции
log = TransactionLog(
from_id=from_user_id,
to_id=to_user_id,
amount=amount
)
session.add(log)
# Если здесь будет исключение, всё откатится
return {"status": "success"}
4. Уровни изоляции транзакций
PostgreSQL поддерживает 4 уровня:
from sqlalchemy import text
# 1. READ UNCOMMITTED (самый слабый)
session.execute(text("SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED"))
# 2. READ COMMITTED (по умолчанию в PostgreSQL)
session.execute(text("SET TRANSACTION ISOLATION LEVEL READ COMMITTED"))
# 3. REPEATABLE READ
session.execute(text("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ"))
# 4. SERIALIZABLE (самый сильный)
session.execute(text("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"))
Проблемы без правильного уровня:
# Dirty Read (чтение грязных данных)
# Транзакция 1 не коммитилась, но Транзакция 2 видит её данные
# Non-repeatable Read
# Транзакция 1 читает строку дважды, между чтениями она изменилась
# Phantom Read
# Транзакция 1 выбирает набор строк, потом появляются новые строки
5. Блокировки и SELECT FOR UPDATE
Пессимистическое блокирование (для критичных операций):
@app.post("/checkout")
async def checkout(user_id: int, items: List[int]):
with get_transaction() as session:
# Блокируем пользователя для исключения race condition
user = session.query(User).filter(
User.id == user_id
).with_for_update().first() # SELECT FOR UPDATE
total_price = 0
for item_id in items:
item = session.query(Item).filter(
Item.id == item_id
).with_for_update().first()
if item.stock <= 0:
raise ValueError(f"Item {item_id} нет в наличии")
total_price += item.price
item.stock -= 1
if user.balance < total_price:
raise ValueError("Недостаточно средств")
user.balance -= total_price
order = Order(user_id=user_id, items=items, total=total_price)
session.add(order)
# При выходе все блокировки снимаются
return {"order_id": order.id}
6. Оптимистическое блокирование (версионирование)
Для high-concurrency систем:
from sqlalchemy import Column, Integer, String, DateTime
from datetime import datetime
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String)
balance = Column(Float)
version = Column(Integer, default=0) # Версия для оптимистических блокировок
updated_at = Column(DateTime, default=datetime.utcnow)
def update_user_balance(user_id: int, new_balance: float, expected_version: int):
with get_transaction() as session:
user = session.query(User).filter(User.id == user_id).first()
# Проверяем версию
if user.version != expected_version:
raise ValueError("Данные были изменены другим процессом")
user.balance = new_balance
user.version += 1 # Увеличиваем версию
user.updated_at = datetime.utcnow()
7. Savepoints (точки сохранения)
Откат части транзакции:
with get_transaction() as session:
user = session.query(User).filter(User.id == 1).first()
user.balance -= 100
# Создаём точку сохранения
savepoint = session.begin_nested()
try:
# Операция которая может упасть
send_email_notification(user.email)
except EmailError:
# Откатываем только эту операцию
savepoint.rollback()
# Но деньги уже вычтены (это может быть нужно)
8. Обработка ошибок и конфликты
Retry логика для конфликтующих транзакций:
from sqlalchemy.exc import IntegrityError
import time
def execute_with_retry(func, max_retries=3, backoff=0.5):
for attempt in range(max_retries):
try:
return func()
except IntegrityError as e:
if attempt == max_retries - 1:
raise
time.sleep(backoff * (2 ** attempt)) # Exponential backoff
def transfer_money_with_retry(from_id: int, to_id: int, amount: float):
def do_transfer():
with get_transaction() as session:
from_user = session.query(User).filter(User.id == from_id).first()
to_user = session.query(User).filter(User.id == to_id).first()
from_user.balance -= amount
to_user.balance += amount
execute_with_retry(do_transfer)
9. Неявные транзакции (autocommit=False)
engine = create_engine(
"postgresql://user:pass@localhost/db",
pool_pre_ping=True, # Проверяем соединение перед использованием
)
# При autocommit=False (по умолчанию) нужно явно вызывать commit()
session = Session(engine, autocommit=False)
try:
user = session.query(User).filter(User.id == 1).first()
user.balance -= 100
session.commit() # Явный commit
except Exception:
session.rollback() # Явный rollback
finally:
session.close()
10. Batch операции
Оптимизация для большого объёма:
@app.post("/bulk-update")
async def bulk_update(updates: List[UserUpdate]):
with get_transaction() as session:
# Более эффективно чем UPDATE в цикле
session.bulk_update_mappings(
User,
[
{"id": u.id, "balance": u.balance}
for u in updates
]
)
# Одна транзакция вместо N
11. Deadlock обработка
Когда две транзакции ждут друг друга:
from sqlalchemy.exc import OperationalError
def transfer_safe(from_id: int, to_id: int, amount: float):
# Всегда блокируем в одинаковом порядке
# чтобы избежать deadlock
id1, id2 = min(from_id, to_id), max(from_id, to_id)
with get_transaction() as session:
user1 = session.query(User).filter(User.id == id1).with_for_update().first()
user2 = session.query(User).filter(User.id == id2).with_for_update().first()
if from_id == id1:
user1.balance -= amount
user2.balance += amount
else:
user2.balance -= amount
user1.balance += amount
Чеклист правильного использования транзакций
- Всегда используешь try-finally или контекстный менеджер
- Закрываешь сессию после работы
- Используешь правильный уровень изоляции
- Блокируешь в одинаковом порядке (избегаешь deadlock)
- Логируешь критические операции
- Обрабатываешь IntegrityError и конфликты
- Минимизируешь время блокировки
Вывод
Транзакции — это основа надёжности приложения:
- По умолчанию используй контекстный менеджер
- Критичные операции — SELECT FOR UPDATE
- High-concurrency — оптимистическое блокирование
- Race conditions — правильный уровень изоляции
- Всегда обрабатывай ошибки и откатывай при проблемах