Зачем нужен мьютекс в Python?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Зачем нужен мьютекс в Python
Мьютекс (Mutex - Mutual Exclusion) - это механизм синхронизации, который гарантирует, что только один поток может одновременно выполнять критическую секцию кода. Это решает проблему race conditions при параллельной работе потоков.
Проблема без мьютекса: Race Condition
Когда несколько потоков обращаются к одним и тем же данным, может произойти непредсказуемое поведение.
import threading
import time
# Глобальная переменная
counter = 0
def increment_without_lock():
"""Увеличиваем счётчик без защиты"""
global counter
for _ in range(100000):
# Эта операция на самом деле состоит из трёх шагов:
# 1. Прочитать counter из памяти
# 2. Увеличить на 1
# 3. Записать обратно в память
counter += 1
# Запускаем два потока
thread1 = threading.Thread(target=increment_without_lock)
thread2 = threading.Thread(target=increment_without_lock)
start = time.time()
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(f"Время: {time.time() - start:.2f}s")
print(f"Ожидаемое значение: 200000")
print(f"Фактическое значение: {counter}")
print(f"Потеряли: {200000 - counter} операций")
# Результат обычно: 150000-180000 вместо 200000!
# Потому что операции перемешались:
# Thread 1: read(100), +1, write(101)
# Thread 2: read(100), +1, write(101) <- потеряли одно увеличение!
Решение: Lock (Мьютекс)
import threading
import time
counter = 0
lock = threading.Lock() # Создаём мьютекс
def increment_with_lock():
"""Увеличиваем счётчик с защитой"""
global counter
for _ in range(100000):
with lock: # Критическая секция - только один поток
counter += 1
thread1 = threading.Thread(target=increment_with_lock)
thread2 = threading.Thread(target=increment_with_lock)
start = time.time()
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(f"Время: {time.time() - start:.2f}s")
print(f"Ожидаемое значение: 200000")
print(f"Фактическое значение: {counter}")
print(f"Потеряли: {200000 - counter} операций")
# Теперь всегда: 200000
# Потому что операции выполняются строго по очереди
Как работает мьютекс
Без мьютекса: С мьютексом (Lock):
Thread 1: read(x) Thread 1: lock.acquire()
Thread 2: read(x) Thread 1: read(x)
Thread 1: +1 Thread 1: +1
Thread 2: +1 Thread 1: write(x)
Thread 1: write(x) Thread 1: lock.release()
Thread 2: write(x)
Thread 2: ждёт...
Thread 2: lock.acquire()
Thread 2: read(x)
Thread 2: +1
Thread 2: write(x)
Thread 2: lock.release()
Типы блокировок в Python
1. Lock (мьютекс)
from threading import Lock
lock = Lock()
# Способ 1: вручную
lock.acquire()
try:
# Критическая секция
counter += 1
finally:
lock.release()
# Способ 2: с контекстным менеджером (recommended)
with lock:
counter += 1
2. RLock (Reentrant Lock)
Позволяет одному потоку несколько раз захватить блокировку.
from threading import RLock
rlock = RLock()
def recursive_function():
with rlock:
print("Уровень 1")
inner_function()
def inner_function():
with rlock: # Тот же поток может захватить снова!
print("Уровень 2")
recursive_function() # Без RLock это вызвало бы deadlock
3. Semaphore
Позволяет N потокам одновременно использовать ресурс.
from threading import Semaphore
# Максимум 3 потока одновременно
semaphore = Semaphore(3)
def worker(worker_id):
with semaphore:
print(f"Worker {worker_id} работает")
time.sleep(1)
print(f"Worker {worker_id} finished")
threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
# В любой момент только 3 worker'а работают
4. Event
Для синхронизации между потоками.
from threading import Event
event = Event() # Флаг, который может быть установлен или нет
def waiter():
print("Жду события...")
event.wait() # Блокируется, пока событие не установится
print("Событие произошло!")
def signaler():
time.sleep(2)
print("Устанавливаем событие")
event.set() # Пробуждаем все потоки, ждущие события
thread1 = threading.Thread(target=waiter)
thread2 = threading.Thread(target=signaler)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
Практический пример: безопасный счётчик
from threading import Lock
import threading
class ThreadSafeCounter:
def __init__(self):
self._value = 0
self._lock = Lock()
def increment(self):
"""Безопасное увеличение"""
with self._lock:
self._value += 1
def decrement(self):
"""Безопасное уменьшение"""
with self._lock:
self._value -= 1
def get(self):
"""Безопасное чтение значения"""
with self._lock:
return self._value
counter = ThreadSafeCounter()
def worker(counter, num_ops):
for _ in range(num_ops):
if _ % 2 == 0:
counter.increment()
else:
counter.decrement()
threads = [
threading.Thread(target=worker, args=(counter, 10000))
for _ in range(5)
]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"Финальное значение: {counter.get()}")
print(f"Ожидаемое значение: 0 (5 потоков * 5000 +1 и 5000 -1)")
Проблема: Deadlock
Если несколько потоков ждут друг друга, может произойти deadlock.
# ОПАСНО - deadlock
lock1 = threading.Lock()
lock2 = threading.Lock()
def thread1_func():
with lock1:
print("Thread 1 захватил lock1")
time.sleep(1)
print("Thread 1 ждёт lock2")
with lock2: # Deadlock! Thread 2 уже захватил lock2
print("Thread 1 захватил lock2")
def thread2_func():
with lock2:
print("Thread 2 захватил lock2")
time.sleep(1)
print("Thread 2 ждёт lock1")
with lock1: # Deadlock! Thread 1 уже захватил lock1
print("Thread 2 захватил lock1")
thread1 = threading.Thread(target=thread1_func)
thread2 = threading.Thread(target=thread2_func)
thread1.start()
thread2.start()
thread1.join() # Зависнет!
thread2.join()
Решение: всегда захватывать блокировки в одном порядке:
# БЕЗОПАСНО - всегда сначала lock1, потом lock2
def thread1_func():
with lock1:
with lock2:
# работа
pass
def thread2_func():
with lock1:
with lock2:
# работа
pass
Queue - встроенная потокобезопасная очередь
Для большинства случаев лучше использовать Queue вместо ручных мьютексов:
from queue import Queue
import threading
queue = Queue()
def producer():
for i in range(10):
queue.put(i) # Thread-safe!
time.sleep(0.1)
queue.put(None) # Сигнал об окончании
def consumer():
while True:
item = queue.get() # Thread-safe!
if item is None:
break
print(f"Обработка: {item}")
thread1 = threading.Thread(target=producer)
thread2 = threading.Thread(target=consumer)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
Условие (Condition Variable)
Для более сложной синхронизации:
from threading import Condition
import time
condition = Condition()
data = []
def producer():
for i in range(5):
with condition:
data.append(i)
print(f"Произвёл: {i}")
condition.notify_all() # Оповещаем потребителей
time.sleep(0.5)
def consumer():
while True:
with condition:
while not data: # Ждём, пока не будут данные
print("Потребитель ждёт...")
condition.wait() # Ждём оповещения
item = data.pop(0)
print(f"Потребил: {item}")
thread1 = threading.Thread(target=producer)
thread2 = threading.Thread(target=consumer, daemon=True)
thread1.start()
thread2.start()
thread1.join()
Когда использовать мьютекс
- ✓ Многопоточность (threading) с общей памятью
- ✓ Защита счётчиков, флагов, состояния
- ✓ Безопасный доступ к коллекциям (dict, list)
- ✓ Критические секции кода
Когда НЕ использовать мьютекс
- ✗ Асинхронность (asyncio) - не нужны, используйте другие примитивы
- ✗ Multiprocessing - используйте Queue, Pipe, Manager
- ✗ GIL уже защищает некоторые операции (но не все!)
GIL и мьютекс в Python
Важный момент: Python имеет Global Interpreter Lock (GIL), который позволяет одному потоку выполнять Python код одновременно. Но это НЕ означает, что мьютексы не нужны!
import threading
counter = 0
lock = threading.Lock()
def unsafe_increment():
global counter
temp = counter # Поток 1 может быть вытеснён здесь
temp += 1
counter = temp
# GIL не защищает от race condition в операции a += 1!
# Поэтому используем мьютекс
def safe_increment():
global counter
with lock:
counter += 1
Лучшие практики
-
Минимизируйте критические секции
# ПЛОХО - долгая блокировка with lock: result = expensive_calculation() # 10 секунд shared_list.append(result) # ХОРОШО result = expensive_calculation() # БЕЗ блокировки with lock: shared_list.append(result) -
Используйте контекстные менеджеры
with lock: # ХОРОШО # код # НЕ ДЕЛАЙТЕ lock.acquire() # код lock.release() # Может не выполниться при исключении -
Для асинхронного кода используйте asyncio.Lock
import asyncio lock = asyncio.Lock() async def critical_section(): async with lock: # код pass
Вывод
Мьютекс - это фундаментальный инструмент для:
- Защиты общих данных от race conditions
- Гарантии консистентности состояния
- Безопасной многопоточной разработки
- Координации между потоками
Без мьютексов (или других механизмов синхронизации) многопоточный код становится непредсказуемым и хрупким. Всегда защищайте общие ресурсы правильными инструментами синхронизации!