← Назад к вопросам

Как решить проблему кольцевой блокировки?

3.0 Senior🔥 151 комментариев
#Базы данных (SQL)

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Кольцевая блокировка (Deadlock) в БД

Deadlock происходит, когда две или более транзакции ждут друг друга в круге. Например, T1 ждет ресурса X, который держит T2, а T2 ждет ресурса Y, который держит T1.

1. Пример deadlock

# Transaction 1
async def transfer_money_1():
    async with session.begin():
        account_a = await session.get(Account, 1, with_for_update=True)
        await asyncio.sleep(0.1)  # Симуляция задержки
        account_b = await session.get(Account, 2, with_for_update=True)
        account_a.balance -= 100
        account_b.balance += 100
        # await session.commit()  - автокоммит при выходе

# Transaction 2
async def transfer_money_2():
    async with session.begin():
        account_b = await session.get(Account, 2, with_for_update=True)
        await asyncio.sleep(0.1)  # Симуляция задержки
        account_a = await session.get(Account, 1, with_for_update=True)
        account_b.balance -= 100
        account_a.balance += 100
        # await session.commit()  - автокоммит при выходе

# Если запустить параллельно:
await asyncio.gather(transfer_money_1(), transfer_money_2())
# -> PostgreSQL deadlock error!

2. Почему происходит

Транзакция 1             Транзакция 2
-------                 -------
LOCK account_1          LOCK account_2
WAIT account_2          WAIT account_1
(deadlock!)

Решение 1: Заказать блокировки в одинаковом порядке

Лучший и простейший способ:

async def transfer_money_safe(session, from_id: int, to_id: int, amount: float):
    """
    Решение 1: Всегда блокировать в одинаковом порядке.
    Блокируем сначала меньший ID, потом больший.
    """
    # Нормализуем порядок
    first_id, second_id = (from_id, to_id) if from_id < to_id else (to_id, from_id)
    
    async with session.begin():
        # Блокируем в одинаковом порядке всегда
        account_first = await session.get(
            Account, first_id, with_for_update=True
        )
        account_second = await session.get(
            Account, second_id, with_for_update=True
        )
        
        # Теперь безопасно работать
        if from_id == first_id:
            account_first.balance -= amount
            account_second.balance += amount
        else:
            account_second.balance -= amount
            account_first.balance += amount

Решение 2: Timeout с retry

from sqlalchemy.exc import OperationalError
import asyncio
from typing import TypeVar, Callable, Awaitable

T = TypeVar('T')

async def with_retry_on_deadlock(
    func: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    backoff_seconds: float = 0.1
) -> T:
    """
    Retry транзакцию если произошёл deadlock.
    Exponential backoff: 0.1s, 0.2s, 0.4s
    """
    for attempt in range(max_retries):
        try:
            return await func()
        except OperationalError as e:
            # Проверяем, это deadlock ошибка
            if 'deadlock' in str(e).lower():
                if attempt == max_retries - 1:
                    raise  # Последняя попытка, пробрасываем
                
                wait_time = backoff_seconds * (2 ** attempt)
                print(f"Deadlock detected, retry {attempt + 1} in {wait_time}s")
                await asyncio.sleep(wait_time)
                continue
            
            raise  # Другая ошибка, не deadlock

# Использование
async def transfer_with_retry(session, from_id, to_id, amount):
    async def do_transfer():
        async with session.begin():
            acc1 = await session.get(Account, from_id, with_for_update=True)
            await asyncio.sleep(0.1)
            acc2 = await session.get(Account, to_id, with_for_update=True)
            acc1.balance -= amount
            acc2.balance += amount
    
    return await with_retry_on_deadlock(do_transfer)

Решение 3: SELECT FOR UPDATE SKIP LOCKED

Для операций вида "обновить N свободных записей":

async def process_available_tasks(session, limit: int = 10):
    """
    Выбрать свободные задачи без deadlock.
    SKIP LOCKED - пропускает заблокированные строки.
    """
    query = (
        select(Task)
        .where(Task.status == 'pending')
        .with_for_update(skip_locked=True)  # Ключевой момент
        .limit(limit)
    )
    
    async with session.begin():
        tasks = (await session.execute(query)).scalars().all()
        
        for task in tasks:
            task.status = 'processing'
            # Работаем с задачей

# SQL при трансляции:
# SELECT * FROM tasks
# WHERE status = 'pending'
# FOR UPDATE SKIP LOCKED
# LIMIT 10;

Решение 4: Использовать уровень изоляции

from sqlalchemy import create_engine, event
from sqlalchemy.pool import NullPool

# Использование READ COMMITTED вместо SERIALIZABLE
engine = create_engine(
    "postgresql://...",
    isolation_level="READ_COMMITTED"  # Менее строгий
)

# Для критичного кода можно явно установить
async with session.connection() as conn:
    await conn.run_sync(
        lambda c: c.execute(
            "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"
        )
    )

Решение 5: Очередь вместо оптимистичного блокирования

Для параллельных операций над одним ресурсом:

from asyncio import Queue, Lock

class SerializedAccountUpdater:
    def __init__(self, account_id: int):
        self.account_id = account_id
        self.queue = Queue()
        self.lock = Lock()
    
    async def process_updates(self):
        """
        Обрабатываем все операции над счётом последовательно.
        Гарантирует отсутствие deadlock'ов.
        """
        while True:
            operation = await self.queue.get()
            
            async with self.lock:
                async with session.begin():
                    account = await session.get(Account, self.account_id)
                    
                    if operation['type'] == 'transfer_out':
                        account.balance -= operation['amount']
                    elif operation['type'] == 'transfer_in':
                        account.balance += operation['amount']
                    
                    self.queue.task_done()
    
    async def transfer(self, amount: float, direction: str):
        """Добавить операцию в очередь."""
        await self.queue.put({
            'type': f'transfer_{direction}',
            'amount': amount
        })

# Использование
updater = SerializedAccountUpdater(account_id=1)
asyncio.create_task(updater.process_updates())

await updater.transfer(100, 'out')  # Будет обработано последовательно
await updater.transfer(50, 'in')

Решение 6: Pessimistic Lock с очередью операций

# Таблица для очередности
CREATE TABLE account_operations (
    id UUID PRIMARY KEY,
    account_id UUID NOT NULL REFERENCES accounts(id),
    operation_type VARCHAR(50),
    amount DECIMAL(10, 2),
    status VARCHAR(20) DEFAULT 'pending',
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(account_id, created_at)
);

# Worker обрабатывает по одной операции на счёт
async def process_account_operations():
    async with session.begin():
        # Берём одну операцию для каждого счёта
        ops = await session.execute(
            select(AccountOperation)
            .where(AccountOperation.status == 'pending')
            .distinct(AccountOperation.account_id)
            .order_by(AccountOperation.account_id, AccountOperation.created_at)
            .with_for_update(skip_locked=True)
        )
        
        for op in ops.scalars():
            account = await session.get(Account, op.account_id)
            account.balance += op.amount
            op.status = 'completed'

Сравнение подходов

РешениеКогда использоватьПреимуществаНедостатки
Упорядочить блокировкиИзвестный набор ресурсовПросто, быстроТребует рефакторинга
Retry с backoffРедкие deadlock'иПростое добавлениеМожет быть медленно при частых deadlock'ах
SKIP LOCKEDПараллельная обработкаЭффективноНекоторые строки пропускаются
Очередь операцийСложные зависимостиГарантирует корректностьТребует дополнительной таблицы
Изменить уровень изоляцииСпецифичные требованияГибкоМожет быть медленнее

Лучшая практика

  1. Предпочти упорядочение блокировок - это проще и быстрее
  2. Добавь retry механизм - как подстраховка
  3. Используй SKIP LOCKED для параллельной обработки
  4. Логируй deadlock'и - это поможет найти паттерны
  5. Тестируй параллелизм - используй concurrent тесты
# Контрольный пример
async def safe_transfer(session, from_id: int, to_id: int, amount: float):
    # 1. Упорядочиваем
    ids = sorted([from_id, to_id])
    
    # 2. Retry wrapper
    async def do_transfer():
        async with session.begin():
            # 3. Блокируем в одинаковом порядке
            acc1 = await session.get(Account, ids[0], with_for_update=True)
            acc2 = await session.get(Account, ids[1], with_for_update=True)
            
            if from_id == ids[0]:
                acc1.balance -= amount
                acc2.balance += amount
            else:
                acc2.balance -= amount
                acc1.balance += amount
    
    return await with_retry_on_deadlock(do_transfer, max_retries=3)