Как синхронизировать потоки в Python?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Синхронизация потоков в Python
Синхронизация потоков - это механизм для координации работы нескольких потоков, чтобы они не конфликтовали при доступе к общим ресурсам. Python предоставляет несколько инструментов для этого.
Lock (Мьютекс)
Базовый примитив синхронизации для защиты критической секции:
import threading
lock = threading.Lock()
counter = 0
def increment():
global counter
with lock:
# Только один поток может быть здесь одновременно
temp = counter
temp += 1
counter = temp
threads = [threading.Thread(target=increment) for _ in range(100)]
for t in threads:
t.start()
for t in threads:
t.join()
print(counter) # 100 (правильно)
Без синхронизации:
# Без lock результат будет < 100 из-за race condition
def bad_increment():
global counter
temp = counter
temp += 1
counter = temp
RLock (Recursive Lock)
Мьютекс, который один поток может захватить несколько раз:
lock = threading.RLock()
def recursive_function(n):
with lock:
if n == 0:
return 1
return n * recursive_function(n - 1)
# Без RLock было бы deadlock
result = recursive_function(5)
Semaphore
Счетчик для ограничения количества потоков, имеющих доступ к ресурсу:
import threading
import time
semaphore = threading.Semaphore(2) # Максимум 2 потока одновременно
def limited_resource():
with semaphore:
print(f"Поток {threading.current_thread().name} работает")
time.sleep(1)
print(f"Поток {threading.current_thread().name} закончил")
threads = [threading.Thread(target=limited_resource, name=f"T{i}") for i in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
Практический пример (ограничение подключений):
class ConnectionPool:
def __init__(self, max_connections=5):
self.semaphore = threading.Semaphore(max_connections)
self.connections = []
def acquire_connection(self):
self.semaphore.acquire()
return self.connections.pop()
def release_connection(self, conn):
self.connections.append(conn)
self.semaphore.release()
Event
Сигнал для уведомления потоков о наступлении события:
import threading
import time
event = threading.Event()
def worker():
print("Рабочий ждет события...")
event.wait() # Блокирует до set()
print("Событие произошло!")
def signaler():
time.sleep(2)
print("Подаю сигнал...")
event.set()
t1 = threading.Thread(target=worker)
t2 = threading.Thread(target=signaler)
t1.start()
t2.start()
t1.join()
t2.join()
С timeout:
if not event.wait(timeout=5):
print("Событие не произошло за 5 секунд")
else:
print("Событие произошло")
Condition (Условная переменная)
Комбинация lock и event для более сложной синхронизации:
import threading
condition = threading.Condition()
queue = []
def producer():
for i in range(5):
with condition:
queue.append(i)
print(f"Produced {i}")
condition.notify() # Разбудить ждущий поток
def consumer():
while True:
with condition:
while len(queue) == 0:
condition.wait() # Жди уведомления
item = queue.pop(0)
print(f"Consumed {item}")
if item == 4:
break
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
Queue (Потокобезопасная очередь)
Рекомендуемый способ обмена данными между потоками:
from queue import Queue
import threading
queue = Queue(maxsize=5)
def producer():
for i in range(10):
queue.put(i)
print(f"Produced {i}")
def consumer():
while True:
item = queue.get()
print(f"Consumed {item}")
if item == 9:
break
queue.task_done()
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
PriorityQueue (приоритет):
from queue import PriorityQueue
pq = PriorityQueue()
pq.put((1, "высокий приоритет"))
pq.put((10, "низкий приоритет"))
pq.put((5, "средний приоритет"))
while not pq.empty():
priority, item = pq.get()
print(f"Priority {priority}: {item}")
Barrier
Блокирует потоки до достижения определенного количества:
import threading
import time
barrier = threading.Barrier(3)
def worker(n):
print(f"Worker {n} ready")
barrier.wait() # Ждет остальных
print(f"Worker {n} started")
threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()
GIL (Global Interpreter Lock)
В CPython есть глобальный мьютекс (GIL):
import threading
import time
def cpu_bound():
total = 0
for i in range(50000000):
total += i
# GIL не пускает параллельное выполнение CPU-bound задач
start = time.time()
t1 = threading.Thread(target=cpu_bound)
t2 = threading.Thread(target=cpu_bound)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Threading: {time.time() - start:.2f}s")
# Используйте multiprocessing для CPU-bound
from multiprocessing import Process
start = time.time()
p1 = Process(target=cpu_bound)
p2 = Process(target=cpu_bound)
p1.start()
p2.start()
p1.join()
p2.join()
print(f"Multiprocessing: {time.time() - start:.2f}s")
Thread-Local Storage
Данные, уникальные для каждого потока:
import threading
local_data = threading.local()
def worker(n):
local_data.value = n * 10
print(f"Thread {n}: {local_data.value}")
threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()
Практический пример: пул потоков
from concurrent.futures import ThreadPoolExecutor
import requests
def fetch_url(url):
response = requests.get(url)
return len(response.content)
urls = ["http://example.com" for _ in range(10)]
with ThreadPoolExecutor(max_workers=5) as executor:
results = executor.map(fetch_url, urls)
for size in results:
print(f"Downloaded {size} bytes")
Лучшие практики
- Используйте Queue для обмена данными между потоками
- Избегайте прямого общего состояния (shared state)
- Используйте контекстные менеджеры (with) для lock
- Для IO-bound используйте threading
- Для CPU-bound используйте multiprocessing или asyncio
- Всегда вызывайте join() на потоках
- Остерегайтесь deadlock - избегайте вложенных lock
- Используйте concurrent.futures для удобства
Чек-лист синхронизации
- Lock защищает критические секции
- Queue безопасна для многопоточности
- Event используется для сигналов
- Condition для сложной синхронизации
- Barrier для синхронизации множества потоков
- Избегайте nested locks (deadlock)
- Помните про GIL для CPU-bound задач