Зачем нужен объект синхронизации в Python?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Объекты синхронизации в Python
Отличный вопрос про многопоточность! Объекты синхронизации — это критический инструмент для работы с параллельными потоками. Давайте разберёмся, почему они нужны.
Проблема: Race Condition
Представь, что два потока одновременно изменяют один счётчик:
from threading import Thread
counter = 0
def increment():
global counter
for _ in range(1000000):
counter += 1
# Запускаем два потока
thread1 = Thread(target=increment)
thread2 = Thread(target=increment)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(f"Counter: {counter}") # Ожидаем 2000000
# На самом деле: случайное число (например, 1234567)!
Почему? Операция counter += 1 на самом деле состоит из трёх шагов:
1. Прочитать counter из памяти (читай 10)
2. Увеличить на 1 (вычисли 11)
3. Записать обратно в память
Если оба потока делают это одновременно:
Тред 1: Читай (10) → Увеличь (11) → ...
Тред 2: Читай (10) → Увеличь (11) → Запиши (11) # Ошибка!
Тред 1: Запиши (11) # Вместо 12!
Решение: Lock (Замок)
Lock гарантирует, что только один поток может исполнять код одновременно:
from threading import Thread, Lock
counter = 0
lock = Lock() # ← Создаём замок
def increment():
global counter
for _ in range(1000000):
with lock: # ← Только один поток может войти сюда
counter += 1
thread1 = Thread(target=increment)
thread2 = Thread(target=increment)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(f"Counter: {counter}") # 2000000 — правильно!
Основные объекты синхронизации
1. Lock (Mutex)
Самый простой механизм — бинарный замок (взял/отпустил):
from threading import Thread, Lock
resource = "Критический ресурс"
lock = Lock()
def use_resource():
with lock: # Экранированный доступ
print(f"Accessing: {resource}")
time.sleep(0.1) # Имитируем работу
print("Done")
threads = [Thread(target=use_resource) for _ in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()
# Вывод:
# Accessing: ...
# Done
# Accessing: ...
# Done
# Accessing: ...
# Done
# (Все выполняются последовательно, не параллельно)
2. RLock (Reentrant Lock)
Обычный Lock нельзя захватить дважды из одного потока. RLock позволяет:
from threading import RLock
lock = RLock()
def function_a():
with lock:
print("In A")
function_b() # ← Может захватить тот же lock!
def function_b():
with lock:
print("In B")
function_a() # Работает, обычный Lock вызвал бы deadlock
3. Semaphore
Позволяет N потокам одновременно доступить к ресурсу:
from threading import Thread, Semaphore
import time
# Максимум 3 потока одновременно (например, лимит соединений БД)
semaphore = Semaphore(3)
def access_database():
with semaphore: # Автоматически ждёт если уже 3 потока
print(f"Worker {threading.current_thread().name} accessing DB")
time.sleep(1) # Имитируем работу
print(f"Worker {threading.current_thread().name} done")
threads = [Thread(target=access_database, name=f"T{i}") for i in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
# Вывод:
# Worker T0 accessing DB
# Worker T1 accessing DB
# Worker T2 accessing DB
# (Ждут T3-T9)
# Worker T0 done
# Worker T3 accessing DB
# ...
4. Event
Позволяет одному потоку сигнализировать другим потокам о событии:
from threading import Thread, Event
import time
event = Event() # Флаг "событие произошло"
def worker():
print("Worker waiting for event...")
event.wait() # ← Ждёт пока не произойдёт событие
print("Event happened, worker continues!")
def signaler():
time.sleep(2)
print("Signaling event...")
event.set() # ← Устанавливаем флаг
worker_thread = Thread(target=worker)
signaler_thread = Thread(target=signaler)
worker_thread.start()
signaler_thread.start()
worker_thread.join()
signaler_thread.join()
# Вывод:
# Worker waiting for event...
# (2 секунды)
# Signaling event...
# Event happened, worker continues!
5. Condition Variable
Комбинация Lock + Event. Позволяет потокам ждать условия:
from threading import Thread, Condition
import time
data_ready = False
condition = Condition() # Lock + Event в одном
def producer():
global data_ready
with condition:
print("Producer: preparing data...")
time.sleep(1)
data_ready = True
print("Producer: data ready!")
condition.notify_all() # Сигнализируем потребителям
def consumer(consumer_id):
with condition:
while not data_ready: # Ждём условия
print(f"Consumer {consumer_id}: waiting...")
condition.wait() # ← Освобождает lock и ждёт сигнала
print(f"Consumer {consumer_id}: got data!")
producer_thread = Thread(target=producer)
consumers = [Thread(target=consumer, args=(i,)) for i in range(3)]
for t in consumers:
t.start()
producer_thread.start()
producer_thread.join()
for t in consumers:
t.join()
# Вывод:
# Consumer 0: waiting...
# Consumer 1: waiting...
# Consumer 2: waiting...
# Producer: preparing data...
# Producer: data ready!
# Consumer 0: got data!
# Consumer 1: got data!
# Consumer 2: got data!
6. Queue (Очередь потокобезопасная)
Большинство случаев используй Queue вместо ручной синхронизации:
from threading import Thread
from queue import Queue
import time
queue = Queue(maxsize=5) # Очередь на 5 элементов
def producer():
for i in range(10):
print(f"Producer: putting {i}")
queue.put(i) # Блокирует если очередь полна
time.sleep(0.5)
def consumer():
while True:
item = queue.get() # Ждёт если очередь пуста
if item is None: # Сигнал конца
break
print(f"Consumer: got {item}")
time.sleep(1)
p = Thread(target=producer)
c = Thread(target=consumer)
p.start()
c.start()
p.join()
queue.put(None) # Сигнализируем конец
c.join()
# Вывод:
# Producer: putting 0
# Consumer: got 0
# Producer: putting 1
# Producer: putting 2
# Producer: putting 3
# Producer: putting 4
# Producer: putting 5
# (Блокируется — очередь полна)
# Consumer: got 1
# Producer: putting 6
# ...
Реальный пример: обработка заказов
from threading import Thread
from queue import Queue
import time
import random
class OrderProcessor:
def __init__(self, num_workers=3):
self.order_queue = Queue() # Очередь заказов
self.results = [] # Результаты
self.workers = [
Thread(target=self.worker, daemon=True)
for _ in range(num_workers)
]
for w in self.workers:
w.start()
def worker(self):
"""Рабочий поток обрабатывает заказы из очереди"""
while True:
order = self.order_queue.get() # Ждёт заказа
if order is None: # Сигнал конца
break
# Обрабатываем заказ
print(f"Processing order {order['id']}")
time.sleep(random.uniform(0.5, 2)) # Имитируем работу
# Сохраняем результат (потокобезопасно благодаря GIL)
self.results.append(f"Processed {order['id']}")
self.order_queue.task_done() # Отмечаем как выполнено
def add_order(self, order):
self.order_queue.put(order)
def shutdown(self):
self.order_queue.join() # Ждём завершения всех
for _ in self.workers:
self.order_queue.put(None) # Стопируем работников
for w in self.workers:
w.join()
# Использование
processor = OrderProcessor(num_workers=3)
for i in range(10):
processor.add_order({"id": i, "data": f"Order {i}"})
processor.shutdown()
print("\nResults:")
for result in processor.results:
print(f" {result}")
GIL (Global Interpreter Lock) в Python
Важно: Python имеет GIL — глобальную блокировку интерпретатора. Это означает:
# GIL позволяет ТОЛЬКО ОДНОМУ потоку исполнять Python код одновременно
# (Даже на мультиядерном процессоре!)
# Поэтому многопоточность полезна для:
# - I/O операций (сеть, файлы, БД) — потоки ждут, не заняв GIL
# - GUI операций — нужна отзывчивость
# НО НЕ для:
# - CPU-bound задач (вычисления) — используй multiprocessing!
import threading
import time
def cpu_bound():
total = 0
for i in range(100_000_000):
total += i
return total
start = time.time()
cpu_bound()
cpu_bound()
print(f"Sequential: {time.time() - start:.2f}s") # 8 секунд
start = time.time()
t1 = threading.Thread(target=cpu_bound)
t2 = threading.Thread(target=cpu_bound)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Threading: {time.time() - start:.2f}s") # 15 секунд (!!! медленнее)
# Используй multiprocessing вместо threading для CPU-bound:
from multiprocessing import Process
start = time.time()
p1 = Process(target=cpu_bound)
p2 = Process(target=cpu_bound)
p1.start()
p2.start()
p1.join()
p2.join()
print(f"Multiprocessing: {time.time() - start:.2f}s") # 4 секунды (параллелизм!)
Когда использовать объекты синхронизации
| Задача | Инструмент | Причина |
|---|---|---|
| Защита критической секции | Lock | Простейший случай |
| Рекурсивная блокировка | RLock | Один поток несколько раз |
| Ограничить одновременный доступ | Semaphore | Пул ресурсов (DB, HTTP) |
| Ждать события | Event | Синхронизация потоков |
| Ждать условия | Condition | Producer-consumer |
| I/O операции | Queue | Очередь задач |
| CPU-bound задачи | multiprocessing | Настоящий параллелизм |
Лучшие практики
# ✅ Используй context managers (with statement)
with lock:
# Критическая секция
pass
# Lock автоматически освобождается
# ❌ Вручную acquire/release опасно
lock.acquire()
try:
# Если тут исключение, lock не освободится!
pass
finally:
lock.release()
# ✅ Используй Queue для communication
queue.put(data)
data = queue.get()
# ❌ Не используй Lock для каждой переменной
lock1.acquire() # Сложно, ошибкам
lock2.acquire()
lock1.release()
Вывод
Объекты синхронизации нужны потому что:
- ✅ Предотвращают race conditions в многопоточных программах
- ✅ Позволяют потокам безопасно делить ресурсы
- ✅ Предоставляют примитивы для coordination между потоками
- ✅ Критичны для production-grade приложений
Главное правило: Используй Queue вместо ручных Lock'ов. Queue решает 90% задач безопаснее и проще.