← Назад к вопросам

Как избежать гонки значений в БД?

1.8 Middle🔥 181 комментариев
#Python Core

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Избегание гонки значений в БД

Гонка значений (race condition) возникает, когда несколько процессов одновременно читают и изменяют одни и те же данные. Это критичная проблема в многопоточных и распределённых системах.

1. Транзакции с уровнем изоляции

Выбор правильного уровня изоляции — первый шаг:

from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

engine = create_engine("postgresql://...")

with engine.begin() as connection:
    # READ_COMMITTED — стандартный уровень (по умолчанию)
    connection.execute(text("SET TRANSACTION ISOLATION LEVEL READ COMMITTED"))
    # REPEATABLE_READ — предотвращает фантомные чтения
    # SERIALIZABLE — полная сериализация (самый безопасный)

Уровни изоляции PostgreSQL:

  • READ UNCOMMITTED — опасен, не защищает от гонки
  • READ COMMITTED — защищает от грязных чтений, но не от гонки
  • REPEATABLE READ — защищает от фантомных чтений
  • SERIALIZABLE — полная защита, но медленнее

2. SELECT FOR UPDATE (пессимистичная блокировка)

Этот метод блокирует строку на время чтения и изменения:

from sqlalchemy import select
from sqlalchemy.orm import Session
from models import Account

def transfer_money(session: Session, from_id: int, to_id: int, amount: float):
    # Блокируем строку от редактирования другими транзакциями
    from_account = session.execute(
        select(Account).where(Account.id == from_id).with_for_update()
    ).scalar_one()
    
    to_account = session.execute(
        select(Account).where(Account.id == to_id).with_for_update()
    ).scalar_one()
    
    if from_account.balance >= amount:
        from_account.balance -= amount
        to_account.balance += amount
        session.commit()
    else:
        raise ValueError("Insufficient funds")

3. Оптимистичная блокировка (версионирование)

Добавь поле версии (версионное число) в таблицу:

from sqlalchemy import Column, Integer, String, Float
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Account(Base):
    __tablename__ = "accounts"
    
    id = Column(Integer, primary_key=True)
    name = Column(String)
    balance = Column(Float)
    version = Column(Integer, default=0)  # Версия для оптимистичной блокировки

def transfer_money(session: Session, from_id: int, to_id: int, amount: float):
    from_account = session.query(Account).filter(Account.id == from_id).one()
    to_account = session.query(Account).filter(Account.id == to_id).one()
    
    old_version = from_account.version
    
    from_account.balance -= amount
    to_account.balance += amount
    from_account.version += 1
    
    try:
        session.flush()
        # Проверяем, не изменилась ли версия (используй WHERE clause)
        result = session.execute(
            text(
                "UPDATE accounts SET balance = :balance, version = :version "
                "WHERE id = :id AND version = :old_version"
            ),
            {"balance": from_account.balance, "version": old_version + 1, 
             "id": from_id, "old_version": old_version}
        )
        
        if result.rowcount == 0:
            raise StaleObjectError("Account was modified by another transaction")
        
        session.commit()
    except StaleObjectError:
        session.rollback()
        raise

4. Использование SKIP LOCKED (для очереди)

Когда нужно выбрать доступные ресурсы, пропускаем уже заблокированные:

def get_available_task(session: Session) -> Task:
    # Берём первую доступную задачу, пропускаем уже занятые
    task = session.execute(
        select(Task)
        .where(Task.status == "pending")
        .with_for_update(skip_locked=True)  # Ключевой параметр!
        .limit(1)
    ).scalar_one_or_none()
    
    if task:
        task.status = "processing"
        session.commit()
    
    return task

5. Атомарные операции на уровне БД

Используй встроенные функции БД для атомарных операций:

from sqlalchemy import func, update

def increment_counter(session: Session, counter_id: int):
    # Атомарный инкремент на уровне БД
    session.execute(
        update(Counter)
        .where(Counter.id == counter_id)
        .values(count=Counter.count + 1)
    )
    session.commit()

6. Асинхронная обработка с очередью

Для критичных операций используй очередь задач:

from celery import Celery

app = Celery()

@app.task(bind=True, max_retries=3)
def transfer_money_async(self, from_id, to_id, amount):
    try:
        session = get_session()
        # ... логика с SELECT FOR UPDATE
        session.commit()
    except Exception as exc:
        # Повторим задачу при конфликте
        self.retry(exc=exc, countdown=2)

Рекомендации по выбору подхода

МетодКогда использоватьПлюсыМинусы
SELECT FOR UPDATEКритичные операции с деньгамиНадёжно, простоМожет быть медленным
REPEATABLE READОбщие случаиХороший балансМожет быть сложнее в отладке
Оптимистичная блокировкаНизкий конфликт, высокая нагрузкаБыстро, масштабируемоНужна логика переповтора
SKIP LOCKEDОчереди и распределённые задачиКрасиво, масштабируемоТребует поддержки БД

Итоги

Гонка значений в БД предотвращается комбинацией:

  1. Правильный уровень изоляции — REPEATABLE READ минимум
  2. SELECT FOR UPDATE — для критичных операций
  3. Версионирование — для оптимистичного сценария
  4. SKIP LOCKED — для распределённых очередей
  5. Асинхронные очереди — для сложных процессов

Выбор метода зависит от характера данных, частоты конфликтов и требований к производительности.