Есть ли Mutex в Python?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Есть ли Mutex в Python
Да, есть! В Python есть несколько реализаций Mutex — это объекты синхронизации для предотвращения race conditions в многопоточном коде. Давайте разберёмся.
Что такое Mutex
Mutex (Mutual Exclusion) — это lock, который гарантирует, что только один поток может выполнять критическую секцию кода одновременно.
Без Mutex:
import threading
import time
counter = 0
def increment():
global counter
for _ in range(100000):
# Race condition! Несколько потоков пытаются incrementировать одновременно
temp = counter
temp += 1
counter = temp
threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
print(counter) # Может быть не 500000! Обычно меньше (race condition)
В Python есть несколько типов Mutex/Lock:
1. threading.Lock (самый базовый)
import threading
counter = 0
lock = threading.Lock() # Создаём Mutex
def increment():
global counter
for _ in range(100000):
with lock: # Критическая секция под защитой lock
temp = counter
temp += 1
counter = temp
threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
print(counter) # Теперь всегда 500000 ✅
Как это работает:
- Поток пытается войти в
with lock: - Если lock свободен — входит и выполняет критическую секцию
- Если lock занят другим потоком — ждёт
- Выходит из
with— lock освобождается
Ручное использование
lock = threading.Lock()
lock.acquire() # Захватить lock
try:
# Критическая секция
counter += 1
finally:
lock.release() # Освободить lock
# Или с context manager (лучше):
with lock:
counter += 1
2. threading.RLock (переиспользуемый lock)
Для случаев, когда один поток хочет захватить lock несколько раз:
import threading
rlock = threading.RLock() # Reentrant Lock
def outer():
with rlock:
print("Outer")
inner()
def inner():
with rlock: # Этот же поток может захватить lock снова
print("Inner")
outer() # Работает! С обычным Lock было бы deadlock
# С обычным Lock (deadlock):
lock = threading.Lock()
def outer_bad():
with lock:
print("Outer")
inner_bad()
def inner_bad():
with lock: # DEADLOCK! Этот же поток ждёт lock, который уже держит
print("Inner")
# outer_bad() # Зависнет!
3. threading.Semaphore (счётный lock)
Для контроля доступа к ресурсу, количество которого ограничено:
import threading
import time
# Максимум 3 потока могут одновременно использовать ресурс
semaphore = threading.Semaphore(3)
def access_resource(i):
with semaphore:
print(f"Поток {i} использует ресурс")
time.sleep(1)
print(f"Поток {i} освободил ресурс")
threads = [threading.Thread(target=access_resource, args=(i,)) for i in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
# Вывод: по 3 потока одновременно (макс 3 в одно время)
4. threading.Event (сигнальный lock)
Для синхронизации между потоками (signal/wait pattern):
import threading
import time
event = threading.Event()
def waiter():
print("Ожидаю сигнала...")
event.wait() # Блокируется, пока не придёт сигнал
print("Получил сигнал! Продолжаю работу")
def signaler():
time.sleep(2)
print("Отправляю сигнал...")
event.set() # Отправляет сигнал всем ожидающим потокам
t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=signaler)
t1.start()
t2.start()
t1.join()
t2.join()
5. queue.Queue (потокобезопасная очередь)
Основана на Mutex, но проще для некоторых сценариев:
import threading
from queue import Queue
q = Queue()
def producer():
for i in range(5):
q.put(f'Item {i}')
print(f'Produced: Item {i}')
def consumer():
while True:
item = q.get()
if item is None:
break
print(f'Consumed: {item}')
q.task_done()
p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start()
c.start()
p.join()
q.put(None) # Сигнал для потребителя
c.join()
6. threading.Condition (condition variable)
Для более сложной синхронизации (wait for condition):
import threading
import time
condition = threading.Condition()
data = None
def producer():
global data
time.sleep(1)
with condition:
data = "Some important data"
print("Producer: Данные готовы")
condition.notify() # Уведомляем ожидающие потоки
def consumer():
with condition:
condition.wait() # Ждём уведомления
print(f"Consumer: Получил данные: {data}")
p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
c.start() # Consumer ждёт
p.start() # Producer отправляет сигнал
p.join()
c.join()
GIL (Global Interpreter Lock)
⚠️ Важно в Python: GIL — это механизм в CPython, который гарантирует, что только один поток выполняет Python bytecode одновременно.
Это означает:
import threading
import time
def cpu_bound():
# GIL гарантирует, что потоки НЕ выполняются параллельно
# Это просто чередование (context switching)
total = 0
for i in range(100000000):
total += i
return total
t1 = threading.Thread(target=cpu_bound)
t2 = threading.Thread(target=cpu_bound)
start = time.time()
t1.start()
t2.start()
t1.join()
t2.join()
end = time.time()
print(f"С threading: {end - start:.2f}s")
# Но без threading (последовательно):
start = time.time()
cpu_bound()
cpu_bound()
end = time.time()
print(f"Без threading: {end - start:.2f}s")
# Результаты ОДИНАКОВЫЕ! GIL мешает параллельности
Решение: используй multiprocessing для CPU-bound задач
import multiprocessing
import time
def cpu_bound():
total = 0
for i in range(100000000):
total += i
return total
start = time.time()
with multiprocessing.Pool(2) as pool:
pool.map(lambda x: cpu_bound(), [1, 2])
end = time.time()
print(f"С multiprocessing: {end - start:.2f}s") # НАМНОГО быстрее!
Сравнение синхронизаторов
| Тип | Назначение | Пример |
|---|---|---|
| Lock | Базовый Mutex | Защита счётчика |
| RLock | Переиспользуемый Lock | Рекурсивные функции |
| Semaphore | Ограничение кол-ва потоков | Пул worker'ов |
| Event | Signal/Wait | Ожидание события |
| Condition | Wait for condition | Сложная синхронизация |
| Queue | Потокобезопасная очередь | Producer/Consumer |
Когда использовать threading vs multiprocessing
# I/O-bound (сеть, файлы) → threading ✅
import threading
import requests
def fetch_url(url):
response = requests.get(url) # GIL освобождается во время I/O
return response.text
threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
# Потоки выполняются параллельно (GIL освобождается)
# CPU-bound (вычисления) → multiprocessing ✅
import multiprocessing
def calculate(n):
return sum(i*i for i in range(n))
with multiprocessing.Pool() as pool:
results = pool.map(calculate, [10000000, 10000000])
# Процессы выполняются по-настоящему параллельно (нет GIL)
Вывод
Да, в Python есть Mutex и много примитивов синхронизации (threading.Lock, RLock, Semaphore, Event, Condition, Queue). Используй их для защиты общих данных в многопоточном коде. Но помни о GIL — для CPU-bound задач лучше multiprocessing, для I/O-bound — threading или asyncio.