Какие можешь использовать механизмы синхронизации потоков?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Механизмы синхронизации потоков в Python
Синхронизация потоков критически важна при работе с многопоточностью, чтобы предотвратить race conditions и обеспечить корректный доступ к общим ресурсам.
1. Lock (Mutex)
Базовый примитив синхронизации. Обеспечивает взаимоисключение:
import threading
shared_counter = 0
lock = threading.Lock()
def increment():
global shared_counter
with lock:
temp = shared_counter
temp += 1
shared_counter = temp
# Или через acquire/release
def increment_manual():
global shared_counter
lock.acquire()
try:
shared_counter += 1
finally:
lock.release()
# Проверка доступности
if lock.acquire(blocking=False):
try:
# Критическая секция
pass
finally:
lock.release()
else:
print('Lock is held by another thread')
2. RLock (Reentrant Lock)
Рекурсивная блокировка, позволяет одному потоку захватить её несколько раз:
rlock = threading.RLock()
def outer():
with rlock:
print('Outer')
inner()
def inner():
with rlock: # Может захватить вторично!
print('Inner')
# Обычный Lock вызвал бы deadlock
# RLock позволяет переиспользовать
outer()
3. Semaphore
Ограничивает количество потоков, обращающихся к ресурсу:
# Максимум 3 потока одновременно
semaphore = threading.Semaphore(3)
def access_resource():
with semaphore:
print('Accessing resource')
# Только 3 потока здесь одновременно
import time
time.sleep(1)
# Запустить 10 потоков
threads = []
for _ in range(10):
t = threading.Thread(target=access_resource)
threads.append(t)
t.start()
for t in threads:
t.join()
4. BoundedSemaphore
Ограниченная семафор, предотвращающая переполнение:
bounded_sem = threading.BoundedSemaphore(2)
bounded_sem.acquire()
bounded_sem.acquire()
# Это вызовет ошибку (освобождаем больше, чем занимали)
# bounded_sem.release() # OK
# bounded_sem.release() # OK
# bounded_sem.release() # ValueError!
print('Semaphore works correctly')
5. Event
Для сигнализирования между потоками:
event = threading.Event()
def waiter():
print('Waiter: waiting for event...')
event.wait() # Блокируется до set()
print('Waiter: event received!')
def setter():
import time
time.sleep(2)
print('Setter: setting event')
event.set()
t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=setter)
t1.start()
t2.start()
t1.join()
t2.join()
# Сбросить event
event.clear()
print('Event is cleared')
# Проверить состояние
if event.is_set():
print('Event is set')
6. Condition (Условная переменная)
Для более сложной синхронизации:
condition = threading.Condition()
data = None
def producer():
global data
import time
for i in range(3):
time.sleep(1)
with condition:
data = f'Data {i}'
print(f'Producer: produced {data}')
condition.notify_all() # Сигнализировать ожидающим потокам
def consumer(id):
global data
for _ in range(3):
with condition:
condition.wait() # Ждать сигнала
print(f'Consumer {id}: consumed {data}')
p = threading.Thread(target=producer)
c1 = threading.Thread(target=consumer, args=(1,))
c2 = threading.Thread(target=consumer, args=(2,))
p.start()
c1.start()
c2.start()
p.join()
c1.join()
c2.join()
7. Queue
Потокобезопасная очередь для обмена данными:
import queue
# Неограниченная очередь
q = queue.Queue()
# Ограниченная очередь (max 5 элементов)
bounded_q = queue.Queue(maxsize=5)
def producer():
for i in range(5):
print(f'Producing {i}')
q.put(i)
import time
time.sleep(0.5)
q.put(None) # Сигнал конца
def consumer():
while True:
item = q.get() # Блокируется, если очередь пуста
if item is None:
break
print(f'Consuming {item}')
q.task_done() # Сигнализировать об обработке
p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start()
c.start()
p.join()
c.join()
print('All items processed')
8. Priority Queue
Очередь с приоритизацией:
import queue
import threading
pq = queue.PriorityQueue()
def worker():
while True:
priority, item = pq.get()
if item is None:
break
print(f'Processing (priority {priority}): {item}')
pq.task_done()
# Отправить элементы с разными приоритетами
pq.put((3, 'low'))
pq.put((1, 'high'))
pq.put((2, 'medium'))
# Запустить worker
t = threading.Thread(target=worker, daemon=True)
t.start()
# Подождать обработки
pq.join()
pq.put((0, None)) # Сигнал конца
t.join()
9. Lock с таймаутом
lock = threading.Lock()
# Попробовать захватить с таймаутом (Python 3.2+)
if lock.acquire(timeout=2):
try:
print('Lock acquired')
finally:
lock.release()
else:
print('Could not acquire lock within 2 seconds')
10. Threading модуль Barrier
Для синхронизации нескольких потоков в точке:
barrier = threading.Barrier(3) # Ждать 3 потока
def worker(id):
print(f'Worker {id}: reaching barrier')
barrier.wait() # Блокируется до пока все 3 не достигнут
print(f'Worker {id}: passed barrier')
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
11. asyncio для асинхронности
Для асинхронного кода (лучше, чем потоки для I/O):
import asyncio
lock = asyncio.Lock()
data = None
async def producer():
global data
for i in range(3):
await asyncio.sleep(1)
async with lock:
data = f'Data {i}'
print(f'Produced: {data}')
async def consumer(id):
global data
for _ in range(3):
await asyncio.sleep(1.5)
async with lock:
print(f'Consumer {id}: {data}')
async def main():
await asyncio.gather(
producer(),
consumer(1),
consumer(2)
)
asyncio.run(main())
12. asyncio.Event
async def async_event_example():
event = asyncio.Event()
async def waiter():
await event.wait()
print('Event received')
async def setter():
await asyncio.sleep(1)
event.set()
await asyncio.gather(waiter(), setter())
asyncio.run(async_event_example())
Рекомендации по выбору:
- Lock/RLock: Простая защита критических секций
- Semaphore: Ограничение доступа к ресурсам
- Event: Простая сигнализация между потоками
- Condition: Сложные сценарии с ожиданием условий
- Queue: Обмен данными между потоками
- asyncio: Предпочитать для I/O операций вместо потоков
- Barrier: Синхронизация нескольких потоков в точке
При разработке обычно предпочитают asyncio потокам, так как это более эффективно для I/O операций и проще избежать deadlock'ов.