← Назад к вопросам

В библиотеке multiprocessing Python, с помощью чего можно обмениваться данными между процессами

2.2 Middle🔥 121 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Обмен данными между процессами в multiprocessing

Multiprocessing предоставляет несколько механизмов для обмена данными между независимыми процессами. Выбор зависит от типа данных и способа синхронизации.

1. Queue (Очередь) — основной способ

Safe queue для передачи данных между процессами.

from multiprocessing import Process, Queue

def producer(q):
    """Производитель — добавляет данные в очередь"""
    for i in range(5):
        q.put(f"Item {i}")
        print(f"Produced: Item {i}")

def consumer(q):
    """Потребитель — читает данные из очереди"""
    while True:
        item = q.get()
        if item is None:  # Signal to stop
            break
        print(f"Consumed: {item}")

# Создаём очередь
queue = Queue()

# Запускаем процессы
p1 = Process(target=producer, args=(queue,))
p2 = Process(target=consumer, args=(queue,))

p1.start()
p2.start()

p1.join()  # Ждём конца производителя
queue.put(None)  # Signal stop to consumer
p2.join()

print("Done")

Преимущества Queue:

  • Безопасна для использования между процессами
  • Блокирует при пустой очереди (get)
  • Блокирует при полной очереди (put)
  • Можно использовать с timeout
from multiprocessing import Queue
import time

q = Queue(maxsize=10)  # Максимум 10 элементов

# Неблокирующие операции с timeout
try:
    item = q.get(timeout=1)  # Ждём максимум 1 сек
except queue.Empty:
    print("Очередь пуста")

try:
    q.put(item, timeout=1)  # Пытаемся добавить с timeout
except queue.Full:
    print("Очередь полна")

2. Pipe (Канал) — двусторонняя связь

Двусторонний канал между двумя процессами.

from multiprocessing import Process, Pipe

def child_process(conn):
    """Дочерний процесс"""
    # Получаем сообщение
    msg = conn.recv()
    print(f"Child received: {msg}")
    
    # Отправляем ответ
    conn.send(f"Response to {msg}")
    conn.close()

def parent_process():
    """Родительский процесс"""
    # Создаём pipe: (parent_conn, child_conn)
    parent_conn, child_conn = Pipe()
    
    # Запускаем дочерний процесс
    p = Process(target=child_process, args=(child_conn,))
    p.start()
    
    # Отправляем сообщение
    parent_conn.send("Hello from parent")
    
    # Получаем ответ
    response = parent_conn.recv()
    print(f"Parent received: {response}")
    
    p.join()
    parent_conn.close()

parent_process()

Pipe vs Queue:

  • Pipe — 2 процесса (1-to-1)
  • Queue — много процессов (many-to-many)

3. Shared Memory (Общая память)

Делят память между процессами для быстрого доступа.

from multiprocessing import Process, Array, Value
import ctypes

def increment_counter(shared_value, shared_array):
    """Увеличивает счётчик и массив в общей памяти"""
    # Увеличить значение
    with shared_value.get_lock():
        shared_value.value += 1
    
    # Изменить массив
    for i in range(len(shared_array)):
        shared_array[i] *= 2

# Создаём общие переменные
shared_counter = Value("i", 0)  # "i" = int, начальное значение 0
shared_arr = Array(ctypes.c_double, [1.0, 2.0, 3.0])  # Массив double

# Запускаем 3 процесса
processes = []
for _ in range(3):
    p = Process(target=increment_counter, args=(shared_counter, shared_arr))
    p.start()
    processes.append(p)

for p in processes:
    p.join()

print(f"Counter: {shared_counter.value}")  # 3
print(f"Array: {list(shared_arr)})

Типы данных для shared memory:

Value(typecode, initial_value)  # Одно значение
Array(typecode, initial_array)  # Массив

# Typecodes:
# 'b' = signed char
# 'B' = unsigned char
# 'i' = signed int
# 'I' = unsigned int
# 'd' = double
# 'f' = float

Синхронизация shared memory:

from multiprocessing import Lock

shared_value = Value("i", 0)
lock = Lock()

def safe_increment():
    with lock:  # Только один процесс одновременно
        shared_value.value += 1

# Без lock = race condition!

4. Manager (Менеджер) — структурированные данные

Позволяет делиться сложными объектами (dict, list, objects).

from multiprocessing import Process, Manager

def worker(shared_dict, shared_list):
    """Работник изменяет общие структуры"""
    shared_dict["key"] = "value from worker"
    shared_list.append("item from worker")
    print(f"Worker: {shared_dict}")
    print(f"Worker: {shared_list}")

# Создаём менеджер
with Manager() as manager:
    # Общие структуры данных
    shared_dict = manager.dict()
    shared_list = manager.list()
    
    # Добавляем инициальные данные
    shared_dict["initial"] = "value"
    shared_list.append("initial item")
    
    # Запускаем работника
    p = Process(target=worker, args=(shared_dict, shared_list))
    p.start()
    p.join()
    
    print(f"Main: {dict(shared_dict)}")
    print(f"Main: {list(shared_list)}")

Доступные типы в Manager:

manager.dict()      # Обычный dict
manager.list()      # Обычный list
manager.Namespace() # Объект с атрибутами
manager.Lock()      # Lock для синхронизации
manager.RLock()     # Реентерабельный lock
manager.Semaphore() # Семафор
manager.Condition() # Условие
manager.Event()     # Событие
manager.Queue()     # Queue

5. Pool.map() — параллельная обработка

Распределяет задачи между процессами в пуле.

from multiprocessing import Pool
import os

def process_item(x):
    """Обрабатывает один элемент"""
    print(f"Processing {x} in PID {os.getpid()}")
    return x * x

# Создаём пул из 4 процессов
with Pool(4) as pool:
    # Синхронное
    results = pool.map(process_item, [1, 2, 3, 4, 5, 6, 7, 8])
    print(f"Results: {results}")
    
    # Асинхронное (с callback)
    def on_complete(result):
        print(f"Task completed: {result}")
    
    async_result = pool.apply_async(process_item, (10,), callback=on_complete)
    result = async_result.get(timeout=5)

6. Сравнение способов обмена

СпособТипСинхронизацияПроизводительностьКогда использовать
QueueFIFOВстроеннаяСредняяМного процессов
PipeДвустороннийВстроеннаяБыстрая2 процесса
Shared MemoryDirectНужен LockБыстраяЧасто обновляемые данные
ManagerСложные объектыВстроеннаяМедленнаяСложные структуры
Pool.mapПараллельностьАвтоматическаяБыстраяBatch обработка

7. Практический пример: Producer-Consumer с Queue

from multiprocessing import Process, Queue
import time
import random

def producer(queue, producer_id):
    """Производитель данных"""
    for i in range(10):
        item = f"P{producer_id}-Item{i}"
        queue.put(item)
        print(f"Produced: {item}")
        time.sleep(random.uniform(0.1, 0.5))

def consumer(queue, consumer_id):
    """Потребитель данных"""
    while True:
        try:
            item = queue.get(timeout=2)
            print(f"Consumer {consumer_id} consumed: {item}")
            time.sleep(random.uniform(0.1, 0.3))
        except queue.Empty:
            print(f"Consumer {consumer_id} timeout")
            break

if __name__ == "__main__":
    queue = Queue()
    
    # Запускаем 2 производителя и 3 потребителя
    processes = []
    
    for i in range(2):
        p = Process(target=producer, args=(queue, i))
        p.start()
        processes.append(p)
    
    for i in range(3):
        p = Process(target=consumer, args=(queue, i))
        p.start()
        processes.append(p)
    
    # Ждём завершения всех
    for p in processes:
        p.join()

8. Лучшие практики

# 1. Всегда используй context managers
from multiprocessing import Pool

with Pool(4) as pool:
    results = pool.map(func, data)
# Процессы автоматически закроются

# 2. Избегай shared memory без lock
# ❌ race condition
shared_value.value += 1

# ✓ Безопасно
with lock:
    shared_value.value += 1

# 3. Используй Queue для loose coupling
q = Queue()
# Процессы не знают друг о друге

# 4. Закрывай pipes и connections
from multiprocessing import Pipe
parent, child = Pipe()
# ... use pipe
parent.close()
child.close()

Эти механизмы — основа параллельного программирования в Python. Выбирай метод в зависимости от конкретной задачи и требований к производительности.

В библиотеке multiprocessing Python, с помощью чего можно обмениваться данными между процессами | PrepBro