← Назад к вопросам
Как решить проблему кольцевой блокировки?
3.0 Senior🔥 151 комментариев
#Базы данных (SQL)
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Кольцевая блокировка (Deadlock) в БД
Deadlock происходит, когда две или более транзакции ждут друг друга в круге. Например, T1 ждет ресурса X, который держит T2, а T2 ждет ресурса Y, который держит T1.
1. Пример deadlock
# Transaction 1
async def transfer_money_1():
async with session.begin():
account_a = await session.get(Account, 1, with_for_update=True)
await asyncio.sleep(0.1) # Симуляция задержки
account_b = await session.get(Account, 2, with_for_update=True)
account_a.balance -= 100
account_b.balance += 100
# await session.commit() - автокоммит при выходе
# Transaction 2
async def transfer_money_2():
async with session.begin():
account_b = await session.get(Account, 2, with_for_update=True)
await asyncio.sleep(0.1) # Симуляция задержки
account_a = await session.get(Account, 1, with_for_update=True)
account_b.balance -= 100
account_a.balance += 100
# await session.commit() - автокоммит при выходе
# Если запустить параллельно:
await asyncio.gather(transfer_money_1(), transfer_money_2())
# -> PostgreSQL deadlock error!
2. Почему происходит
Транзакция 1 Транзакция 2
------- -------
LOCK account_1 LOCK account_2
WAIT account_2 WAIT account_1
(deadlock!)
Решение 1: Заказать блокировки в одинаковом порядке
Лучший и простейший способ:
async def transfer_money_safe(session, from_id: int, to_id: int, amount: float):
"""
Решение 1: Всегда блокировать в одинаковом порядке.
Блокируем сначала меньший ID, потом больший.
"""
# Нормализуем порядок
first_id, second_id = (from_id, to_id) if from_id < to_id else (to_id, from_id)
async with session.begin():
# Блокируем в одинаковом порядке всегда
account_first = await session.get(
Account, first_id, with_for_update=True
)
account_second = await session.get(
Account, second_id, with_for_update=True
)
# Теперь безопасно работать
if from_id == first_id:
account_first.balance -= amount
account_second.balance += amount
else:
account_second.balance -= amount
account_first.balance += amount
Решение 2: Timeout с retry
from sqlalchemy.exc import OperationalError
import asyncio
from typing import TypeVar, Callable, Awaitable
T = TypeVar('T')
async def with_retry_on_deadlock(
func: Callable[[], Awaitable[T]],
max_retries: int = 3,
backoff_seconds: float = 0.1
) -> T:
"""
Retry транзакцию если произошёл deadlock.
Exponential backoff: 0.1s, 0.2s, 0.4s
"""
for attempt in range(max_retries):
try:
return await func()
except OperationalError as e:
# Проверяем, это deadlock ошибка
if 'deadlock' in str(e).lower():
if attempt == max_retries - 1:
raise # Последняя попытка, пробрасываем
wait_time = backoff_seconds * (2 ** attempt)
print(f"Deadlock detected, retry {attempt + 1} in {wait_time}s")
await asyncio.sleep(wait_time)
continue
raise # Другая ошибка, не deadlock
# Использование
async def transfer_with_retry(session, from_id, to_id, amount):
async def do_transfer():
async with session.begin():
acc1 = await session.get(Account, from_id, with_for_update=True)
await asyncio.sleep(0.1)
acc2 = await session.get(Account, to_id, with_for_update=True)
acc1.balance -= amount
acc2.balance += amount
return await with_retry_on_deadlock(do_transfer)
Решение 3: SELECT FOR UPDATE SKIP LOCKED
Для операций вида "обновить N свободных записей":
async def process_available_tasks(session, limit: int = 10):
"""
Выбрать свободные задачи без deadlock.
SKIP LOCKED - пропускает заблокированные строки.
"""
query = (
select(Task)
.where(Task.status == 'pending')
.with_for_update(skip_locked=True) # Ключевой момент
.limit(limit)
)
async with session.begin():
tasks = (await session.execute(query)).scalars().all()
for task in tasks:
task.status = 'processing'
# Работаем с задачей
# SQL при трансляции:
# SELECT * FROM tasks
# WHERE status = 'pending'
# FOR UPDATE SKIP LOCKED
# LIMIT 10;
Решение 4: Использовать уровень изоляции
from sqlalchemy import create_engine, event
from sqlalchemy.pool import NullPool
# Использование READ COMMITTED вместо SERIALIZABLE
engine = create_engine(
"postgresql://...",
isolation_level="READ_COMMITTED" # Менее строгий
)
# Для критичного кода можно явно установить
async with session.connection() as conn:
await conn.run_sync(
lambda c: c.execute(
"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"
)
)
Решение 5: Очередь вместо оптимистичного блокирования
Для параллельных операций над одним ресурсом:
from asyncio import Queue, Lock
class SerializedAccountUpdater:
def __init__(self, account_id: int):
self.account_id = account_id
self.queue = Queue()
self.lock = Lock()
async def process_updates(self):
"""
Обрабатываем все операции над счётом последовательно.
Гарантирует отсутствие deadlock'ов.
"""
while True:
operation = await self.queue.get()
async with self.lock:
async with session.begin():
account = await session.get(Account, self.account_id)
if operation['type'] == 'transfer_out':
account.balance -= operation['amount']
elif operation['type'] == 'transfer_in':
account.balance += operation['amount']
self.queue.task_done()
async def transfer(self, amount: float, direction: str):
"""Добавить операцию в очередь."""
await self.queue.put({
'type': f'transfer_{direction}',
'amount': amount
})
# Использование
updater = SerializedAccountUpdater(account_id=1)
asyncio.create_task(updater.process_updates())
await updater.transfer(100, 'out') # Будет обработано последовательно
await updater.transfer(50, 'in')
Решение 6: Pessimistic Lock с очередью операций
# Таблица для очередности
CREATE TABLE account_operations (
id UUID PRIMARY KEY,
account_id UUID NOT NULL REFERENCES accounts(id),
operation_type VARCHAR(50),
amount DECIMAL(10, 2),
status VARCHAR(20) DEFAULT 'pending',
created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
UNIQUE(account_id, created_at)
);
# Worker обрабатывает по одной операции на счёт
async def process_account_operations():
async with session.begin():
# Берём одну операцию для каждого счёта
ops = await session.execute(
select(AccountOperation)
.where(AccountOperation.status == 'pending')
.distinct(AccountOperation.account_id)
.order_by(AccountOperation.account_id, AccountOperation.created_at)
.with_for_update(skip_locked=True)
)
for op in ops.scalars():
account = await session.get(Account, op.account_id)
account.balance += op.amount
op.status = 'completed'
Сравнение подходов
| Решение | Когда использовать | Преимущества | Недостатки |
|---|---|---|---|
| Упорядочить блокировки | Известный набор ресурсов | Просто, быстро | Требует рефакторинга |
| Retry с backoff | Редкие deadlock'и | Простое добавление | Может быть медленно при частых deadlock'ах |
| SKIP LOCKED | Параллельная обработка | Эффективно | Некоторые строки пропускаются |
| Очередь операций | Сложные зависимости | Гарантирует корректность | Требует дополнительной таблицы |
| Изменить уровень изоляции | Специфичные требования | Гибко | Может быть медленнее |
Лучшая практика
- Предпочти упорядочение блокировок - это проще и быстрее
- Добавь retry механизм - как подстраховка
- Используй SKIP LOCKED для параллельной обработки
- Логируй deadlock'и - это поможет найти паттерны
- Тестируй параллелизм - используй concurrent тесты
# Контрольный пример
async def safe_transfer(session, from_id: int, to_id: int, amount: float):
# 1. Упорядочиваем
ids = sorted([from_id, to_id])
# 2. Retry wrapper
async def do_transfer():
async with session.begin():
# 3. Блокируем в одинаковом порядке
acc1 = await session.get(Account, ids[0], with_for_update=True)
acc2 = await session.get(Account, ids[1], with_for_update=True)
if from_id == ids[0]:
acc1.balance -= amount
acc2.balance += amount
else:
acc2.balance -= amount
acc1.balance += amount
return await with_retry_on_deadlock(do_transfer, max_retries=3)