← Назад к вопросам

Есть ли Mutex в Python?

1.7 Middle🔥 141 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Есть ли Mutex в Python

Да, есть! В Python есть несколько реализаций Mutex — это объекты синхронизации для предотвращения race conditions в многопоточном коде. Давайте разберёмся.

Что такое Mutex

Mutex (Mutual Exclusion) — это lock, который гарантирует, что только один поток может выполнять критическую секцию кода одновременно.

Без Mutex:

import threading
import time

counter = 0

def increment():
    global counter
    for _ in range(100000):
        # Race condition! Несколько потоков пытаются incrementировать одновременно
        temp = counter
        temp += 1
        counter = temp

threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # Может быть не 500000! Обычно меньше (race condition)

В Python есть несколько типов Mutex/Lock:

1. threading.Lock (самый базовый)

import threading

counter = 0
lock = threading.Lock()  # Создаём Mutex

def increment():
    global counter
    for _ in range(100000):
        with lock:  # Критическая секция под защитой lock
            temp = counter
            temp += 1
            counter = temp

threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # Теперь всегда 500000 ✅

Как это работает:

  1. Поток пытается войти в with lock:
  2. Если lock свободен — входит и выполняет критическую секцию
  3. Если lock занят другим потоком — ждёт
  4. Выходит из with — lock освобождается

Ручное использование

lock = threading.Lock()

lock.acquire()  # Захватить lock
try:
    # Критическая секция
    counter += 1
finally:
    lock.release()  # Освободить lock

# Или с context manager (лучше):
with lock:
    counter += 1

2. threading.RLock (переиспользуемый lock)

Для случаев, когда один поток хочет захватить lock несколько раз:

import threading

rlock = threading.RLock()  # Reentrant Lock

def outer():
    with rlock:
        print("Outer")
        inner()

def inner():
    with rlock:  # Этот же поток может захватить lock снова
        print("Inner")

outer()  # Работает! С обычным Lock было бы deadlock

# С обычным Lock (deadlock):
lock = threading.Lock()

def outer_bad():
    with lock:
        print("Outer")
        inner_bad()

def inner_bad():
    with lock:  # DEADLOCK! Этот же поток ждёт lock, который уже держит
        print("Inner")

# outer_bad()  # Зависнет!

3. threading.Semaphore (счётный lock)

Для контроля доступа к ресурсу, количество которого ограничено:

import threading
import time

# Максимум 3 потока могут одновременно использовать ресурс
semaphore = threading.Semaphore(3)

def access_resource(i):
    with semaphore:
        print(f"Поток {i} использует ресурс")
        time.sleep(1)
        print(f"Поток {i} освободил ресурс")

threads = [threading.Thread(target=access_resource, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Вывод: по 3 потока одновременно (макс 3 в одно время)

4. threading.Event (сигнальный lock)

Для синхронизации между потоками (signal/wait pattern):

import threading
import time

event = threading.Event()

def waiter():
    print("Ожидаю сигнала...")
    event.wait()  # Блокируется, пока не придёт сигнал
    print("Получил сигнал! Продолжаю работу")

def signaler():
    time.sleep(2)
    print("Отправляю сигнал...")
    event.set()  # Отправляет сигнал всем ожидающим потокам

t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=signaler)

t1.start()
t2.start()

t1.join()
t2.join()

5. queue.Queue (потокобезопасная очередь)

Основана на Mutex, но проще для некоторых сценариев:

import threading
from queue import Queue

q = Queue()

def producer():
    for i in range(5):
        q.put(f'Item {i}')
        print(f'Produced: Item {i}')

def consumer():
    while True:
        item = q.get()
        if item is None:
            break
        print(f'Consumed: {item}')
        q.task_done()

p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)

p.start()
c.start()

p.join()
q.put(None)  # Сигнал для потребителя
c.join()

6. threading.Condition (condition variable)

Для более сложной синхронизации (wait for condition):

import threading
import time

condition = threading.Condition()
data = None

def producer():
    global data
    time.sleep(1)
    with condition:
        data = "Some important data"
        print("Producer: Данные готовы")
        condition.notify()  # Уведомляем ожидающие потоки

def consumer():
    with condition:
        condition.wait()  # Ждём уведомления
        print(f"Consumer: Получил данные: {data}")

p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)

c.start()  # Consumer ждёт
p.start()  # Producer отправляет сигнал

p.join()
c.join()

GIL (Global Interpreter Lock)

⚠️ Важно в Python: GIL — это механизм в CPython, который гарантирует, что только один поток выполняет Python bytecode одновременно.

Это означает:

import threading
import time

def cpu_bound():
    # GIL гарантирует, что потоки НЕ выполняются параллельно
    # Это просто чередование (context switching)
    total = 0
    for i in range(100000000):
        total += i
    return total

t1 = threading.Thread(target=cpu_bound)
t2 = threading.Thread(target=cpu_bound)

start = time.time()
t1.start()
t2.start()
t1.join()
t2.join()
end = time.time()

print(f"С threading: {end - start:.2f}s")

# Но без threading (последовательно):
start = time.time()
cpu_bound()
cpu_bound()
end = time.time()

print(f"Без threading: {end - start:.2f}s")

# Результаты ОДИНАКОВЫЕ! GIL мешает параллельности

Решение: используй multiprocessing для CPU-bound задач

import multiprocessing
import time

def cpu_bound():
    total = 0
    for i in range(100000000):
        total += i
    return total

start = time.time()
with multiprocessing.Pool(2) as pool:
    pool.map(lambda x: cpu_bound(), [1, 2])
end = time.time()

print(f"С multiprocessing: {end - start:.2f}s")  # НАМНОГО быстрее!

Сравнение синхронизаторов

ТипНазначениеПример
LockБазовый MutexЗащита счётчика
RLockПереиспользуемый LockРекурсивные функции
SemaphoreОграничение кол-ва потоковПул worker'ов
EventSignal/WaitОжидание события
ConditionWait for conditionСложная синхронизация
QueueПотокобезопасная очередьProducer/Consumer

Когда использовать threading vs multiprocessing

# I/O-bound (сеть, файлы) → threading ✅
import threading
import requests

def fetch_url(url):
    response = requests.get(url)  # GIL освобождается во время I/O
    return response.text

threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
# Потоки выполняются параллельно (GIL освобождается)

# CPU-bound (вычисления) → multiprocessing ✅
import multiprocessing

def calculate(n):
    return sum(i*i for i in range(n))

with multiprocessing.Pool() as pool:
    results = pool.map(calculate, [10000000, 10000000])
# Процессы выполняются по-настоящему параллельно (нет GIL)

Вывод

Да, в Python есть Mutex и много примитивов синхронизации (threading.Lock, RLock, Semaphore, Event, Condition, Queue). Используй их для защиты общих данных в многопоточном коде. Но помни о GIL — для CPU-bound задач лучше multiprocessing, для I/O-bound — threading или asyncio.