← Назад к вопросам

Какие инструменты применял для многопроцессорности (multiprocessing)?

2.0 Middle🔥 111 комментариев
#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Инструменты для многопроцессорности в Python

Многопроцессорность (multiprocessing) — это создание отдельных процессов (не потоков) для параллельного выполнения CPU-intensive задач. Каждый процесс имеет собственный интерпретатор Python и обходит GIL.

1. Встроенный модуль multiprocessing

Это базовый модуль Python для работы с процессами:

from multiprocessing import Process, Pool
import os

# Простой процесс
def worker(name):
    print(f"Процесс {name} (PID: {os.getpid()}) работает")

if __name__ == "__main__":
    p = Process(target=worker, args=("P1",))
    p.start()
    p.join()
    print("Готово")

Pool для параллельной работы:

from multiprocessing import Pool

def square(x):
    return x ** 2

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        results = pool.map(square, [1, 2, 3, 4, 5])
    print(results)  # [1, 4, 9, 16, 25]

Очереди (Queue) для коммуникации между процессами:

from multiprocessing import Process, Queue

def producer(queue):
    for i in range(5):
        queue.put(f"Item {i}")

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Получен: {item}")

if __name__ == "__main__":
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    
    p1.start()
    p2.start()
    
    p1.join()
    q.put(None)  # Сигнал завершения
    p2.join()

2. concurrent.futures (высокоуровневый API)

Проще и удобнее multiprocessing для большинства случаев:

from concurrent.futures import ProcessPoolExecutor, as_completed
import time

def cpu_task(n):
    total = 0
    for i in range(n):
        total += i
    return total

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        # map() — выполняет функцию для каждого элемента
        results = executor.map(cpu_task, [10_000_000] * 4)
        print(list(results))
        
        # submit() + as_completed() для более гибкого контроля
        futures = [executor.submit(cpu_task, 10_000_000) for _ in range(4)]
        for future in as_completed(futures):
            print(f"Результат: {future.result()}")

Преимущества concurrent.futures:

  • Простой интерфейс (как для потоков, так и для процессов)
  • Автоматическое управление процессами
  • Удобная обработка результатов

3. Joblib (для ML и параллельных вычислений)

Очень популярна в data science и ML:

from joblib import Parallel, delayed

def expensive_function(x):
    return x ** 2

# Параллельное выполнение
results = Parallel(n_jobs=4)(
    delayed(expensive_function)(i) for i in range(10)
)
print(results)  # [0, 1, 4, 9, 16, ...]

Joblib часто используется с scikit-learn:

from sklearn.ensemble import RandomForestClassifier

# n_jobs=-1 означает использовать все процессоры
rf = RandomForestClassifier(n_estimators=100, n_jobs=-1)
rf.fit(X_train, y_train)

4. Celery (для распределённой обработки)

Для асинхронных задач в web приложениях:

from celery import Celery

app = Celery("myapp")

@app.task
def slow_task(x):
    # Может выполняться на другом процессе/машине
    return x ** 2

# В web приложении
from django.http import HttpResponse

def trigger_task(request):
    task = slow_task.delay(5)  # Запуск асинхронно
    return HttpResponse(f"Task ID: {task.id}")

# Получение результата
result = slow_task.AsyncResult(task_id).get()

Celery требует message broker (Redis, RabbitMQ):

# celery.py
from celery import Celery

app = Celery(
    "myapp",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

5. Ray (для масштабируемых вычислений)

Современный фреймворк для распределённых вычислений:

import ray

ray.init(num_cpus=4)

@ray.remote
def square(x):
    return x ** 2

# Параллельное выполнение
futures = [square.remote(i) for i in range(10)]
results = ray.get(futures)
print(results)  # [0, 1, 4, 9, 16, ...]

ray.shutdown()

Ray для ML pipeline:

import ray
from ray import tune

# Распределённое обучение
tune.run(
    "PPO",
    config={
        "env": "CartPole-v0",
        "num_workers": 4,
    }
)

6. MPL (для простых многопроцессорных скриптов)

Лёгкий wrapper над multiprocessing:

from mpl import Pool

def task(x):
    return x * 2

if __name__ == "__main__":
    with Pool(4) as p:
        results = p.map(task, [1, 2, 3, 4, 5])
    print(results)

Сравнение подходов

ИнструментСлучай использованияСложность
multiprocessingБазовая параллельная обработкаСредняя
concurrent.futuresПростые CPU-задачиНизкая
joblibML, data science, scikit-learnНизкая
CeleryWeb приложения, очереди задачВысокая
RayМасштабируемые распределённые вычисленияСредняя-Высокая

Практический пример: скачивание файлов параллельно

from concurrent.futures import ProcessPoolExecutor
import requests

def download_file(url):
    try:
        response = requests.get(url, timeout=5)
        return f"{url}: {len(response.content)} bytes"
    except Exception as e:
        return f"{url}: Error - {e}"

urls = [
    "https://example.com/file1.txt",
    "https://example.com/file2.txt",
    "https://example.com/file3.txt",
]

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=3) as executor:
        results = executor.map(download_file, urls)
        for result in results:
            print(result)

Выводы

  • multiprocessing — встроённый модуль, полный контроль
  • concurrent.futures — простой API для CPU-bound задач
  • joblib — лучший выбор для ML и data science
  • Celery — для асинхронных задач в web приложениях
  • Ray — для масштабируемых распределённых систем
  • Выбор зависит от задачи — начни с concurrent.futures, затем переходи к более сложным инструментам