Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Избегание гонки значений в БД
Гонка значений (race condition) возникает, когда несколько процессов одновременно читают и изменяют одни и те же данные. Это критичная проблема в многопоточных и распределённых системах.
1. Транзакции с уровнем изоляции
Выбор правильного уровня изоляции — первый шаг:
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session
engine = create_engine("postgresql://...")
with engine.begin() as connection:
# READ_COMMITTED — стандартный уровень (по умолчанию)
connection.execute(text("SET TRANSACTION ISOLATION LEVEL READ COMMITTED"))
# REPEATABLE_READ — предотвращает фантомные чтения
# SERIALIZABLE — полная сериализация (самый безопасный)
Уровни изоляции PostgreSQL:
- READ UNCOMMITTED — опасен, не защищает от гонки
- READ COMMITTED — защищает от грязных чтений, но не от гонки
- REPEATABLE READ — защищает от фантомных чтений
- SERIALIZABLE — полная защита, но медленнее
2. SELECT FOR UPDATE (пессимистичная блокировка)
Этот метод блокирует строку на время чтения и изменения:
from sqlalchemy import select
from sqlalchemy.orm import Session
from models import Account
def transfer_money(session: Session, from_id: int, to_id: int, amount: float):
# Блокируем строку от редактирования другими транзакциями
from_account = session.execute(
select(Account).where(Account.id == from_id).with_for_update()
).scalar_one()
to_account = session.execute(
select(Account).where(Account.id == to_id).with_for_update()
).scalar_one()
if from_account.balance >= amount:
from_account.balance -= amount
to_account.balance += amount
session.commit()
else:
raise ValueError("Insufficient funds")
3. Оптимистичная блокировка (версионирование)
Добавь поле версии (версионное число) в таблицу:
from sqlalchemy import Column, Integer, String, Float
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class Account(Base):
__tablename__ = "accounts"
id = Column(Integer, primary_key=True)
name = Column(String)
balance = Column(Float)
version = Column(Integer, default=0) # Версия для оптимистичной блокировки
def transfer_money(session: Session, from_id: int, to_id: int, amount: float):
from_account = session.query(Account).filter(Account.id == from_id).one()
to_account = session.query(Account).filter(Account.id == to_id).one()
old_version = from_account.version
from_account.balance -= amount
to_account.balance += amount
from_account.version += 1
try:
session.flush()
# Проверяем, не изменилась ли версия (используй WHERE clause)
result = session.execute(
text(
"UPDATE accounts SET balance = :balance, version = :version "
"WHERE id = :id AND version = :old_version"
),
{"balance": from_account.balance, "version": old_version + 1,
"id": from_id, "old_version": old_version}
)
if result.rowcount == 0:
raise StaleObjectError("Account was modified by another transaction")
session.commit()
except StaleObjectError:
session.rollback()
raise
4. Использование SKIP LOCKED (для очереди)
Когда нужно выбрать доступные ресурсы, пропускаем уже заблокированные:
def get_available_task(session: Session) -> Task:
# Берём первую доступную задачу, пропускаем уже занятые
task = session.execute(
select(Task)
.where(Task.status == "pending")
.with_for_update(skip_locked=True) # Ключевой параметр!
.limit(1)
).scalar_one_or_none()
if task:
task.status = "processing"
session.commit()
return task
5. Атомарные операции на уровне БД
Используй встроенные функции БД для атомарных операций:
from sqlalchemy import func, update
def increment_counter(session: Session, counter_id: int):
# Атомарный инкремент на уровне БД
session.execute(
update(Counter)
.where(Counter.id == counter_id)
.values(count=Counter.count + 1)
)
session.commit()
6. Асинхронная обработка с очередью
Для критичных операций используй очередь задач:
from celery import Celery
app = Celery()
@app.task(bind=True, max_retries=3)
def transfer_money_async(self, from_id, to_id, amount):
try:
session = get_session()
# ... логика с SELECT FOR UPDATE
session.commit()
except Exception as exc:
# Повторим задачу при конфликте
self.retry(exc=exc, countdown=2)
Рекомендации по выбору подхода
| Метод | Когда использовать | Плюсы | Минусы |
|---|---|---|---|
| SELECT FOR UPDATE | Критичные операции с деньгами | Надёжно, просто | Может быть медленным |
| REPEATABLE READ | Общие случаи | Хороший баланс | Может быть сложнее в отладке |
| Оптимистичная блокировка | Низкий конфликт, высокая нагрузка | Быстро, масштабируемо | Нужна логика переповтора |
| SKIP LOCKED | Очереди и распределённые задачи | Красиво, масштабируемо | Требует поддержки БД |
Итоги
Гонка значений в БД предотвращается комбинацией:
- Правильный уровень изоляции — REPEATABLE READ минимум
- SELECT FOR UPDATE — для критичных операций
- Версионирование — для оптимистичного сценария
- SKIP LOCKED — для распределённых очередей
- Асинхронные очереди — для сложных процессов
Выбор метода зависит от характера данных, частоты конфликтов и требований к производительности.