← Назад к вопросам

Как избегать коллизий при параллельной записи в БД?

2.0 Middle🔥 151 комментариев
#Python Core

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Проблема параллельной записи

Когда несколько процессов/потоков одновременно пишут в БД, возникают race conditions, потеря данных и нарушение консистентности. Это критическая проблема в высоконагруженных системах.

Основные подходы решения

1. Pessimistic Locking (пессимистические блокировки)

Заблокировать запись до того как её изменять. Для критичных данных.

# SQLAlchemy с FOR UPDATE (PostgreSQL)
from sqlalchemy import text, select
from sqlalchemy.orm import Session

def transfer_money(session: Session, from_account_id: int, to_account_id: int, amount: float):
    """
    Переводим деньги с проверкой баланса под блокировкой
    """
    # Блокируем обе строки для эксклюзивного доступа
    from_account = session.execute(
        select(Account).where(Account.id == from_account_id).with_for_update()
    ).scalar_one()
    
    to_account = session.execute(
        select(Account).where(Account.id == to_account_id).with_for_update()
    ).scalar_one()
    
    # Проверяем баланс под блокировкой
    if from_account.balance < amount:
        raise InsufficientFundsError()
    
    # Изменяем под гарантией что никто другой не вмешивается
    from_account.balance -= amount
    to_account.balance += amount
    
    session.commit()

Плюсы: Простота, гарантирует консистентность Минусы: Низкая пропускная способность, риск deadlock

2. Optimistic Locking (оптимистические блокировки)

Проверяем версию записи перед обновлением. Если кто-то изменил — откатываем.

from sqlalchemy import Column, Integer, DateTime
from datetime import datetime

class Product:
    id = Column(Integer, primary_key=True)
    name = Column(String)
    price = Column(Float)
    version = Column(Integer, default=0)  # Версия для оптимистической блокировки
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

def update_product_price(session: Session, product_id: int, new_price: float, current_version: int):
    """
    Обновляем цену только если версия не изменилась
    """
    product = session.query(Product).filter_by(id=product_id).one()
    
    if product.version != current_version:
        raise OptimisticLockError("Product was modified by another process")
    
    product.price = new_price
    product.version += 1  # Увеличиваем версию
    
    session.commit()
    return product

# Использование
try:
    product = session.query(Product).filter_by(id=1).one()
    update_product_price(session, product.id, 99.99, product.version)
except OptimisticLockError:
    # Retry logic
    session.rollback()
    # Повторить операцию с новыми данными

Плюсы: Высокая пропускная способность, нет deadlock Минусы: Нужно обрабатывать конфликты и retry

3. Row-level Locking с SKIP LOCKED

Для высоконагруженных сценариев (поиск противника, распределение задач).

from sqlalchemy import select, and_

def find_available_opponent(session: Session, player_id: int, min_rating: int):
    """
    Находим свободного противника без deadlock и без конфликтов
    """
    opponent = session.execute(
        select(Player)
        .where(
            and_(
                Player.status == waiting,
                Player.rating >= min_rating,
                Player.id != player_id,
            )
        )
        .with_for_update(skip_locked=True)  # Пропускаем заблокированные
        .limit(1)
    ).scalar_one_or_none()
    
    if opponent:
        opponent.status = in_game
        session.commit()
    
    return opponent

Плюсы: Максимальная параллелизм, нет deadlock Минусы: Только для некритичных изменений

4. Distributed Locks (Redlock, etcd)

Для микросервисной архитектуры когда данные в разных БД.

import redis
from typing import Optional
import uuid
import time

class DistributedLock:
    def __init__(self, redis_client, key: str, ttl: int = 10):
        self.redis = redis_client
        self.key = key
        self.ttl = ttl
        self.lock_id = str(uuid.uuid4())
    
    def acquire(self, timeout: int = 5) -> bool:
        """
        Пытаемся получить блокировку
        """
        end_time = time.time() + timeout
        
        while time.time() < end_time:
            result = self.redis.set(
                self.key,
                self.lock_id,
                nx=True,  # Только если ключа нет
                ex=self.ttl  # Автоматически удалится через ttl
            )
            
            if result:
                return True
            
            time.sleep(0.1)
        
        return False
    
    def release(self) -> bool:
        """
        Освобождаем блокировку (только если мы её держим)
        """
        # Используем скрипт для атомарности
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        
        return self.redis.eval(script, 1, self.key, self.lock_id)
    
    def __enter__(self):
        if not self.acquire():
            raise LockAcquisitionError(f"Could not acquire lock for {self.key}")
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

# Использование
redis_client = redis.Redis(host=localhost, port=6379)

with DistributedLock(redis_client, "payment:user:123"):
    # Критический участок кода
    process_payment(user_id=123)

5. ACID транзакции с правильным Isolation Level

Выбирать правильный уровень изоляции для задачи.

from sqlalchemy import create_engine, event

engine = create_engine(postgresql://...)

@event.listens_for(engine, "connect")
def set_isolation_level(dbapi_conn, connection_record):
    # SERIALIZABLE — максимальная защита (медленнее)
    # REPEATABLE READ — средняя защита (стандарт PostgreSQL)
    # READ COMMITTED — быстро но опасно
    dbapi_conn.set_isolation_level(
        dbapi_conn.isolation_levels.REPEATABLE_READ
    )

from sqlalchemy.orm import sessionmaker

Session = sessionmaker(bind=engine)

def update_inventory(session: Session, product_id: int, quantity: int):
    """
    Обновляем инвентарь с REPEATABLE READ
    """
    product = session.query(Product).filter_by(id=product_id).one()
    product.stock -= quantity
    session.commit()

6. Event Sourcing подход

Вместо обновления состояния — сохраняем события.

from dataclasses import dataclass
from datetime import datetime
from enum import Enum

class EventType(Enum):
    ACCOUNT_CREATED = "account.created"
    MONEY_TRANSFERRED = "money.transferred"
    MONEY_RECEIVED = "money.received"

@dataclass
class DomainEvent:
    event_type: EventType
    aggregate_id: int
    timestamp: datetime
    data: dict

class AccountEventStore:
    def __init__(self, session: Session):
        self.session = session
    
    def append_event(self, event: DomainEvent):
        """
        Записываем событие (идемпотентно)
        """
        db_event = EventLog(
            event_type=event.event_type.value,
            aggregate_id=event.aggregate_id,
            timestamp=event.timestamp,
            data=json.dumps(event.data),
        )
        self.session.add(db_event)
        self.session.commit()
    
    def get_account_state(self, account_id: int) -> dict:
        """
        Восстанавливаем состояние из событий
        """
        events = self.session.query(EventLog).filter_by(
            aggregate_id=account_id
        ).order_by(EventLog.timestamp).all()
        
        state = {"balance": 0, "transactions": []}
        
        for event in events:
            if event.event_type == EventType.MONEY_TRANSFERRED.value:
                state["balance"] -= event.data["amount"]
            elif event.event_type == EventType.MONEY_RECEIVED.value:
                state["balance"] += event.data["amount"]
            
            state["transactions"].append(event.data)
        
        return state

Когда использовать что

СценарийРешение
Финансовые транзакцииPessimistic locking + SERIALIZABLE
Обновление профиляOptimistic locking с версией
Поиск свободного ресурсаSKIP LOCKED
МикросервисыDistributed locks (Redis/etcd)
Критичная безопасностьEvent sourcing
High throughput счётчикиEventual consistency

Итого

Нет одного идеального решения — выбор зависит от:

  • Требований по консистентности (strong vs eventual)
  • Пропускной способности системы
  • Комплексности логики
  • Архитектуры (монолит vs микросервисы)

Лучший подход — комбинировать несколько техник в зависимости от критичности данных.