Как избегать коллизий при параллельной записи в БД?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Проблема параллельной записи
Когда несколько процессов/потоков одновременно пишут в БД, возникают race conditions, потеря данных и нарушение консистентности. Это критическая проблема в высоконагруженных системах.
Основные подходы решения
1. Pessimistic Locking (пессимистические блокировки)
Заблокировать запись до того как её изменять. Для критичных данных.
# SQLAlchemy с FOR UPDATE (PostgreSQL)
from sqlalchemy import text, select
from sqlalchemy.orm import Session
def transfer_money(session: Session, from_account_id: int, to_account_id: int, amount: float):
"""
Переводим деньги с проверкой баланса под блокировкой
"""
# Блокируем обе строки для эксклюзивного доступа
from_account = session.execute(
select(Account).where(Account.id == from_account_id).with_for_update()
).scalar_one()
to_account = session.execute(
select(Account).where(Account.id == to_account_id).with_for_update()
).scalar_one()
# Проверяем баланс под блокировкой
if from_account.balance < amount:
raise InsufficientFundsError()
# Изменяем под гарантией что никто другой не вмешивается
from_account.balance -= amount
to_account.balance += amount
session.commit()
Плюсы: Простота, гарантирует консистентность Минусы: Низкая пропускная способность, риск deadlock
2. Optimistic Locking (оптимистические блокировки)
Проверяем версию записи перед обновлением. Если кто-то изменил — откатываем.
from sqlalchemy import Column, Integer, DateTime
from datetime import datetime
class Product:
id = Column(Integer, primary_key=True)
name = Column(String)
price = Column(Float)
version = Column(Integer, default=0) # Версия для оптимистической блокировки
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def update_product_price(session: Session, product_id: int, new_price: float, current_version: int):
"""
Обновляем цену только если версия не изменилась
"""
product = session.query(Product).filter_by(id=product_id).one()
if product.version != current_version:
raise OptimisticLockError("Product was modified by another process")
product.price = new_price
product.version += 1 # Увеличиваем версию
session.commit()
return product
# Использование
try:
product = session.query(Product).filter_by(id=1).one()
update_product_price(session, product.id, 99.99, product.version)
except OptimisticLockError:
# Retry logic
session.rollback()
# Повторить операцию с новыми данными
Плюсы: Высокая пропускная способность, нет deadlock Минусы: Нужно обрабатывать конфликты и retry
3. Row-level Locking с SKIP LOCKED
Для высоконагруженных сценариев (поиск противника, распределение задач).
from sqlalchemy import select, and_
def find_available_opponent(session: Session, player_id: int, min_rating: int):
"""
Находим свободного противника без deadlock и без конфликтов
"""
opponent = session.execute(
select(Player)
.where(
and_(
Player.status == waiting,
Player.rating >= min_rating,
Player.id != player_id,
)
)
.with_for_update(skip_locked=True) # Пропускаем заблокированные
.limit(1)
).scalar_one_or_none()
if opponent:
opponent.status = in_game
session.commit()
return opponent
Плюсы: Максимальная параллелизм, нет deadlock Минусы: Только для некритичных изменений
4. Distributed Locks (Redlock, etcd)
Для микросервисной архитектуры когда данные в разных БД.
import redis
from typing import Optional
import uuid
import time
class DistributedLock:
def __init__(self, redis_client, key: str, ttl: int = 10):
self.redis = redis_client
self.key = key
self.ttl = ttl
self.lock_id = str(uuid.uuid4())
def acquire(self, timeout: int = 5) -> bool:
"""
Пытаемся получить блокировку
"""
end_time = time.time() + timeout
while time.time() < end_time:
result = self.redis.set(
self.key,
self.lock_id,
nx=True, # Только если ключа нет
ex=self.ttl # Автоматически удалится через ttl
)
if result:
return True
time.sleep(0.1)
return False
def release(self) -> bool:
"""
Освобождаем блокировку (только если мы её держим)
"""
# Используем скрипт для атомарности
script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
return self.redis.eval(script, 1, self.key, self.lock_id)
def __enter__(self):
if not self.acquire():
raise LockAcquisitionError(f"Could not acquire lock for {self.key}")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
# Использование
redis_client = redis.Redis(host=localhost, port=6379)
with DistributedLock(redis_client, "payment:user:123"):
# Критический участок кода
process_payment(user_id=123)
5. ACID транзакции с правильным Isolation Level
Выбирать правильный уровень изоляции для задачи.
from sqlalchemy import create_engine, event
engine = create_engine(postgresql://...)
@event.listens_for(engine, "connect")
def set_isolation_level(dbapi_conn, connection_record):
# SERIALIZABLE — максимальная защита (медленнее)
# REPEATABLE READ — средняя защита (стандарт PostgreSQL)
# READ COMMITTED — быстро но опасно
dbapi_conn.set_isolation_level(
dbapi_conn.isolation_levels.REPEATABLE_READ
)
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=engine)
def update_inventory(session: Session, product_id: int, quantity: int):
"""
Обновляем инвентарь с REPEATABLE READ
"""
product = session.query(Product).filter_by(id=product_id).one()
product.stock -= quantity
session.commit()
6. Event Sourcing подход
Вместо обновления состояния — сохраняем события.
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
class EventType(Enum):
ACCOUNT_CREATED = "account.created"
MONEY_TRANSFERRED = "money.transferred"
MONEY_RECEIVED = "money.received"
@dataclass
class DomainEvent:
event_type: EventType
aggregate_id: int
timestamp: datetime
data: dict
class AccountEventStore:
def __init__(self, session: Session):
self.session = session
def append_event(self, event: DomainEvent):
"""
Записываем событие (идемпотентно)
"""
db_event = EventLog(
event_type=event.event_type.value,
aggregate_id=event.aggregate_id,
timestamp=event.timestamp,
data=json.dumps(event.data),
)
self.session.add(db_event)
self.session.commit()
def get_account_state(self, account_id: int) -> dict:
"""
Восстанавливаем состояние из событий
"""
events = self.session.query(EventLog).filter_by(
aggregate_id=account_id
).order_by(EventLog.timestamp).all()
state = {"balance": 0, "transactions": []}
for event in events:
if event.event_type == EventType.MONEY_TRANSFERRED.value:
state["balance"] -= event.data["amount"]
elif event.event_type == EventType.MONEY_RECEIVED.value:
state["balance"] += event.data["amount"]
state["transactions"].append(event.data)
return state
Когда использовать что
| Сценарий | Решение |
|---|---|
| Финансовые транзакции | Pessimistic locking + SERIALIZABLE |
| Обновление профиля | Optimistic locking с версией |
| Поиск свободного ресурса | SKIP LOCKED |
| Микросервисы | Distributed locks (Redis/etcd) |
| Критичная безопасность | Event sourcing |
| High throughput счётчики | Eventual consistency |
Итого
Нет одного идеального решения — выбор зависит от:
- Требований по консистентности (strong vs eventual)
- Пропускной способности системы
- Комплексности логики
- Архитектуры (монолит vs микросервисы)
Лучший подход — комбинировать несколько техник в зависимости от критичности данных.