← Назад к вопросам

Что такое состояние взаимоблокировки?

2.0 Middle🔥 11 комментариев
#Soft Skills#Безопасность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Состояние взаимоблокировки (Deadlock)

Взаимоблокировка (Deadlock) — это состояние, при котором два или более процесса (или потока) ждут друг друга, чтобы освободить ресурсы, которые они удерживают, и таким образом ни один из них не может продолжить выполнение. Это приводит к полной остановке всех затронутых процессов. Взаимоблокировка — один из самых сложных багов в многопоточном и асинхронном программировании.

Классические условия возникновения deadlock

Для возникновения взаимоблокировки необходимо выполнение всех четырех условий одновременно:

  1. Взаимное исключение — ресурс может использоваться только одним процессом одновременно
  2. Удержание и ожидание — процесс удерживает ресурсы и ждет дополнительных
  3. Отсутствие вытеснения — ресурсы не могут быть отняты у процесса
  4. Циклическое ожидание — существует циклическая цепь процессов, ждущих друг друга

Классический пример с потоками

import threading
import time

class Account:
    def __init__(self, balance):
        self.balance = balance
        self.lock = threading.Lock()

account_a = Account(1000)
account_b = Account(1000)

def transfer_a_to_b():
    """Перевод денег со счета A на счет B"""
    for _ in range(100):
        with account_a.lock:  # Захватываем lock A
            time.sleep(0.001)  # Имитация обработки
            with account_b.lock:  # Ждем lock B
                account_a.balance -= 10
                account_b.balance += 10
        time.sleep(0.001)

def transfer_b_to_a():
    """Перевод денег со счета B на счет A"""
    for _ in range(100):
        with account_b.lock:  # Захватываем lock B
            time.sleep(0.001)  # Имитация обработки
            with account_a.lock:  # Ждем lock A
                account_b.balance -= 10
                account_a.balance += 10
        time.sleep(0.001)

# ОПАСНО! Может произойти deadlock
# Thread 1 удерживает account_a.lock и ждет account_b.lock
# Thread 2 удерживает account_b.lock и ждет account_a.lock

Решение 1: Упорядочить захват locks

import threading
import time

class Account:
    _counter = 0
    _counter_lock = threading.Lock()
    
    def __init__(self, balance):
        with Account._counter_lock:
            Account._counter += 1
            self.id = Account._counter
        self.balance = balance
        self.lock = threading.Lock()

account_a = Account(1000)
account_b = Account(1000)

def transfer_safe(from_acc, to_acc, amount):
    """Безопасный перевод с упорядоченным захватом locks"""
    # Всегда захватываем locks в одинаковом порядке (по ID)
    if from_acc.id < to_acc.id:
        first, second = from_acc, to_acc
    else:
        first, second = to_acc, from_acc
    
    with first.lock:
        time.sleep(0.001)
        with second.lock:
            if from_acc == first:
                from_acc.balance -= amount
                to_acc.balance += amount
            else:
                to_acc.balance += amount
                from_acc.balance -= amount

# Теперь safe от deadlock
t1 = threading.Thread(target=transfer_safe, args=(account_a, account_b, 10))
t2 = threading.Thread(target=transfer_safe, args=(account_b, account_a, 10))
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Account A: {account_a.balance}, Account B: {account_b.balance}")

Решение 2: Использование timeout

import threading
import time

def transfer_with_timeout(from_acc, to_acc, amount, timeout=1):
    """Перевод с timeout для избежания бесконечного ожидания"""
    while True:
        acquired_from = from_acc.lock.acquire(timeout=timeout)
        if not acquired_from:
            continue  # Повторить если не удалось захватить
        
        try:
            acquired_to = to_acc.lock.acquire(timeout=timeout)
            if not acquired_to:
                # Не смогли захватить второй lock, освобождаем первый
                from_acc.lock.release()
                time.sleep(0.01)  # Небольшая задержка
                continue
            
            try:
                # Обновляем балансы
                from_acc.balance -= amount
                to_acc.balance += amount
                break  # Успешно завершили
            finally:
                to_acc.lock.release()
        finally:
            from_acc.lock.release()

Решение 3: Использование RLock (Reentrant Lock)

import threading

class BankAccount:
    def __init__(self, name, balance):
        self.name = name
        self.balance = balance
        self.lock = threading.RLock()  # Один поток может захватить несколько раз
    
    def deposit(self, amount):
        with self.lock:
            self.balance += amount
            print(f"{self.name}: +{amount}, баланс = {self.balance}")
    
    def withdraw(self, amount):
        with self.lock:
            self.balance -= amount
            print(f"{self.name}: -{amount}, баланс = {self.balance}")
    
    def transfer(self, other, amount):
        with self.lock:
            with other.lock:
                self.withdraw(amount)
                other.deposit(amount)

account1 = BankAccount("Account 1", 1000)
account2 = BankAccount("Account 2", 500)
account1.transfer(account2, 100)

Решение 4: Использование contextvars для асинхронного кода

import asyncio
from asyncio import Lock

class AsyncAccount:
    def __init__(self, name, balance):
        self.name = name
        self.balance = balance
        self.lock = Lock()

async def async_transfer(from_acc, to_acc, amount):
    """Безопасный асинхронный перевод"""
    # Упорядочиваем захват locks по имени (для детерминизма)
    if from_acc.name < to_acc.name:
        first, second = from_acc, to_acc
    else:
        first, second = to_acc, from_acc
    
    async with first.lock:
        await asyncio.sleep(0.01)  # Имитация асинхронной работы
        async with second.lock:
            from_acc.balance -= amount
            to_acc.balance += amount
            print(f"{from_acc.name} -> {to_acc.name}: {amount}")

async def main():
    acc1 = AsyncAccount("Account1", 1000)
    acc2 = AsyncAccount("Account2", 500)
    
    await asyncio.gather(
        async_transfer(acc1, acc2, 100),
        async_transfer(acc2, acc1, 50)
    )
    
    print(f"Account1: {acc1.balance}, Account2: {acc2.balance}")

asyncio.run(main())

Обнаружение deadlock

import threading
import sys

def detect_deadlock():
    """Обнаружить потенциальные deadlock ситуации"""
    for thread_id, frame in sys._current_frames().items():
        print(f"\nThread {thread_id}:")
        print(''.join(traceback.format_stack(frame)))

# В production используйте инструменты вроде:
# - ThreadSanitizer
# - Thread Dump анализ
# - Prometheus метрики для мониторинга зависаний

Стратегии избежания deadlock

  1. Упорядочение ресурсов — всегда захватываем ресурсы в одном порядке
  2. Timeout — устанавливаем максимальное время ожидания
  3. Максимум один lock — избегаем вложенных locks
  4. Atomic операции — используем атомарные операции где возможно
  5. Асинхронность — используем async/await вместо threads

Пример с SQLAlchemy

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import OperationalError
import time

# SELECT FOR UPDATE SKIP LOCKED помогает избежать deadlock
session = sessionmaker(bind=engine)()

def safe_update(account_id):
    """Безопасное обновление с SKIP LOCKED"""
    for attempt in range(3):
        try:
            # SELECT FOR UPDATE SKIP LOCKED пропускает заблокированные строки
            account = (
                session.query(Account)
                .with_for_update(skip_locked=True)
                .filter(Account.id == account_id)
                .first()
            )
            
            if account:
                account.balance += 100
                session.commit()
                break
        except OperationalError:
            session.rollback()
            time.sleep(0.1 * (2 ** attempt))  # Exponential backoff

Мониторинг в production

import logging
import threading
import time

logger = logging.getLogger(__name__)

def monitor_locks(locks_dict, timeout=10):
    """Мониторить время ожидания locks"""
    for lock_name, lock_obj in locks_dict.items():
        start = time.time()
        acquired = lock_obj.acquire(timeout=timeout)
        elapsed = time.time() - start
        
        if elapsed > timeout * 0.8:
            logger.warning(f"Lock {lock_name} took {elapsed:.2f}s to acquire")
        
        if acquired:
            lock_obj.release()

Взаимоблокировка — это серьезная проблема в многопоточном программировании. Правильное понимание условий возникновения deadlock и применение стратегий их избежания критично для создания надежных высоконагруженных систем.

Что такое состояние взаимоблокировки? | PrepBro