← Назад к вопросам

Какие можешь использовать механизмы синхронизации потоков?

1.7 Middle🔥 111 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Механизмы синхронизации потоков в Python

Синхронизация потоков критически важна при работе с многопоточностью, чтобы предотвратить race conditions и обеспечить корректный доступ к общим ресурсам.

1. Lock (Mutex)

Базовый примитив синхронизации. Обеспечивает взаимоисключение:

import threading

shared_counter = 0
lock = threading.Lock()

def increment():
    global shared_counter
    with lock:
        temp = shared_counter
        temp += 1
        shared_counter = temp

# Или через acquire/release
def increment_manual():
    global shared_counter
    lock.acquire()
    try:
        shared_counter += 1
    finally:
        lock.release()

# Проверка доступности
if lock.acquire(blocking=False):
    try:
        # Критическая секция
        pass
    finally:
        lock.release()
else:
    print('Lock is held by another thread')

2. RLock (Reentrant Lock)

Рекурсивная блокировка, позволяет одному потоку захватить её несколько раз:

rlock = threading.RLock()

def outer():
    with rlock:
        print('Outer')
        inner()

def inner():
    with rlock:  # Может захватить вторично!
        print('Inner')

# Обычный Lock вызвал бы deadlock
# RLock позволяет переиспользовать
outer()

3. Semaphore

Ограничивает количество потоков, обращающихся к ресурсу:

# Максимум 3 потока одновременно
semaphore = threading.Semaphore(3)

def access_resource():
    with semaphore:
        print('Accessing resource')
        # Только 3 потока здесь одновременно
        import time
        time.sleep(1)

# Запустить 10 потоков
threads = []
for _ in range(10):
    t = threading.Thread(target=access_resource)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

4. BoundedSemaphore

Ограниченная семафор, предотвращающая переполнение:

bounded_sem = threading.BoundedSemaphore(2)

bounded_sem.acquire()
bounded_sem.acquire()

# Это вызовет ошибку (освобождаем больше, чем занимали)
# bounded_sem.release()  # OK
# bounded_sem.release()  # OK
# bounded_sem.release()  # ValueError!

print('Semaphore works correctly')

5. Event

Для сигнализирования между потоками:

event = threading.Event()

def waiter():
    print('Waiter: waiting for event...')
    event.wait()  # Блокируется до set()
    print('Waiter: event received!')

def setter():
    import time
    time.sleep(2)
    print('Setter: setting event')
    event.set()

t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=setter)

t1.start()
t2.start()

t1.join()
t2.join()

# Сбросить event
event.clear()
print('Event is cleared')

# Проверить состояние
if event.is_set():
    print('Event is set')

6. Condition (Условная переменная)

Для более сложной синхронизации:

condition = threading.Condition()
data = None

def producer():
    global data
    import time
    
    for i in range(3):
        time.sleep(1)
        with condition:
            data = f'Data {i}'
            print(f'Producer: produced {data}')
            condition.notify_all()  # Сигнализировать ожидающим потокам

def consumer(id):
    global data
    
    for _ in range(3):
        with condition:
            condition.wait()  # Ждать сигнала
            print(f'Consumer {id}: consumed {data}')

p = threading.Thread(target=producer)
c1 = threading.Thread(target=consumer, args=(1,))
c2 = threading.Thread(target=consumer, args=(2,))

p.start()
c1.start()
c2.start()

p.join()
c1.join()
c2.join()

7. Queue

Потокобезопасная очередь для обмена данными:

import queue

# Неограниченная очередь
q = queue.Queue()

# Ограниченная очередь (max 5 элементов)
bounded_q = queue.Queue(maxsize=5)

def producer():
    for i in range(5):
        print(f'Producing {i}')
        q.put(i)
        import time
        time.sleep(0.5)
    
    q.put(None)  # Сигнал конца

def consumer():
    while True:
        item = q.get()  # Блокируется, если очередь пуста
        
        if item is None:
            break
        
        print(f'Consuming {item}')
        q.task_done()  # Сигнализировать об обработке

p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)

p.start()
c.start()

p.join()
c.join()

print('All items processed')

8. Priority Queue

Очередь с приоритизацией:

import queue
import threading

pq = queue.PriorityQueue()

def worker():
    while True:
        priority, item = pq.get()
        if item is None:
            break
        print(f'Processing (priority {priority}): {item}')
        pq.task_done()

# Отправить элементы с разными приоритетами
pq.put((3, 'low'))
pq.put((1, 'high'))
pq.put((2, 'medium'))

# Запустить worker
t = threading.Thread(target=worker, daemon=True)
t.start()

# Подождать обработки
pq.join()
pq.put((0, None))  # Сигнал конца
t.join()

9. Lock с таймаутом

lock = threading.Lock()

# Попробовать захватить с таймаутом (Python 3.2+)
if lock.acquire(timeout=2):
    try:
        print('Lock acquired')
    finally:
        lock.release()
else:
    print('Could not acquire lock within 2 seconds')

10. Threading модуль Barrier

Для синхронизации нескольких потоков в точке:

barrier = threading.Barrier(3)  # Ждать 3 потока

def worker(id):
    print(f'Worker {id}: reaching barrier')
    barrier.wait()  # Блокируется до пока все 3 не достигнут
    print(f'Worker {id}: passed barrier')

threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

11. asyncio для асинхронности

Для асинхронного кода (лучше, чем потоки для I/O):

import asyncio

lock = asyncio.Lock()
data = None

async def producer():
    global data
    for i in range(3):
        await asyncio.sleep(1)
        async with lock:
            data = f'Data {i}'
            print(f'Produced: {data}')

async def consumer(id):
    global data
    for _ in range(3):
        await asyncio.sleep(1.5)
        async with lock:
            print(f'Consumer {id}: {data}')

async def main():
    await asyncio.gather(
        producer(),
        consumer(1),
        consumer(2)
    )

asyncio.run(main())

12. asyncio.Event

async def async_event_example():
    event = asyncio.Event()
    
    async def waiter():
        await event.wait()
        print('Event received')
    
    async def setter():
        await asyncio.sleep(1)
        event.set()
    
    await asyncio.gather(waiter(), setter())

asyncio.run(async_event_example())

Рекомендации по выбору:

  • Lock/RLock: Простая защита критических секций
  • Semaphore: Ограничение доступа к ресурсам
  • Event: Простая сигнализация между потоками
  • Condition: Сложные сценарии с ожиданием условий
  • Queue: Обмен данными между потоками
  • asyncio: Предпочитать для I/O операций вместо потоков
  • Barrier: Синхронизация нескольких потоков в точке

При разработке обычно предпочитают asyncio потокам, так как это более эффективно для I/O операций и проще избежать deadlock'ов.

Какие можешь использовать механизмы синхронизации потоков? | PrepBro