← Назад к вопросам
Какие инструменты применял для многопроцессорности (multiprocessing)?
2.0 Middle🔥 111 комментариев
#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Инструменты для многопроцессорности в Python
Многопроцессорность (multiprocessing) — это создание отдельных процессов (не потоков) для параллельного выполнения CPU-intensive задач. Каждый процесс имеет собственный интерпретатор Python и обходит GIL.
1. Встроенный модуль multiprocessing
Это базовый модуль Python для работы с процессами:
from multiprocessing import Process, Pool
import os
# Простой процесс
def worker(name):
print(f"Процесс {name} (PID: {os.getpid()}) работает")
if __name__ == "__main__":
p = Process(target=worker, args=("P1",))
p.start()
p.join()
print("Готово")
Pool для параллельной работы:
from multiprocessing import Pool
def square(x):
return x ** 2
if __name__ == "__main__":
with Pool(processes=4) as pool:
results = pool.map(square, [1, 2, 3, 4, 5])
print(results) # [1, 4, 9, 16, 25]
Очереди (Queue) для коммуникации между процессами:
from multiprocessing import Process, Queue
def producer(queue):
for i in range(5):
queue.put(f"Item {i}")
def consumer(queue):
while True:
item = queue.get()
if item is None:
break
print(f"Получен: {item}")
if __name__ == "__main__":
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
q.put(None) # Сигнал завершения
p2.join()
2. concurrent.futures (высокоуровневый API)
Проще и удобнее multiprocessing для большинства случаев:
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
def cpu_task(n):
total = 0
for i in range(n):
total += i
return total
if __name__ == "__main__":
with ProcessPoolExecutor(max_workers=4) as executor:
# map() — выполняет функцию для каждого элемента
results = executor.map(cpu_task, [10_000_000] * 4)
print(list(results))
# submit() + as_completed() для более гибкого контроля
futures = [executor.submit(cpu_task, 10_000_000) for _ in range(4)]
for future in as_completed(futures):
print(f"Результат: {future.result()}")
Преимущества concurrent.futures:
- Простой интерфейс (как для потоков, так и для процессов)
- Автоматическое управление процессами
- Удобная обработка результатов
3. Joblib (для ML и параллельных вычислений)
Очень популярна в data science и ML:
from joblib import Parallel, delayed
def expensive_function(x):
return x ** 2
# Параллельное выполнение
results = Parallel(n_jobs=4)(
delayed(expensive_function)(i) for i in range(10)
)
print(results) # [0, 1, 4, 9, 16, ...]
Joblib часто используется с scikit-learn:
from sklearn.ensemble import RandomForestClassifier
# n_jobs=-1 означает использовать все процессоры
rf = RandomForestClassifier(n_estimators=100, n_jobs=-1)
rf.fit(X_train, y_train)
4. Celery (для распределённой обработки)
Для асинхронных задач в web приложениях:
from celery import Celery
app = Celery("myapp")
@app.task
def slow_task(x):
# Может выполняться на другом процессе/машине
return x ** 2
# В web приложении
from django.http import HttpResponse
def trigger_task(request):
task = slow_task.delay(5) # Запуск асинхронно
return HttpResponse(f"Task ID: {task.id}")
# Получение результата
result = slow_task.AsyncResult(task_id).get()
Celery требует message broker (Redis, RabbitMQ):
# celery.py
from celery import Celery
app = Celery(
"myapp",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0"
)
5. Ray (для масштабируемых вычислений)
Современный фреймворк для распределённых вычислений:
import ray
ray.init(num_cpus=4)
@ray.remote
def square(x):
return x ** 2
# Параллельное выполнение
futures = [square.remote(i) for i in range(10)]
results = ray.get(futures)
print(results) # [0, 1, 4, 9, 16, ...]
ray.shutdown()
Ray для ML pipeline:
import ray
from ray import tune
# Распределённое обучение
tune.run(
"PPO",
config={
"env": "CartPole-v0",
"num_workers": 4,
}
)
6. MPL (для простых многопроцессорных скриптов)
Лёгкий wrapper над multiprocessing:
from mpl import Pool
def task(x):
return x * 2
if __name__ == "__main__":
with Pool(4) as p:
results = p.map(task, [1, 2, 3, 4, 5])
print(results)
Сравнение подходов
| Инструмент | Случай использования | Сложность |
|---|---|---|
| multiprocessing | Базовая параллельная обработка | Средняя |
| concurrent.futures | Простые CPU-задачи | Низкая |
| joblib | ML, data science, scikit-learn | Низкая |
| Celery | Web приложения, очереди задач | Высокая |
| Ray | Масштабируемые распределённые вычисления | Средняя-Высокая |
Практический пример: скачивание файлов параллельно
from concurrent.futures import ProcessPoolExecutor
import requests
def download_file(url):
try:
response = requests.get(url, timeout=5)
return f"{url}: {len(response.content)} bytes"
except Exception as e:
return f"{url}: Error - {e}"
urls = [
"https://example.com/file1.txt",
"https://example.com/file2.txt",
"https://example.com/file3.txt",
]
if __name__ == "__main__":
with ProcessPoolExecutor(max_workers=3) as executor:
results = executor.map(download_file, urls)
for result in results:
print(result)
Выводы
- multiprocessing — встроённый модуль, полный контроль
- concurrent.futures — простой API для CPU-bound задач
- joblib — лучший выбор для ML и data science
- Celery — для асинхронных задач в web приложениях
- Ray — для масштабируемых распределённых систем
- Выбор зависит от задачи — начни с concurrent.futures, затем переходи к более сложным инструментам