← Назад к вопросам

Как можно синхронизировать объекты в многопроцессорной среде в Python?

3.0 Senior🔥 111 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Синхронизация объектов в многопроцессорной среде Python

В многопроцессорной среде каждый процесс имеет собственное пространство памяти, поэтому обычные переменные Python не могут быть общими. Рассмотрю основные подходы к синхронизации.

Проблема: разные адреса памяти

Вот почему простое присваивание не работает:

# ❌ Не работает: каждый процесс имеет свою копию counter
counter = 0

def increment():
    global counter
    counter += 1

if __name__ == "__main__":
    with multiprocessing.Pool(4) as pool:
        pool.map(increment, range(100))
    
    print(counter)  # Выведет 0, не 100!

Каждый процесс работает с собственной копией counter.

Способ 1: multiprocessing.Value и Array

Для простых типов данных:

from multiprocessing import Process, Value, Lock

def increment(counter, lock):
    with lock:
        # lock гарантирует, что только один процесс может писать
        counter.value += 1

if __name__ == "__main__":
    counter = Value('i', 0)  # 'i' = integer
    lock = Lock()
    
    processes = []
    for _ in range(4):
        p = Process(target=increment, args=(counter, lock))
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()
    
    print(f"Counter: {counter.value}")  # Выведет 4

Поддерживаемые типы:

  • 'i' — int
  • 'd' — double
  • 'f' — float
  • 'b' — bool

Для массивов:

from multiprocessing import Array

def update_array(shared_array, index, value):
    shared_array[index] = value

if __name__ == "__main__":
    shared = Array('i', [0, 0, 0, 0])  # Массив из 4 целых чисел
    
    processes = []
    for i in range(4):
        p = Process(target=update_array, args=(shared, i, i * 10))
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()
    
    print(list(shared))  # [0, 10, 20, 30]

Способ 2: multiprocessing.Manager

Для сложных типов (dict, list, объекты):

from multiprocessing import Process, Manager

def worker(shared_dict, shared_list, worker_id):
    # Добавляем в общий словарь
    shared_dict[f"worker_{worker_id}"] = worker_id * 100
    
    # Добавляем в общий список
    shared_list.append(worker_id)

if __name__ == "__main__":
    with Manager() as manager:
        shared_dict = manager.dict()
        shared_list = manager.list()
        
        processes = []
        for i in range(4):
            p = Process(target=worker, args=(shared_dict, shared_list, i))
            processes.append(p)
            p.start()
        
        for p in processes:
            p.join()
        
        print(f"Dict: {dict(shared_dict)}")
        print(f"List: {list(shared_list)}")

Manager поддерживает:

  • dict, list, tuple
  • Queue, Lock, Semaphore, Event
  • Namespace (для объектов)

Способ 3: Queue для обмена данными

Для асинхронного обмена между процессами:

from multiprocessing import Process, Queue

def producer(queue):
    """Процесс, который производит данные."""
    for i in range(5):
        print(f"Producing {i}")
        queue.put(i)
        time.sleep(0.1)

def consumer(queue):
    """Процесс, который потребляет данные."""
    while True:
        item = queue.get()
        if item is None:  # Сигнал конца
            break
        print(f"Consuming {item}")

if __name__ == "__main__":
    queue = Queue()
    
    p_producer = Process(target=producer, args=(queue,))
    p_consumer = Process(target=consumer, args=(queue,))
    
    p_producer.start()
    p_consumer.start()
    
    p_producer.join()
    queue.put(None)  # Сигнал конца
    p_consumer.join()

Способ 4: Lock и RLock

Для критических секций кода:

from multiprocessing import Process, Lock

balance = Value('f', 1000.0)
lock = Lock()

def withdraw(amount):
    """Снять деньги со счёта (критическая секция)."""
    global balance
    
    with lock:  # Только один процесс может выполнять
        if balance.value >= amount:
            print(f"Withdraw {amount}")
            balance.value -= amount
        else:
            print("Insufficient funds")

if __name__ == "__main__":
    processes = []
    for _ in range(5):
        p = Process(target=withdraw, args=(100,))
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()
    
    print(f"Final balance: {balance.value}")  # 500

Способ 5: Semaphore для ограничения доступа

Когда нужно разрешить N процессам одновременный доступ:

from multiprocessing import Process, Semaphore
import time

def access_resource(semaphore, resource_id):
    with semaphore:  # Макс 2 одновременно
        print(f"Resource {resource_id} is using resource")
        time.sleep(1)
        print(f"Resource {resource_id} released")

if __name__ == "__main__":
    semaphore = Semaphore(2)  # Макс 2 одновременно
    
    processes = []
    for i in range(6):
        p = Process(target=access_resource, args=(semaphore, i))
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()

Способ 6: Event для сигнализации

Для синхронизации событий между процессами:

from multiprocessing import Process, Event
import time

def worker(event, worker_id):
    print(f"Worker {worker_id} waiting...")
    event.wait()  # Ждёт сигнала
    print(f"Worker {worker_id} activated!")

def coordinator(event, delay):
    time.sleep(delay)
    print("Coordinator: sending signal")
    event.set()  # Сигнал всем

if __name__ == "__main__":
    event = Event()
    
    # Запускаем работников
    processes = []
    for i in range(3):
        p = Process(target=worker, args=(event, i))
        processes.append(p)
        p.start()
    
    # Запускаем координатор
    p_coord = Process(target=coordinator, args=(event, 2))
    p_coord.start()
    
    for p in processes + [p_coord]:
        p.join()

Способ 7: Pipe для двусторонней коммуникации

Для обмена данными между двумя процессами:

from multiprocessing import Process, Pipe

def child_process(conn):
    conn.send("Hello from child")
    message = conn.recv()
    print(f"Child received: {message}")
    conn.close()

def parent_process(conn):
    message = conn.recv()
    print(f"Parent received: {message}")
    conn.send("Hello from parent")
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    
    p_child = Process(target=child_process, args=(child_conn,))
    p_parent = Process(target=parent_process, args=(parent_conn,))
    
    p_child.start()
    p_parent.start()
    
    p_child.join()
    p_parent.join()

Сравнение методов

МетодИспользованиеПроизводительностьСложность
ValueПростые типыВысокаяНизкая
ArrayЧисловые массивыВысокаяНизкая
ManagerСложные объектыСредняяСредняя
QueueАсинхронный обменСредняяНизкая
LockКритические секцииНизкаяСредняя
SemaphoreОграничение доступаНизкаяСредняя
EventСинхронизация событийСредняяНизкая
PipeДвусторонняя связьВысокаяНизкая

Лучшие практики

# ✅ Правильно: используй контекстные менеджеры
with lock:
    shared_value.value += 1

# ❌ Не правильно: ручное управление
lock.acquire()
shared_value.value += 1
lock.release()  # Может не выполниться при исключении

# ✅ Правильно: Manager с with
with Manager() as manager:
    data = manager.dict()
    # работаем с data

# ✅ Правильно: проверяй deadlock
# Если процесс ждёт два lock в разном порядке — deadlock

# ❌ Избегай использования глобальных переменных
# Они копируются, не синхронизируются

Выбор метода синхронизации зависит от типа данных, частоты доступа и архитектуры приложения.

Как можно синхронизировать объекты в многопроцессорной среде в Python? | PrepBro