← Назад к вопросам

Какие знаешь механизмы обмена данными между параллельными сущностями?

2.0 Middle🔥 191 комментариев
#Python Core#Архитектура и паттерны#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Механизмы обмена данными между параллельными сущностями

Параллельные сущности (потоки, процессы, корутины) нуждаются в безопасном обмене данными. Рассмотрю все основные подходы.

1. Очереди (Queues)

Queue для потоков (thread-safe)

from queue import Queue
import threading

q = Queue(maxsize=10)

def producer():
    for i in range(5):
        q.put(f'Item {i}')
        print(f'Produced: Item {i}')

def consumer():
    while True:
        item = q.get()
        if item is None:
            break
        print(f'Consumed: {item}')
        q.task_done()

p_thread = threading.Thread(target=producer)
c_thread = threading.Thread(target=consumer)

p_thread.start()
c_thread.start()

p_thread.join()
q.put(None)
c_thread.join()

PriorityQueue

from queue import PriorityQueue

pq = PriorityQueue()

pq.put((3, 'low priority'))
pq.put((1, 'high priority'))
pq.put((2, 'medium priority'))

while not pq.empty():
    priority, item = pq.get()
    print(f'{priority}: {item}')  # Выводит в порядке приоритета

2. Асинхронные очереди (asyncio)

asyncio.Queue для корутин

import asyncio

class Producer:
    def __init__(self, queue):
        self.queue = queue
    
    async def produce(self):
        for i in range(5):
            await self.queue.put(f'Item {i}')
            print(f'Produced: Item {i}')
            await asyncio.sleep(0.5)

class Consumer:
    def __init__(self, queue):
        self.queue = queue
    
    async def consume(self):
        while True:
            item = await self.queue.get()
            if item is None:
                break
            print(f'Consumed: {item}')
            self.queue.task_done()

async def main():
    queue = asyncio.Queue()
    producer = Producer(queue)
    consumer = Consumer(queue)
    
    await asyncio.gather(
        producer.produce(),
        consumer.consume()
    )
    await queue.put(None)

asyncio.run(main())

3. Multiprocessing Queue

Для межпроцессного обмена

from multiprocessing import Process, Queue
import time

def producer_process(q):
    for i in range(5):
        q.put(f'Data {i}')
        time.sleep(1)

def consumer_process(q):
    while True:
        try:
            data = q.get(timeout=2)
            print(f'Received: {data}')
        except:
            break

if __name__ == '__main__':
    queue = Queue()
    
    p1 = Process(target=producer_process, args=(queue,))
    p2 = Process(target=consumer_process, args=(queue,))
    
    p1.start()
    p2.start()
    
    p1.join()
    p2.join()

4. Pipe (Канал)

Двусторонняя коммуникация между процессами

from multiprocessing import Process, Pipe

def sender(conn):
    for i in range(5):
        conn.send(f'Message {i}')
    conn.close()

def receiver(conn):
    while True:
        try:
            msg = conn.recv()
            print(f'Received: {msg}')
        except EOFError:
            break
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    
    p1 = Process(target=sender, args=(child_conn,))
    p2 = Process(target=receiver, args=(parent_conn,))
    
    p1.start()
    p2.start()
    
    p1.join()
    p2.join()

5. Locks (Блокировки)

Для синхронизации доступа к общему ресурсу

import threading

shared_counter = 0
lock = threading.Lock()

def increment():
    global shared_counter
    for _ in range(100000):
        with lock:
            shared_counter += 1

threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f'Final counter: {shared_counter}')  # Всегда 1000000

RLock (переиспользуемая блокировка)

import threading

rlock = threading.RLock()

def recursive_function(depth):
    with rlock:
        if depth > 0:
            recursive_function(depth - 1)
        else:
            print('Done')

recursive_function(3)

6. Event (События)

Сигнализирование между потоками

import threading
import time

event = threading.Event()

def waiter():
    print('Waiter: Waiting for event...')
    event.wait()
    print('Waiter: Event was set!')

def setter():
    time.sleep(2)
    print('Setter: Setting event...')
    event.set()

t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=setter)

t1.start()
t2.start()

t1.join()
t2.join()

7. Condition (Условия)

Более сложная синхронизация с notification

import threading
import time

condition = threading.Condition()
resource = None

def producer():
    global resource
    for i in range(3):
        with condition:
            resource = f'Item {i}'
            print(f'Produced: {resource}')
            condition.notify_all()
        time.sleep(1)

def consumer(consumer_id):
    global resource
    for _ in range(3):
        with condition:
            while resource is None:
                condition.wait()
            print(f'Consumer {consumer_id} got: {resource}')
            resource = None

p = threading.Thread(target=producer)
c1 = threading.Thread(target=consumer, args=(1,))
c2 = threading.Thread(target=consumer, args=(2,))

p.start()
c1.start()
c2.start()

p.join()
c1.join()
c2.join()

8. Semaphore (Семафор)

Ограничение количества одновременно работающих потоков

import threading
import time

semaphore = threading.Semaphore(3)  # Max 3 одновременно

def worker(worker_id):
    with semaphore:
        print(f'Worker {worker_id} started')
        time.sleep(2)
        print(f'Worker {worker_id} finished')

threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

9. Barrier (Барьер)

Синхронизация нескольких потоков в точке

import threading

barrier = threading.Barrier(3)  # Ждёт 3 потока

def worker(worker_id):
    print(f'Worker {worker_id} waiting at barrier')
    barrier.wait()
    print(f'Worker {worker_id} passed barrier')

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

10. Shared Memory (для процессов)

Разделение памяти между процессами

from multiprocessing import Process, Value, Array
import time

def writer(shared_int, shared_array):
    for i in range(5):
        shared_int.value = i
        for j in range(3):
            shared_array[j] = i * j
        time.sleep(0.5)

def reader(shared_int, shared_array):
    for _ in range(5):
        print(f'Int: {shared_int.value}, Array: {list(shared_array)}')
        time.sleep(0.5)

if __name__ == '__main__':
    shared_int = Value('i', 0)
    shared_array = Array('i', 3)
    
    p1 = Process(target=writer, args=(shared_int, shared_array))
    p2 = Process(target=reader, args=(shared_int, shared_array))
    
    p1.start()
    p2.start()
    
    p1.join()
    p2.join()

11. Message Bus (шина сообщений)

Паттерн pub/sub

class MessageBus:
    def __init__(self):
        self.subscribers = {}
    
    def subscribe(self, topic, callback):
        if topic not in self.subscribers:
            self.subscribers[topic] = []
        self.subscribers[topic].append(callback)
    
    def publish(self, topic, message):
        if topic in self.subscribers:
            for callback in self.subscribers[topic]:
                callback(message)

bus = MessageBus()

def on_user_created(user):
    print(f'User created: {user}')

def send_welcome_email(user):
    print(f'Sending email to {user}')

bus.subscribe('user:created', on_user_created)
bus.subscribe('user:created', send_welcome_email)

bus.publish('user:created', 'john@example.com')

12. Сравнительная таблица

МеханизмПотокиПроцессыКорутиныПростотаСкорость
QueueHighMedium
PipeHighHigh
LockMediumHigh
EventHighHigh
ConditionMediumHigh
SemaphoreHighHigh
Shared MemoryLowHigh
Message BusMediumMedium

13. Выбор механизма

Для потоков: Queue (простой обмен), Lock (синхронизация) Для процессов: Queue, Pipe, Shared Memory Для корутин: asyncio.Queue, Event, Condition Для микросервисов: Message Bus (Redis, RabbitMQ)

Каждый механизм решает конкретную задачу — выбирай в зависимости от архитектуры!