← Назад к вопросам

Какие знаешь способы разрешения Deadlock?

3.0 Senior🔥 191 комментариев
#Асинхронность и многопоточность#Базы данных (SQL)

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Способы разрешения Deadlock (Взаимная блокировка)

Deadlock возникает, когда два или более процесса/потока взаимно блокируют друг друга, ожидая ресурсы, которыми друг друга владеют. Рассмотрю методы предотвращения и разрешения.

Условия возникновения Deadlock (Coffman conditions)

Все четыре условия должны быть выполнены одновременно:

  1. Взаимное исключение — ресурс используется одним процессом
  2. Удержание и ожидание — процесс удерживает ресурс и ждет другого
  3. Невозможность отобрать — ресурс нельзя насильственно отобрать
  4. Циклическое ожидание — цепочка процессов ждут друг друга

Способ 1: Порядок захвата ресурсов

Префиксирование — все потоки захватывают ресурсы в одном порядке:

import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()

def thread_1():
    with lock_a:  # Всегда берём A первым
        time.sleep(0.1)
        with lock_b:
            print("Thread 1 OK")

def thread_2():
    with lock_a:  # Также A первым, не B первым!
        time.sleep(0.1)
        with lock_b:
            print("Thread 2 OK")

t1 = threading.Thread(target=thread_1)
t2 = threading.Thread(target=thread_2)
t1.start()
t2.start()
t1.join()
t2.join()

Способ 2: Таймауты

Захватываем ресурсы с таймаутом и отступаем при конфликте:

import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()

def safe_transaction():
    max_retries = 3
    for attempt in range(max_retries):
        if lock_a.acquire(timeout=1):
            try:
                if lock_b.acquire(timeout=1):
                    try:
                        print(f"Success on attempt {attempt + 1}")
                        return True
                    finally:
                        lock_b.release()
                else:
                    print(f"Could not acquire lock_b, retrying...")
                    continue
            finally:
                lock_a.release()
        else:
            print(f"Could not acquire lock_a, retrying...")
            time.sleep(0.1 * (attempt + 1))  # Exponential backoff
    
    return False

threads = []
for _ in range(5):
    t = threading.Thread(target=safe_transaction)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

Способ 3: Асинхронный подход (asyncio)

Асинхронность предотвращает deadlock по природе:

import asyncio

lock_a = asyncio.Lock()
lock_b = asyncio.Lock()

async def safe_operation():
    async with lock_a:
        await asyncio.sleep(0.1)
        async with lock_b:
            print("Operation successful")
            return True

async def main():
    tasks = [safe_operation() for _ in range(10)]
    results = await asyncio.gather(*tasks)
    print(f"Completed: {sum(results)} operations")

asyncio.run(main())

Способ 4: Context Manager с гарантированным освобождением

import threading
from contextlib import contextmanager

resources = {}
resource_lock = threading.Lock()

@contextmanager
def acquire_resource(resource_id, timeout=1):
    """Контекстный менеджер для безопасного захвата ресурса"""
    acquired = False
    try:
        with resource_lock:
            resource = resources.get(resource_id)
            if resource is None:
                resource = threading.Lock()
                resources[resource_id] = resource
        
        if resource.acquire(timeout=timeout):
            acquired = True
            yield resource
        else:
            raise TimeoutError(f"Could not acquire resource {resource_id}")
    finally:
        if acquired:
            resource.release()

def safe_access(resource_id):
    try:
        with acquire_resource(resource_id):
            print(f"Accessed resource {resource_id}")
    except TimeoutError as e:
        print(f"Failed: {e}")

Способ 5: Thread-safe очередь

Используем queue вместо сырых блокировок:

import threading
from queue import Queue

# Вместо прямой синхронизации, используем очередь
task_queue = Queue(maxsize=10)

def producer():
    for i in range(5):
        task_queue.put(f"Task {i}")
        print(f"Produced Task {i}")

def consumer():
    while True:
        task = task_queue.get()
        if task is None:  # Сигнал завершения
            break
        print(f"Consumed {task}")
        task_queue.task_done()

p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start()
c.start()
p.join()

task_queue.put(None)  # Сигнал завершения
c.join()

Способ 6: Read-Write Lock

Несколько читателей, один писатель:

import threading

class ReadWriteLock:
    def __init__(self):
        self.readers = 0
        self.writers = 0
        self.read_ready = threading.Condition(threading.Lock())
    
    def acquire_read(self):
        self.read_ready.acquire()
        try:
            while self.writers > 0:
                self.read_ready.wait()
            self.readers += 1
        finally:
            self.read_ready.release()
    
    def release_read(self):
        self.read_ready.acquire()
        try:
            self.readers -= 1
            self.read_ready.notify_all()
        finally:
            self.read_ready.release()
    
    def acquire_write(self):
        self.read_ready.acquire()
        try:
            while self.writers > 0 or self.readers > 0:
                self.read_ready.wait()
            self.writers += 1
        finally:
            self.read_ready.release()
    
    def release_write(self):
        self.read_ready.acquire()
        try:
            self.writers -= 1
            self.read_ready.notify_all()
        finally:
            self.read_ready.release()

Способ 7: Банковский алгоритм (Banker's Algorithm)

В системе реального времени используется для предотвращения deadlock:

def is_safe_state(available, maximum, allocated):
    """Проверяет, безопасно ли выдать ресурс"""
    n = len(allocated)  # количество процессов
    need = [maximum[i] - allocated[i] for i in range(n)]
    work = available[:]
    finish = [False] * n
    
    while not all(finish):
        found = False
        for i in range(n):
            if not finish[i] and all(need[i][j] <= work[j] for j in range(len(work))):
                work = [work[j] + allocated[i][j] for j in range(len(work))]
                finish[i] = True
                found = True
                break
        
        if not found:
            return False  # Deadlock обнаружен
    
    return True  # Безопасное состояние

Способ 8: Мониторинг и обнаружение

import threading
import time
from collections import defaultdict

class DeadlockDetector:
    def __init__(self):
        self.locks_held = defaultdict(list)
        self.locks_waiting = defaultdict(list)
        self.detector_lock = threading.Lock()
    
    def detect_cycle(self):
        """Обнаруживает циклы в графе зависимостей блокировок"""
        with self.detector_lock:
            # Построение графа и проверка на циклы
            # Если обнаружен цикл — deadlock
            pass

detector = DeadlockDetector()

Рекомендации

На практике комбинирую:

  • Порядок захвата + таймауты для критичных секций
  • asyncio для новых проектов (естественно избегает deadlock)
  • Queue вместо сырых блокировок
  • Read-Write locks когда много читателей
  • Мониторинг в production для обнаружения проблем

Лучший способ — избежать deadlock заранее, чем пытаться его разрешить!

Какие знаешь способы разрешения Deadlock? | PrepBro