← Назад к вопросам
Какие знаешь способы разрешения Deadlock?
3.0 Senior🔥 191 комментариев
#Асинхронность и многопоточность#Базы данных (SQL)
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Способы разрешения Deadlock (Взаимная блокировка)
Deadlock возникает, когда два или более процесса/потока взаимно блокируют друг друга, ожидая ресурсы, которыми друг друга владеют. Рассмотрю методы предотвращения и разрешения.
Условия возникновения Deadlock (Coffman conditions)
Все четыре условия должны быть выполнены одновременно:
- Взаимное исключение — ресурс используется одним процессом
- Удержание и ожидание — процесс удерживает ресурс и ждет другого
- Невозможность отобрать — ресурс нельзя насильственно отобрать
- Циклическое ожидание — цепочка процессов ждут друг друга
Способ 1: Порядок захвата ресурсов
Префиксирование — все потоки захватывают ресурсы в одном порядке:
import threading
import time
lock_a = threading.Lock()
lock_b = threading.Lock()
def thread_1():
with lock_a: # Всегда берём A первым
time.sleep(0.1)
with lock_b:
print("Thread 1 OK")
def thread_2():
with lock_a: # Также A первым, не B первым!
time.sleep(0.1)
with lock_b:
print("Thread 2 OK")
t1 = threading.Thread(target=thread_1)
t2 = threading.Thread(target=thread_2)
t1.start()
t2.start()
t1.join()
t2.join()
Способ 2: Таймауты
Захватываем ресурсы с таймаутом и отступаем при конфликте:
import threading
import time
lock_a = threading.Lock()
lock_b = threading.Lock()
def safe_transaction():
max_retries = 3
for attempt in range(max_retries):
if lock_a.acquire(timeout=1):
try:
if lock_b.acquire(timeout=1):
try:
print(f"Success on attempt {attempt + 1}")
return True
finally:
lock_b.release()
else:
print(f"Could not acquire lock_b, retrying...")
continue
finally:
lock_a.release()
else:
print(f"Could not acquire lock_a, retrying...")
time.sleep(0.1 * (attempt + 1)) # Exponential backoff
return False
threads = []
for _ in range(5):
t = threading.Thread(target=safe_transaction)
threads.append(t)
t.start()
for t in threads:
t.join()
Способ 3: Асинхронный подход (asyncio)
Асинхронность предотвращает deadlock по природе:
import asyncio
lock_a = asyncio.Lock()
lock_b = asyncio.Lock()
async def safe_operation():
async with lock_a:
await asyncio.sleep(0.1)
async with lock_b:
print("Operation successful")
return True
async def main():
tasks = [safe_operation() for _ in range(10)]
results = await asyncio.gather(*tasks)
print(f"Completed: {sum(results)} operations")
asyncio.run(main())
Способ 4: Context Manager с гарантированным освобождением
import threading
from contextlib import contextmanager
resources = {}
resource_lock = threading.Lock()
@contextmanager
def acquire_resource(resource_id, timeout=1):
"""Контекстный менеджер для безопасного захвата ресурса"""
acquired = False
try:
with resource_lock:
resource = resources.get(resource_id)
if resource is None:
resource = threading.Lock()
resources[resource_id] = resource
if resource.acquire(timeout=timeout):
acquired = True
yield resource
else:
raise TimeoutError(f"Could not acquire resource {resource_id}")
finally:
if acquired:
resource.release()
def safe_access(resource_id):
try:
with acquire_resource(resource_id):
print(f"Accessed resource {resource_id}")
except TimeoutError as e:
print(f"Failed: {e}")
Способ 5: Thread-safe очередь
Используем queue вместо сырых блокировок:
import threading
from queue import Queue
# Вместо прямой синхронизации, используем очередь
task_queue = Queue(maxsize=10)
def producer():
for i in range(5):
task_queue.put(f"Task {i}")
print(f"Produced Task {i}")
def consumer():
while True:
task = task_queue.get()
if task is None: # Сигнал завершения
break
print(f"Consumed {task}")
task_queue.task_done()
p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start()
c.start()
p.join()
task_queue.put(None) # Сигнал завершения
c.join()
Способ 6: Read-Write Lock
Несколько читателей, один писатель:
import threading
class ReadWriteLock:
def __init__(self):
self.readers = 0
self.writers = 0
self.read_ready = threading.Condition(threading.Lock())
def acquire_read(self):
self.read_ready.acquire()
try:
while self.writers > 0:
self.read_ready.wait()
self.readers += 1
finally:
self.read_ready.release()
def release_read(self):
self.read_ready.acquire()
try:
self.readers -= 1
self.read_ready.notify_all()
finally:
self.read_ready.release()
def acquire_write(self):
self.read_ready.acquire()
try:
while self.writers > 0 or self.readers > 0:
self.read_ready.wait()
self.writers += 1
finally:
self.read_ready.release()
def release_write(self):
self.read_ready.acquire()
try:
self.writers -= 1
self.read_ready.notify_all()
finally:
self.read_ready.release()
Способ 7: Банковский алгоритм (Banker's Algorithm)
В системе реального времени используется для предотвращения deadlock:
def is_safe_state(available, maximum, allocated):
"""Проверяет, безопасно ли выдать ресурс"""
n = len(allocated) # количество процессов
need = [maximum[i] - allocated[i] for i in range(n)]
work = available[:]
finish = [False] * n
while not all(finish):
found = False
for i in range(n):
if not finish[i] and all(need[i][j] <= work[j] for j in range(len(work))):
work = [work[j] + allocated[i][j] for j in range(len(work))]
finish[i] = True
found = True
break
if not found:
return False # Deadlock обнаружен
return True # Безопасное состояние
Способ 8: Мониторинг и обнаружение
import threading
import time
from collections import defaultdict
class DeadlockDetector:
def __init__(self):
self.locks_held = defaultdict(list)
self.locks_waiting = defaultdict(list)
self.detector_lock = threading.Lock()
def detect_cycle(self):
"""Обнаруживает циклы в графе зависимостей блокировок"""
with self.detector_lock:
# Построение графа и проверка на циклы
# Если обнаружен цикл — deadlock
pass
detector = DeadlockDetector()
Рекомендации
На практике комбинирую:
- Порядок захвата + таймауты для критичных секций
- asyncio для новых проектов (естественно избегает deadlock)
- Queue вместо сырых блокировок
- Read-Write locks когда много читателей
- Мониторинг в production для обнаружения проблем
Лучший способ — избежать deadlock заранее, чем пытаться его разрешить!