Какие знаешь механизмы обмена данными между параллельными сущностями?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Механизмы обмена данными между параллельными сущностями
Параллельные сущности (потоки, процессы, корутины) нуждаются в безопасном обмене данными. Рассмотрю все основные подходы.
1. Очереди (Queues)
Queue для потоков (thread-safe)
from queue import Queue
import threading
q = Queue(maxsize=10)
def producer():
for i in range(5):
q.put(f'Item {i}')
print(f'Produced: Item {i}')
def consumer():
while True:
item = q.get()
if item is None:
break
print(f'Consumed: {item}')
q.task_done()
p_thread = threading.Thread(target=producer)
c_thread = threading.Thread(target=consumer)
p_thread.start()
c_thread.start()
p_thread.join()
q.put(None)
c_thread.join()
PriorityQueue
from queue import PriorityQueue
pq = PriorityQueue()
pq.put((3, 'low priority'))
pq.put((1, 'high priority'))
pq.put((2, 'medium priority'))
while not pq.empty():
priority, item = pq.get()
print(f'{priority}: {item}') # Выводит в порядке приоритета
2. Асинхронные очереди (asyncio)
asyncio.Queue для корутин
import asyncio
class Producer:
def __init__(self, queue):
self.queue = queue
async def produce(self):
for i in range(5):
await self.queue.put(f'Item {i}')
print(f'Produced: Item {i}')
await asyncio.sleep(0.5)
class Consumer:
def __init__(self, queue):
self.queue = queue
async def consume(self):
while True:
item = await self.queue.get()
if item is None:
break
print(f'Consumed: {item}')
self.queue.task_done()
async def main():
queue = asyncio.Queue()
producer = Producer(queue)
consumer = Consumer(queue)
await asyncio.gather(
producer.produce(),
consumer.consume()
)
await queue.put(None)
asyncio.run(main())
3. Multiprocessing Queue
Для межпроцессного обмена
from multiprocessing import Process, Queue
import time
def producer_process(q):
for i in range(5):
q.put(f'Data {i}')
time.sleep(1)
def consumer_process(q):
while True:
try:
data = q.get(timeout=2)
print(f'Received: {data}')
except:
break
if __name__ == '__main__':
queue = Queue()
p1 = Process(target=producer_process, args=(queue,))
p2 = Process(target=consumer_process, args=(queue,))
p1.start()
p2.start()
p1.join()
p2.join()
4. Pipe (Канал)
Двусторонняя коммуникация между процессами
from multiprocessing import Process, Pipe
def sender(conn):
for i in range(5):
conn.send(f'Message {i}')
conn.close()
def receiver(conn):
while True:
try:
msg = conn.recv()
print(f'Received: {msg}')
except EOFError:
break
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p1 = Process(target=sender, args=(child_conn,))
p2 = Process(target=receiver, args=(parent_conn,))
p1.start()
p2.start()
p1.join()
p2.join()
5. Locks (Блокировки)
Для синхронизации доступа к общему ресурсу
import threading
shared_counter = 0
lock = threading.Lock()
def increment():
global shared_counter
for _ in range(100000):
with lock:
shared_counter += 1
threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f'Final counter: {shared_counter}') # Всегда 1000000
RLock (переиспользуемая блокировка)
import threading
rlock = threading.RLock()
def recursive_function(depth):
with rlock:
if depth > 0:
recursive_function(depth - 1)
else:
print('Done')
recursive_function(3)
6. Event (События)
Сигнализирование между потоками
import threading
import time
event = threading.Event()
def waiter():
print('Waiter: Waiting for event...')
event.wait()
print('Waiter: Event was set!')
def setter():
time.sleep(2)
print('Setter: Setting event...')
event.set()
t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=setter)
t1.start()
t2.start()
t1.join()
t2.join()
7. Condition (Условия)
Более сложная синхронизация с notification
import threading
import time
condition = threading.Condition()
resource = None
def producer():
global resource
for i in range(3):
with condition:
resource = f'Item {i}'
print(f'Produced: {resource}')
condition.notify_all()
time.sleep(1)
def consumer(consumer_id):
global resource
for _ in range(3):
with condition:
while resource is None:
condition.wait()
print(f'Consumer {consumer_id} got: {resource}')
resource = None
p = threading.Thread(target=producer)
c1 = threading.Thread(target=consumer, args=(1,))
c2 = threading.Thread(target=consumer, args=(2,))
p.start()
c1.start()
c2.start()
p.join()
c1.join()
c2.join()
8. Semaphore (Семафор)
Ограничение количества одновременно работающих потоков
import threading
import time
semaphore = threading.Semaphore(3) # Max 3 одновременно
def worker(worker_id):
with semaphore:
print(f'Worker {worker_id} started')
time.sleep(2)
print(f'Worker {worker_id} finished')
threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
9. Barrier (Барьер)
Синхронизация нескольких потоков в точке
import threading
barrier = threading.Barrier(3) # Ждёт 3 потока
def worker(worker_id):
print(f'Worker {worker_id} waiting at barrier')
barrier.wait()
print(f'Worker {worker_id} passed barrier')
threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()
10. Shared Memory (для процессов)
Разделение памяти между процессами
from multiprocessing import Process, Value, Array
import time
def writer(shared_int, shared_array):
for i in range(5):
shared_int.value = i
for j in range(3):
shared_array[j] = i * j
time.sleep(0.5)
def reader(shared_int, shared_array):
for _ in range(5):
print(f'Int: {shared_int.value}, Array: {list(shared_array)}')
time.sleep(0.5)
if __name__ == '__main__':
shared_int = Value('i', 0)
shared_array = Array('i', 3)
p1 = Process(target=writer, args=(shared_int, shared_array))
p2 = Process(target=reader, args=(shared_int, shared_array))
p1.start()
p2.start()
p1.join()
p2.join()
11. Message Bus (шина сообщений)
Паттерн pub/sub
class MessageBus:
def __init__(self):
self.subscribers = {}
def subscribe(self, topic, callback):
if topic not in self.subscribers:
self.subscribers[topic] = []
self.subscribers[topic].append(callback)
def publish(self, topic, message):
if topic in self.subscribers:
for callback in self.subscribers[topic]:
callback(message)
bus = MessageBus()
def on_user_created(user):
print(f'User created: {user}')
def send_welcome_email(user):
print(f'Sending email to {user}')
bus.subscribe('user:created', on_user_created)
bus.subscribe('user:created', send_welcome_email)
bus.publish('user:created', 'john@example.com')
12. Сравнительная таблица
| Механизм | Потоки | Процессы | Корутины | Простота | Скорость |
|---|---|---|---|---|---|
| Queue | ✓ | ✓ | ✓ | High | Medium |
| Pipe | ✗ | ✓ | ✗ | High | High |
| Lock | ✓ | ✗ | ✓ | Medium | High |
| Event | ✓ | ✗ | ✓ | High | High |
| Condition | ✓ | ✗ | ✓ | Medium | High |
| Semaphore | ✓ | ✗ | ✓ | High | High |
| Shared Memory | ✗ | ✓ | ✗ | Low | High |
| Message Bus | ✓ | ✓ | ✓ | Medium | Medium |
13. Выбор механизма
Для потоков: Queue (простой обмен), Lock (синхронизация) Для процессов: Queue, Pipe, Shared Memory Для корутин: asyncio.Queue, Event, Condition Для микросервисов: Message Bus (Redis, RabbitMQ)
Каждый механизм решает конкретную задачу — выбирай в зависимости от архитектуры!