← Назад к вопросам

Как синхронизировать потоки в Python?

1.7 Middle🔥 171 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Синхронизация потоков в Python

Синхронизация потоков - это механизм для координации работы нескольких потоков, чтобы они не конфликтовали при доступе к общим ресурсам. Python предоставляет несколько инструментов для этого.

Lock (Мьютекс)

Базовый примитив синхронизации для защиты критической секции:

import threading

lock = threading.Lock()
counter = 0

def increment():
    global counter
    with lock:
        # Только один поток может быть здесь одновременно
        temp = counter
        temp += 1
        counter = temp

threads = [threading.Thread(target=increment) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 100 (правильно)

Без синхронизации:

# Без lock результат будет < 100 из-за race condition
def bad_increment():
    global counter
    temp = counter
    temp += 1
    counter = temp

RLock (Recursive Lock)

Мьютекс, который один поток может захватить несколько раз:

lock = threading.RLock()

def recursive_function(n):
    with lock:
        if n == 0:
            return 1
        return n * recursive_function(n - 1)

# Без RLock было бы deadlock
result = recursive_function(5)

Semaphore

Счетчик для ограничения количества потоков, имеющих доступ к ресурсу:

import threading
import time

semaphore = threading.Semaphore(2)  # Максимум 2 потока одновременно

def limited_resource():
    with semaphore:
        print(f"Поток {threading.current_thread().name} работает")
        time.sleep(1)
        print(f"Поток {threading.current_thread().name} закончил")

threads = [threading.Thread(target=limited_resource, name=f"T{i}") for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

Практический пример (ограничение подключений):

class ConnectionPool:
    def __init__(self, max_connections=5):
        self.semaphore = threading.Semaphore(max_connections)
        self.connections = []
    
    def acquire_connection(self):
        self.semaphore.acquire()
        return self.connections.pop()
    
    def release_connection(self, conn):
        self.connections.append(conn)
        self.semaphore.release()

Event

Сигнал для уведомления потоков о наступлении события:

import threading
import time

event = threading.Event()

def worker():
    print("Рабочий ждет события...")
    event.wait()  # Блокирует до set()
    print("Событие произошло!")

def signaler():
    time.sleep(2)
    print("Подаю сигнал...")
    event.set()

t1 = threading.Thread(target=worker)
t2 = threading.Thread(target=signaler)

t1.start()
t2.start()
t1.join()
t2.join()

С timeout:

if not event.wait(timeout=5):
    print("Событие не произошло за 5 секунд")
else:
    print("Событие произошло")

Condition (Условная переменная)

Комбинация lock и event для более сложной синхронизации:

import threading

condition = threading.Condition()
queue = []

def producer():
    for i in range(5):
        with condition:
            queue.append(i)
            print(f"Produced {i}")
            condition.notify()  # Разбудить ждущий поток

def consumer():
    while True:
        with condition:
            while len(queue) == 0:
                condition.wait()  # Жди уведомления
            item = queue.pop(0)
            print(f"Consumed {item}")
            if item == 4:
                break

producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()

Queue (Потокобезопасная очередь)

Рекомендуемый способ обмена данными между потоками:

from queue import Queue
import threading

queue = Queue(maxsize=5)

def producer():
    for i in range(10):
        queue.put(i)
        print(f"Produced {i}")

def consumer():
    while True:
        item = queue.get()
        print(f"Consumed {item}")
        if item == 9:
            break
        queue.task_done()

producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()

PriorityQueue (приоритет):

from queue import PriorityQueue

pq = PriorityQueue()
pq.put((1, "высокий приоритет"))
pq.put((10, "низкий приоритет"))
pq.put((5, "средний приоритет"))

while not pq.empty():
    priority, item = pq.get()
    print(f"Priority {priority}: {item}")

Barrier

Блокирует потоки до достижения определенного количества:

import threading
import time

barrier = threading.Barrier(3)

def worker(n):
    print(f"Worker {n} ready")
    barrier.wait()  # Ждет остальных
    print(f"Worker {n} started")

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

GIL (Global Interpreter Lock)

В CPython есть глобальный мьютекс (GIL):

import threading
import time

def cpu_bound():
    total = 0
    for i in range(50000000):
        total += i

# GIL не пускает параллельное выполнение CPU-bound задач
start = time.time()
t1 = threading.Thread(target=cpu_bound)
t2 = threading.Thread(target=cpu_bound)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Threading: {time.time() - start:.2f}s")

# Используйте multiprocessing для CPU-bound
from multiprocessing import Process

start = time.time()
p1 = Process(target=cpu_bound)
p2 = Process(target=cpu_bound)
p1.start()
p2.start()
p1.join()
p2.join()
print(f"Multiprocessing: {time.time() - start:.2f}s")

Thread-Local Storage

Данные, уникальные для каждого потока:

import threading

local_data = threading.local()

def worker(n):
    local_data.value = n * 10
    print(f"Thread {n}: {local_data.value}")

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

Практический пример: пул потоков

from concurrent.futures import ThreadPoolExecutor
import requests

def fetch_url(url):
    response = requests.get(url)
    return len(response.content)

urls = ["http://example.com" for _ in range(10)]

with ThreadPoolExecutor(max_workers=5) as executor:
    results = executor.map(fetch_url, urls)
    for size in results:
        print(f"Downloaded {size} bytes")

Лучшие практики

  • Используйте Queue для обмена данными между потоками
  • Избегайте прямого общего состояния (shared state)
  • Используйте контекстные менеджеры (with) для lock
  • Для IO-bound используйте threading
  • Для CPU-bound используйте multiprocessing или asyncio
  • Всегда вызывайте join() на потоках
  • Остерегайтесь deadlock - избегайте вложенных lock
  • Используйте concurrent.futures для удобства

Чек-лист синхронизации

  • Lock защищает критические секции
  • Queue безопасна для многопоточности
  • Event используется для сигналов
  • Condition для сложной синхронизации
  • Barrier для синхронизации множества потоков
  • Избегайте nested locks (deadlock)
  • Помните про GIL для CPU-bound задач
Как синхронизировать потоки в Python? | PrepBro