Как можно синхронизировать объекты в многопроцессорной среде в Python?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Синхронизация объектов в многопроцессорной среде Python
В многопроцессорной среде каждый процесс имеет собственное пространство памяти, поэтому обычные переменные Python не могут быть общими. Рассмотрю основные подходы к синхронизации.
Проблема: разные адреса памяти
Вот почему простое присваивание не работает:
# ❌ Не работает: каждый процесс имеет свою копию counter
counter = 0
def increment():
global counter
counter += 1
if __name__ == "__main__":
with multiprocessing.Pool(4) as pool:
pool.map(increment, range(100))
print(counter) # Выведет 0, не 100!
Каждый процесс работает с собственной копией counter.
Способ 1: multiprocessing.Value и Array
Для простых типов данных:
from multiprocessing import Process, Value, Lock
def increment(counter, lock):
with lock:
# lock гарантирует, что только один процесс может писать
counter.value += 1
if __name__ == "__main__":
counter = Value('i', 0) # 'i' = integer
lock = Lock()
processes = []
for _ in range(4):
p = Process(target=increment, args=(counter, lock))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"Counter: {counter.value}") # Выведет 4
Поддерживаемые типы:
'i'— int'd'— double'f'— float'b'— bool
Для массивов:
from multiprocessing import Array
def update_array(shared_array, index, value):
shared_array[index] = value
if __name__ == "__main__":
shared = Array('i', [0, 0, 0, 0]) # Массив из 4 целых чисел
processes = []
for i in range(4):
p = Process(target=update_array, args=(shared, i, i * 10))
processes.append(p)
p.start()
for p in processes:
p.join()
print(list(shared)) # [0, 10, 20, 30]
Способ 2: multiprocessing.Manager
Для сложных типов (dict, list, объекты):
from multiprocessing import Process, Manager
def worker(shared_dict, shared_list, worker_id):
# Добавляем в общий словарь
shared_dict[f"worker_{worker_id}"] = worker_id * 100
# Добавляем в общий список
shared_list.append(worker_id)
if __name__ == "__main__":
with Manager() as manager:
shared_dict = manager.dict()
shared_list = manager.list()
processes = []
for i in range(4):
p = Process(target=worker, args=(shared_dict, shared_list, i))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"Dict: {dict(shared_dict)}")
print(f"List: {list(shared_list)}")
Manager поддерживает:
- dict, list, tuple
- Queue, Lock, Semaphore, Event
- Namespace (для объектов)
Способ 3: Queue для обмена данными
Для асинхронного обмена между процессами:
from multiprocessing import Process, Queue
def producer(queue):
"""Процесс, который производит данные."""
for i in range(5):
print(f"Producing {i}")
queue.put(i)
time.sleep(0.1)
def consumer(queue):
"""Процесс, который потребляет данные."""
while True:
item = queue.get()
if item is None: # Сигнал конца
break
print(f"Consuming {item}")
if __name__ == "__main__":
queue = Queue()
p_producer = Process(target=producer, args=(queue,))
p_consumer = Process(target=consumer, args=(queue,))
p_producer.start()
p_consumer.start()
p_producer.join()
queue.put(None) # Сигнал конца
p_consumer.join()
Способ 4: Lock и RLock
Для критических секций кода:
from multiprocessing import Process, Lock
balance = Value('f', 1000.0)
lock = Lock()
def withdraw(amount):
"""Снять деньги со счёта (критическая секция)."""
global balance
with lock: # Только один процесс может выполнять
if balance.value >= amount:
print(f"Withdraw {amount}")
balance.value -= amount
else:
print("Insufficient funds")
if __name__ == "__main__":
processes = []
for _ in range(5):
p = Process(target=withdraw, args=(100,))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"Final balance: {balance.value}") # 500
Способ 5: Semaphore для ограничения доступа
Когда нужно разрешить N процессам одновременный доступ:
from multiprocessing import Process, Semaphore
import time
def access_resource(semaphore, resource_id):
with semaphore: # Макс 2 одновременно
print(f"Resource {resource_id} is using resource")
time.sleep(1)
print(f"Resource {resource_id} released")
if __name__ == "__main__":
semaphore = Semaphore(2) # Макс 2 одновременно
processes = []
for i in range(6):
p = Process(target=access_resource, args=(semaphore, i))
processes.append(p)
p.start()
for p in processes:
p.join()
Способ 6: Event для сигнализации
Для синхронизации событий между процессами:
from multiprocessing import Process, Event
import time
def worker(event, worker_id):
print(f"Worker {worker_id} waiting...")
event.wait() # Ждёт сигнала
print(f"Worker {worker_id} activated!")
def coordinator(event, delay):
time.sleep(delay)
print("Coordinator: sending signal")
event.set() # Сигнал всем
if __name__ == "__main__":
event = Event()
# Запускаем работников
processes = []
for i in range(3):
p = Process(target=worker, args=(event, i))
processes.append(p)
p.start()
# Запускаем координатор
p_coord = Process(target=coordinator, args=(event, 2))
p_coord.start()
for p in processes + [p_coord]:
p.join()
Способ 7: Pipe для двусторонней коммуникации
Для обмена данными между двумя процессами:
from multiprocessing import Process, Pipe
def child_process(conn):
conn.send("Hello from child")
message = conn.recv()
print(f"Child received: {message}")
conn.close()
def parent_process(conn):
message = conn.recv()
print(f"Parent received: {message}")
conn.send("Hello from parent")
conn.close()
if __name__ == "__main__":
parent_conn, child_conn = Pipe()
p_child = Process(target=child_process, args=(child_conn,))
p_parent = Process(target=parent_process, args=(parent_conn,))
p_child.start()
p_parent.start()
p_child.join()
p_parent.join()
Сравнение методов
| Метод | Использование | Производительность | Сложность |
|---|---|---|---|
| Value | Простые типы | Высокая | Низкая |
| Array | Числовые массивы | Высокая | Низкая |
| Manager | Сложные объекты | Средняя | Средняя |
| Queue | Асинхронный обмен | Средняя | Низкая |
| Lock | Критические секции | Низкая | Средняя |
| Semaphore | Ограничение доступа | Низкая | Средняя |
| Event | Синхронизация событий | Средняя | Низкая |
| Pipe | Двусторонняя связь | Высокая | Низкая |
Лучшие практики
# ✅ Правильно: используй контекстные менеджеры
with lock:
shared_value.value += 1
# ❌ Не правильно: ручное управление
lock.acquire()
shared_value.value += 1
lock.release() # Может не выполниться при исключении
# ✅ Правильно: Manager с with
with Manager() as manager:
data = manager.dict()
# работаем с data
# ✅ Правильно: проверяй deadlock
# Если процесс ждёт два lock в разном порядке — deadlock
# ❌ Избегай использования глобальных переменных
# Они копируются, не синхронизируются
Выбор метода синхронизации зависит от типа данных, частоты доступа и архитектуры приложения.