← Назад к вопросам

Зачем нужен мьютекс в Python?

2.2 Middle🔥 211 комментариев
#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Зачем нужен мьютекс в Python

Мьютекс (Mutex - Mutual Exclusion) - это механизм синхронизации, который гарантирует, что только один поток может одновременно выполнять критическую секцию кода. Это решает проблему race conditions при параллельной работе потоков.

Проблема без мьютекса: Race Condition

Когда несколько потоков обращаются к одним и тем же данным, может произойти непредсказуемое поведение.

import threading
import time

# Глобальная переменная
counter = 0

def increment_without_lock():
    """Увеличиваем счётчик без защиты"""
    global counter
    for _ in range(100000):
        # Эта операция на самом деле состоит из трёх шагов:
        # 1. Прочитать counter из памяти
        # 2. Увеличить на 1
        # 3. Записать обратно в память
        counter += 1

# Запускаем два потока
thread1 = threading.Thread(target=increment_without_lock)
thread2 = threading.Thread(target=increment_without_lock)

start = time.time()
thread1.start()
thread2.start()
thread1.join()
thread2.join()

print(f"Время: {time.time() - start:.2f}s")
print(f"Ожидаемое значение: 200000")
print(f"Фактическое значение: {counter}")
print(f"Потеряли: {200000 - counter} операций")

# Результат обычно: 150000-180000 вместо 200000!
# Потому что операции перемешались:
# Thread 1: read(100), +1, write(101)
# Thread 2: read(100), +1, write(101) <- потеряли одно увеличение!

Решение: Lock (Мьютекс)

import threading
import time

counter = 0
lock = threading.Lock()  # Создаём мьютекс

def increment_with_lock():
    """Увеличиваем счётчик с защитой"""
    global counter
    for _ in range(100000):
        with lock:  # Критическая секция - только один поток
            counter += 1

thread1 = threading.Thread(target=increment_with_lock)
thread2 = threading.Thread(target=increment_with_lock)

start = time.time()
thread1.start()
thread2.start()
thread1.join()
thread2.join()

print(f"Время: {time.time() - start:.2f}s")
print(f"Ожидаемое значение: 200000")
print(f"Фактическое значение: {counter}")
print(f"Потеряли: {200000 - counter} операций")

# Теперь всегда: 200000
# Потому что операции выполняются строго по очереди

Как работает мьютекс

Без мьютекса:           С мьютексом (Lock):

Thread 1: read(x)       Thread 1: lock.acquire()
Thread 2: read(x)       Thread 1: read(x)
Thread 1: +1            Thread 1: +1
Thread 2: +1            Thread 1: write(x)
Thread 1: write(x)      Thread 1: lock.release()
Thread 2: write(x)      
                        Thread 2: ждёт...
                        Thread 2: lock.acquire()
                        Thread 2: read(x)
                        Thread 2: +1
                        Thread 2: write(x)
                        Thread 2: lock.release()

Типы блокировок в Python

1. Lock (мьютекс)

from threading import Lock

lock = Lock()

# Способ 1: вручную
lock.acquire()
try:
    # Критическая секция
    counter += 1
finally:
    lock.release()

# Способ 2: с контекстным менеджером (recommended)
with lock:
    counter += 1

2. RLock (Reentrant Lock)

Позволяет одному потоку несколько раз захватить блокировку.

from threading import RLock

rlock = RLock()

def recursive_function():
    with rlock:
        print("Уровень 1")
        inner_function()

def inner_function():
    with rlock:  # Тот же поток может захватить снова!
        print("Уровень 2")

recursive_function()  # Без RLock это вызвало бы deadlock

3. Semaphore

Позволяет N потокам одновременно использовать ресурс.

from threading import Semaphore

# Максимум 3 потока одновременно
semaphore = Semaphore(3)

def worker(worker_id):
    with semaphore:
        print(f"Worker {worker_id} работает")
        time.sleep(1)
        print(f"Worker {worker_id} finished")

threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# В любой момент только 3 worker'а работают

4. Event

Для синхронизации между потоками.

from threading import Event

event = Event()  # Флаг, который может быть установлен или нет

def waiter():
    print("Жду события...")
    event.wait()  # Блокируется, пока событие не установится
    print("Событие произошло!")

def signaler():
    time.sleep(2)
    print("Устанавливаем событие")
    event.set()  # Пробуждаем все потоки, ждущие события

thread1 = threading.Thread(target=waiter)
thread2 = threading.Thread(target=signaler)

thread1.start()
thread2.start()
thread1.join()
thread2.join()

Практический пример: безопасный счётчик

from threading import Lock
import threading

class ThreadSafeCounter:
    def __init__(self):
        self._value = 0
        self._lock = Lock()
    
    def increment(self):
        """Безопасное увеличение"""
        with self._lock:
            self._value += 1
    
    def decrement(self):
        """Безопасное уменьшение"""
        with self._lock:
            self._value -= 1
    
    def get(self):
        """Безопасное чтение значения"""
        with self._lock:
            return self._value

counter = ThreadSafeCounter()

def worker(counter, num_ops):
    for _ in range(num_ops):
        if _ % 2 == 0:
            counter.increment()
        else:
            counter.decrement()

threads = [
    threading.Thread(target=worker, args=(counter, 10000))
    for _ in range(5)
]

for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Финальное значение: {counter.get()}")
print(f"Ожидаемое значение: 0 (5 потоков * 5000 +1 и 5000 -1)")

Проблема: Deadlock

Если несколько потоков ждут друг друга, может произойти deadlock.

# ОПАСНО - deadlock
lock1 = threading.Lock()
lock2 = threading.Lock()

def thread1_func():
    with lock1:
        print("Thread 1 захватил lock1")
        time.sleep(1)
        print("Thread 1 ждёт lock2")
        with lock2:  # Deadlock! Thread 2 уже захватил lock2
            print("Thread 1 захватил lock2")

def thread2_func():
    with lock2:
        print("Thread 2 захватил lock2")
        time.sleep(1)
        print("Thread 2 ждёт lock1")
        with lock1:  # Deadlock! Thread 1 уже захватил lock1
            print("Thread 2 захватил lock1")

thread1 = threading.Thread(target=thread1_func)
thread2 = threading.Thread(target=thread2_func)

thread1.start()
thread2.start()

thread1.join()  # Зависнет!
thread2.join()

Решение: всегда захватывать блокировки в одном порядке:

# БЕЗОПАСНО - всегда сначала lock1, потом lock2
def thread1_func():
    with lock1:
        with lock2:
            # работа
            pass

def thread2_func():
    with lock1:
        with lock2:
            # работа
            pass

Queue - встроенная потокобезопасная очередь

Для большинства случаев лучше использовать Queue вместо ручных мьютексов:

from queue import Queue
import threading

queue = Queue()

def producer():
    for i in range(10):
        queue.put(i)  # Thread-safe!
        time.sleep(0.1)
    queue.put(None)  # Сигнал об окончании

def consumer():
    while True:
        item = queue.get()  # Thread-safe!
        if item is None:
            break
        print(f"Обработка: {item}")

thread1 = threading.Thread(target=producer)
thread2 = threading.Thread(target=consumer)

thread1.start()
thread2.start()
thread1.join()
thread2.join()

Условие (Condition Variable)

Для более сложной синхронизации:

from threading import Condition
import time

condition = Condition()
data = []

def producer():
    for i in range(5):
        with condition:
            data.append(i)
            print(f"Произвёл: {i}")
            condition.notify_all()  # Оповещаем потребителей
        time.sleep(0.5)

def consumer():
    while True:
        with condition:
            while not data:  # Ждём, пока не будут данные
                print("Потребитель ждёт...")
                condition.wait()  # Ждём оповещения
            
            item = data.pop(0)
            print(f"Потребил: {item}")

thread1 = threading.Thread(target=producer)
thread2 = threading.Thread(target=consumer, daemon=True)

thread1.start()
thread2.start()
thread1.join()

Когда использовать мьютекс

  • Многопоточность (threading) с общей памятью
  • ✓ Защита счётчиков, флагов, состояния
  • ✓ Безопасный доступ к коллекциям (dict, list)
  • ✓ Критические секции кода

Когда НЕ использовать мьютекс

  • Асинхронность (asyncio) - не нужны, используйте другие примитивы
  • Multiprocessing - используйте Queue, Pipe, Manager
  • ✗ GIL уже защищает некоторые операции (но не все!)

GIL и мьютекс в Python

Важный момент: Python имеет Global Interpreter Lock (GIL), который позволяет одному потоку выполнять Python код одновременно. Но это НЕ означает, что мьютексы не нужны!

import threading

counter = 0
lock = threading.Lock()

def unsafe_increment():
    global counter
    temp = counter  # Поток 1 может быть вытеснён здесь
    temp += 1
    counter = temp

# GIL не защищает от race condition в операции a += 1!
# Поэтому используем мьютекс

def safe_increment():
    global counter
    with lock:
        counter += 1

Лучшие практики

  1. Минимизируйте критические секции

    # ПЛОХО - долгая блокировка
    with lock:
        result = expensive_calculation()  # 10 секунд
        shared_list.append(result)
    
    # ХОРОШО
    result = expensive_calculation()  # БЕЗ блокировки
    with lock:
        shared_list.append(result)
    
  2. Используйте контекстные менеджеры

    with lock:  # ХОРОШО
        # код
    
    # НЕ ДЕЛАЙТЕ
    lock.acquire()
    # код
    lock.release()  # Может не выполниться при исключении
    
  3. Для асинхронного кода используйте asyncio.Lock

    import asyncio
    
    lock = asyncio.Lock()
    
    async def critical_section():
        async with lock:
            # код
            pass
    

Вывод

Мьютекс - это фундаментальный инструмент для:

  • Защиты общих данных от race conditions
  • Гарантии консистентности состояния
  • Безопасной многопоточной разработки
  • Координации между потоками

Без мьютексов (или других механизмов синхронизации) многопоточный код становится непредсказуемым и хрупким. Всегда защищайте общие ресурсы правильными инструментами синхронизации!

Зачем нужен мьютекс в Python? | PrepBro