← Назад к вопросам
В библиотеке multiprocessing Python, с помощью чего можно обмениваться данными между процессами
2.2 Middle🔥 121 комментариев
#Python Core#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Обмен данными между процессами в multiprocessing
Multiprocessing предоставляет несколько механизмов для обмена данными между независимыми процессами. Выбор зависит от типа данных и способа синхронизации.
1. Queue (Очередь) — основной способ
Safe queue для передачи данных между процессами.
from multiprocessing import Process, Queue
def producer(q):
"""Производитель — добавляет данные в очередь"""
for i in range(5):
q.put(f"Item {i}")
print(f"Produced: Item {i}")
def consumer(q):
"""Потребитель — читает данные из очереди"""
while True:
item = q.get()
if item is None: # Signal to stop
break
print(f"Consumed: {item}")
# Создаём очередь
queue = Queue()
# Запускаем процессы
p1 = Process(target=producer, args=(queue,))
p2 = Process(target=consumer, args=(queue,))
p1.start()
p2.start()
p1.join() # Ждём конца производителя
queue.put(None) # Signal stop to consumer
p2.join()
print("Done")
Преимущества Queue:
- Безопасна для использования между процессами
- Блокирует при пустой очереди (get)
- Блокирует при полной очереди (put)
- Можно использовать с timeout
from multiprocessing import Queue
import time
q = Queue(maxsize=10) # Максимум 10 элементов
# Неблокирующие операции с timeout
try:
item = q.get(timeout=1) # Ждём максимум 1 сек
except queue.Empty:
print("Очередь пуста")
try:
q.put(item, timeout=1) # Пытаемся добавить с timeout
except queue.Full:
print("Очередь полна")
2. Pipe (Канал) — двусторонняя связь
Двусторонний канал между двумя процессами.
from multiprocessing import Process, Pipe
def child_process(conn):
"""Дочерний процесс"""
# Получаем сообщение
msg = conn.recv()
print(f"Child received: {msg}")
# Отправляем ответ
conn.send(f"Response to {msg}")
conn.close()
def parent_process():
"""Родительский процесс"""
# Создаём pipe: (parent_conn, child_conn)
parent_conn, child_conn = Pipe()
# Запускаем дочерний процесс
p = Process(target=child_process, args=(child_conn,))
p.start()
# Отправляем сообщение
parent_conn.send("Hello from parent")
# Получаем ответ
response = parent_conn.recv()
print(f"Parent received: {response}")
p.join()
parent_conn.close()
parent_process()
Pipe vs Queue:
- Pipe — 2 процесса (1-to-1)
- Queue — много процессов (many-to-many)
3. Shared Memory (Общая память)
Делят память между процессами для быстрого доступа.
from multiprocessing import Process, Array, Value
import ctypes
def increment_counter(shared_value, shared_array):
"""Увеличивает счётчик и массив в общей памяти"""
# Увеличить значение
with shared_value.get_lock():
shared_value.value += 1
# Изменить массив
for i in range(len(shared_array)):
shared_array[i] *= 2
# Создаём общие переменные
shared_counter = Value("i", 0) # "i" = int, начальное значение 0
shared_arr = Array(ctypes.c_double, [1.0, 2.0, 3.0]) # Массив double
# Запускаем 3 процесса
processes = []
for _ in range(3):
p = Process(target=increment_counter, args=(shared_counter, shared_arr))
p.start()
processes.append(p)
for p in processes:
p.join()
print(f"Counter: {shared_counter.value}") # 3
print(f"Array: {list(shared_arr)})
Типы данных для shared memory:
Value(typecode, initial_value) # Одно значение
Array(typecode, initial_array) # Массив
# Typecodes:
# 'b' = signed char
# 'B' = unsigned char
# 'i' = signed int
# 'I' = unsigned int
# 'd' = double
# 'f' = float
Синхронизация shared memory:
from multiprocessing import Lock
shared_value = Value("i", 0)
lock = Lock()
def safe_increment():
with lock: # Только один процесс одновременно
shared_value.value += 1
# Без lock = race condition!
4. Manager (Менеджер) — структурированные данные
Позволяет делиться сложными объектами (dict, list, objects).
from multiprocessing import Process, Manager
def worker(shared_dict, shared_list):
"""Работник изменяет общие структуры"""
shared_dict["key"] = "value from worker"
shared_list.append("item from worker")
print(f"Worker: {shared_dict}")
print(f"Worker: {shared_list}")
# Создаём менеджер
with Manager() as manager:
# Общие структуры данных
shared_dict = manager.dict()
shared_list = manager.list()
# Добавляем инициальные данные
shared_dict["initial"] = "value"
shared_list.append("initial item")
# Запускаем работника
p = Process(target=worker, args=(shared_dict, shared_list))
p.start()
p.join()
print(f"Main: {dict(shared_dict)}")
print(f"Main: {list(shared_list)}")
Доступные типы в Manager:
manager.dict() # Обычный dict
manager.list() # Обычный list
manager.Namespace() # Объект с атрибутами
manager.Lock() # Lock для синхронизации
manager.RLock() # Реентерабельный lock
manager.Semaphore() # Семафор
manager.Condition() # Условие
manager.Event() # Событие
manager.Queue() # Queue
5. Pool.map() — параллельная обработка
Распределяет задачи между процессами в пуле.
from multiprocessing import Pool
import os
def process_item(x):
"""Обрабатывает один элемент"""
print(f"Processing {x} in PID {os.getpid()}")
return x * x
# Создаём пул из 4 процессов
with Pool(4) as pool:
# Синхронное
results = pool.map(process_item, [1, 2, 3, 4, 5, 6, 7, 8])
print(f"Results: {results}")
# Асинхронное (с callback)
def on_complete(result):
print(f"Task completed: {result}")
async_result = pool.apply_async(process_item, (10,), callback=on_complete)
result = async_result.get(timeout=5)
6. Сравнение способов обмена
| Способ | Тип | Синхронизация | Производительность | Когда использовать |
|---|---|---|---|---|
| Queue | FIFO | Встроенная | Средняя | Много процессов |
| Pipe | Двусторонний | Встроенная | Быстрая | 2 процесса |
| Shared Memory | Direct | Нужен Lock | Быстрая | Часто обновляемые данные |
| Manager | Сложные объекты | Встроенная | Медленная | Сложные структуры |
| Pool.map | Параллельность | Автоматическая | Быстрая | Batch обработка |
7. Практический пример: Producer-Consumer с Queue
from multiprocessing import Process, Queue
import time
import random
def producer(queue, producer_id):
"""Производитель данных"""
for i in range(10):
item = f"P{producer_id}-Item{i}"
queue.put(item)
print(f"Produced: {item}")
time.sleep(random.uniform(0.1, 0.5))
def consumer(queue, consumer_id):
"""Потребитель данных"""
while True:
try:
item = queue.get(timeout=2)
print(f"Consumer {consumer_id} consumed: {item}")
time.sleep(random.uniform(0.1, 0.3))
except queue.Empty:
print(f"Consumer {consumer_id} timeout")
break
if __name__ == "__main__":
queue = Queue()
# Запускаем 2 производителя и 3 потребителя
processes = []
for i in range(2):
p = Process(target=producer, args=(queue, i))
p.start()
processes.append(p)
for i in range(3):
p = Process(target=consumer, args=(queue, i))
p.start()
processes.append(p)
# Ждём завершения всех
for p in processes:
p.join()
8. Лучшие практики
# 1. Всегда используй context managers
from multiprocessing import Pool
with Pool(4) as pool:
results = pool.map(func, data)
# Процессы автоматически закроются
# 2. Избегай shared memory без lock
# ❌ race condition
shared_value.value += 1
# ✓ Безопасно
with lock:
shared_value.value += 1
# 3. Используй Queue для loose coupling
q = Queue()
# Процессы не знают друг о друге
# 4. Закрывай pipes и connections
from multiprocessing import Pipe
parent, child = Pipe()
# ... use pipe
parent.close()
child.close()
Эти механизмы — основа параллельного программирования в Python. Выбирай метод в зависимости от конкретной задачи и требований к производительности.