← Назад к вопросам

Зачем нужен объект синхронизации в Python?

1.7 Middle🔥 41 комментариев
#REST API и HTTP#Базы данных (NoSQL)

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Объекты синхронизации в Python

Отличный вопрос про многопоточность! Объекты синхронизации — это критический инструмент для работы с параллельными потоками. Давайте разберёмся, почему они нужны.

Проблема: Race Condition

Представь, что два потока одновременно изменяют один счётчик:

from threading import Thread

counter = 0

def increment():
    global counter
    for _ in range(1000000):
        counter += 1

# Запускаем два потока
thread1 = Thread(target=increment)
thread2 = Thread(target=increment)

thread1.start()
thread2.start()

thread1.join()
thread2.join()

print(f"Counter: {counter}")  # Ожидаем 2000000
# На самом деле: случайное число (например, 1234567)!

Почему? Операция counter += 1 на самом деле состоит из трёх шагов:

1. Прочитать counter из памяти (читай 10)
2. Увеличить на 1 (вычисли 11)
3. Записать обратно в память

Если оба потока делают это одновременно:
Тред 1: Читай (10) → Увеличь (11) → ...
Тред 2: Читай (10) → Увеличь (11) → Запиши (11)  # Ошибка!
Тред 1:                                 Запиши (11)  # Вместо 12!

Решение: Lock (Замок)

Lock гарантирует, что только один поток может исполнять код одновременно:

from threading import Thread, Lock

counter = 0
lock = Lock()  # ← Создаём замок

def increment():
    global counter
    for _ in range(1000000):
        with lock:  # ← Только один поток может войти сюда
            counter += 1

thread1 = Thread(target=increment)
thread2 = Thread(target=increment)

thread1.start()
thread2.start()

thread1.join()
thread2.join()

print(f"Counter: {counter}")  # 2000000 — правильно!

Основные объекты синхронизации

1. Lock (Mutex)

Самый простой механизм — бинарный замок (взял/отпустил):

from threading import Thread, Lock

resource = "Критический ресурс"
lock = Lock()

def use_resource():
    with lock:  # Экранированный доступ
        print(f"Accessing: {resource}")
        time.sleep(0.1)  # Имитируем работу
        print("Done")

threads = [Thread(target=use_resource) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Вывод:
# Accessing: ...
# Done
# Accessing: ...
# Done
# Accessing: ...
# Done
# (Все выполняются последовательно, не параллельно)

2. RLock (Reentrant Lock)

Обычный Lock нельзя захватить дважды из одного потока. RLock позволяет:

from threading import RLock

lock = RLock()

def function_a():
    with lock:
        print("In A")
        function_b()  # ← Может захватить тот же lock!

def function_b():
    with lock:
        print("In B")

function_a()  # Работает, обычный Lock вызвал бы deadlock

3. Semaphore

Позволяет N потокам одновременно доступить к ресурсу:

from threading import Thread, Semaphore
import time

# Максимум 3 потока одновременно (например, лимит соединений БД)
semaphore = Semaphore(3)

def access_database():
    with semaphore:  # Автоматически ждёт если уже 3 потока
        print(f"Worker {threading.current_thread().name} accessing DB")
        time.sleep(1)  # Имитируем работу
        print(f"Worker {threading.current_thread().name} done")

threads = [Thread(target=access_database, name=f"T{i}") for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Вывод:
# Worker T0 accessing DB
# Worker T1 accessing DB
# Worker T2 accessing DB
# (Ждут T3-T9)
# Worker T0 done
# Worker T3 accessing DB
# ...

4. Event

Позволяет одному потоку сигнализировать другим потокам о событии:

from threading import Thread, Event
import time

event = Event()  # Флаг "событие произошло"

def worker():
    print("Worker waiting for event...")
    event.wait()  # ← Ждёт пока не произойдёт событие
    print("Event happened, worker continues!")

def signaler():
    time.sleep(2)
    print("Signaling event...")
    event.set()  # ← Устанавливаем флаг

worker_thread = Thread(target=worker)
signaler_thread = Thread(target=signaler)

worker_thread.start()
signaler_thread.start()

worker_thread.join()
signaler_thread.join()

# Вывод:
# Worker waiting for event...
# (2 секунды)
# Signaling event...
# Event happened, worker continues!

5. Condition Variable

Комбинация Lock + Event. Позволяет потокам ждать условия:

from threading import Thread, Condition
import time

data_ready = False
condition = Condition()  # Lock + Event в одном

def producer():
    global data_ready
    with condition:
        print("Producer: preparing data...")
        time.sleep(1)
        data_ready = True
        print("Producer: data ready!")
        condition.notify_all()  # Сигнализируем потребителям

def consumer(consumer_id):
    with condition:
        while not data_ready:  # Ждём условия
            print(f"Consumer {consumer_id}: waiting...")
            condition.wait()  # ← Освобождает lock и ждёт сигнала
        print(f"Consumer {consumer_id}: got data!")

producer_thread = Thread(target=producer)
consumers = [Thread(target=consumer, args=(i,)) for i in range(3)]

for t in consumers:
    t.start()

producer_thread.start()

producer_thread.join()
for t in consumers:
    t.join()

# Вывод:
# Consumer 0: waiting...
# Consumer 1: waiting...
# Consumer 2: waiting...
# Producer: preparing data...
# Producer: data ready!
# Consumer 0: got data!
# Consumer 1: got data!
# Consumer 2: got data!

6. Queue (Очередь потокобезопасная)

Большинство случаев используй Queue вместо ручной синхронизации:

from threading import Thread
from queue import Queue
import time

queue = Queue(maxsize=5)  # Очередь на 5 элементов

def producer():
    for i in range(10):
        print(f"Producer: putting {i}")
        queue.put(i)  # Блокирует если очередь полна
        time.sleep(0.5)

def consumer():
    while True:
        item = queue.get()  # Ждёт если очередь пуста
        if item is None:  # Сигнал конца
            break
        print(f"Consumer: got {item}")
        time.sleep(1)

p = Thread(target=producer)
c = Thread(target=consumer)

p.start()
c.start()

p.join()
queue.put(None)  # Сигнализируем конец
c.join()

# Вывод:
# Producer: putting 0
# Consumer: got 0
# Producer: putting 1
# Producer: putting 2
# Producer: putting 3
# Producer: putting 4
# Producer: putting 5
# (Блокируется — очередь полна)
# Consumer: got 1
# Producer: putting 6
# ...

Реальный пример: обработка заказов

from threading import Thread
from queue import Queue
import time
import random

class OrderProcessor:
    def __init__(self, num_workers=3):
        self.order_queue = Queue()  # Очередь заказов
        self.results = []  # Результаты
        self.workers = [
            Thread(target=self.worker, daemon=True) 
            for _ in range(num_workers)
        ]
        for w in self.workers:
            w.start()
    
    def worker(self):
        """Рабочий поток обрабатывает заказы из очереди"""
        while True:
            order = self.order_queue.get()  # Ждёт заказа
            if order is None:  # Сигнал конца
                break
            
            # Обрабатываем заказ
            print(f"Processing order {order['id']}")
            time.sleep(random.uniform(0.5, 2))  # Имитируем работу
            
            # Сохраняем результат (потокобезопасно благодаря GIL)
            self.results.append(f"Processed {order['id']}")
            
            self.order_queue.task_done()  # Отмечаем как выполнено
    
    def add_order(self, order):
        self.order_queue.put(order)
    
    def shutdown(self):
        self.order_queue.join()  # Ждём завершения всех
        for _ in self.workers:
            self.order_queue.put(None)  # Стопируем работников
        for w in self.workers:
            w.join()

# Использование
processor = OrderProcessor(num_workers=3)

for i in range(10):
    processor.add_order({"id": i, "data": f"Order {i}"})

processor.shutdown()

print("\nResults:")
for result in processor.results:
    print(f"  {result}")

GIL (Global Interpreter Lock) в Python

Важно: Python имеет GIL — глобальную блокировку интерпретатора. Это означает:

# GIL позволяет ТОЛЬКО ОДНОМУ потоку исполнять Python код одновременно
# (Даже на мультиядерном процессоре!)

# Поэтому многопоточность полезна для:
# - I/O операций (сеть, файлы, БД) — потоки ждут, не заняв GIL
# - GUI операций — нужна отзывчивость

# НО НЕ для:
# - CPU-bound задач (вычисления) — используй multiprocessing!

import threading
import time

def cpu_bound():
    total = 0
    for i in range(100_000_000):
        total += i
    return total

start = time.time()
cpu_bound()
cpu_bound()
print(f"Sequential: {time.time() - start:.2f}s")  # 8 секунд

start = time.time()
t1 = threading.Thread(target=cpu_bound)
t2 = threading.Thread(target=cpu_bound)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Threading: {time.time() - start:.2f}s")  # 15 секунд (!!! медленнее)

# Используй multiprocessing вместо threading для CPU-bound:
from multiprocessing import Process

start = time.time()
p1 = Process(target=cpu_bound)
p2 = Process(target=cpu_bound)
p1.start()
p2.start()
p1.join()
p2.join()
print(f"Multiprocessing: {time.time() - start:.2f}s")  # 4 секунды (параллелизм!)

Когда использовать объекты синхронизации

ЗадачаИнструментПричина
Защита критической секцииLockПростейший случай
Рекурсивная блокировкаRLockОдин поток несколько раз
Ограничить одновременный доступSemaphoreПул ресурсов (DB, HTTP)
Ждать событияEventСинхронизация потоков
Ждать условияConditionProducer-consumer
I/O операцииQueueОчередь задач
CPU-bound задачиmultiprocessingНастоящий параллелизм

Лучшие практики

# ✅ Используй context managers (with statement)
with lock:
    # Критическая секция
    pass
# Lock автоматически освобождается

# ❌ Вручную acquire/release опасно
lock.acquire()
try:
    # Если тут исключение, lock не освободится!
    pass
finally:
    lock.release()

# ✅ Используй Queue для communication
queue.put(data)
data = queue.get()

# ❌ Не используй Lock для каждой переменной
lock1.acquire()  # Сложно, ошибкам
lock2.acquire()
lock1.release()

Вывод

Объекты синхронизации нужны потому что:

  • ✅ Предотвращают race conditions в многопоточных программах
  • ✅ Позволяют потокам безопасно делить ресурсы
  • ✅ Предоставляют примитивы для coordination между потоками
  • ✅ Критичны для production-grade приложений

Главное правило: Используй Queue вместо ручных Lock'ов. Queue решает 90% задач безопаснее и проще.