Как параллельно работать в CPU bound задачами в Python?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Параллельная работа с CPU-bound задачами в Python
Для CPU-bound операций в Python существует несколько подходов. Давайте разберёмся в каждом, включая их достоинства и недостатки.
1. multiprocessing — классический подход
multiprocessing создаёт отдельные процессы с независимыми интерпретаторами Python, полностью обходя GIL:
from multiprocessing import Pool, cpu_count
import time
def compute_task(n):
total = 0
for i in range(n):
total += i ** 2
return total
# Pool с количеством рабочих = количество ядер
if __name__ == "__main__":
with Pool(cpu_count()) as pool:
results = pool.map(compute_task, [100_000_000] * 4)
print(results)
Достоинства:
- Истинный параллелизм на многоядерных системах
- Простой API
Недостатки:
- Высокие накладные расходы на создание процессов
- Inter-process communication медленнее, чем between-thread
- Нельзя использовать с неп сериализуемыми объектами
2. concurrent.futures.ProcessPoolExecutor
Оболочка над multiprocessing с более удобным API:
from concurrent.futures import ProcessPoolExecutor
import time
def compute_task(n):
return sum(i ** 2 for i in range(n))
start = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(compute_task, 100_000_000) for _ in range(4)]
results = [f.result() for f in futures]
print(f"Время: {time.time() - start:.2f}s")
Преимущество: Future API удобнее для управления результатами и обработки исключений.
3. asyncio + CPU-offloading
Для асинхронных приложений можно отправить CPU-задачу в executor:
import asyncio
from concurrent.futures import ProcessPoolExecutor
def cpu_task(n):
return sum(i ** 2 for i in range(n))
async def main():
loop = asyncio.get_event_loop()
# Запустить блокирующую задачу в отдельном потоке/процессе
result = await loop.run_in_executor(None, cpu_task, 100_000_000)
print(f"Результат: {result}")
asyncio.run(main())
4. C-расширения и Cython
Для критичного по производительности кода можно использовать Cython или чистый C:
# Cython код (вычисляется до компиляции)
# cython: boundscheck=False, wraparound=False, cdivision=True
def compute_cython(int n):
cdef int i, total = 0
for i in range(n):
total += i ** 2
return total
Преимущества: Скорость C + удобство Python синтаксиса. Может отпускать GIL явно.
5. NumPy и векторизация
Для численных вычислений NumPy работает параллельно благодаря BLAS/LAPACK:
import numpy as np
# NumPy операции не блокируются GIL
arr = np.arange(100_000_000)
result = np.sum(arr ** 2) # Параллельно на нескольких ядрах
Сравнение производительности
import time
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor
import numpy as np
def compute_task(n):
return sum(i ** 2 for i in range(n))
n = 100_000_000
task_count = 4
# 1. Последовательно (медленно)
start = time.time()
for _ in range(task_count):
compute_task(n)
print(f"Последовательно: {time.time() - start:.2f}s")
# 2. multiprocessing.Pool (быстро)
start = time.time()
with Pool(4) as pool:
pool.map(compute_task, [n] * task_count)
print(f"multiprocessing.Pool: {time.time() - start:.2f}s")
# 3. NumPy (самый быстрый)
start = time.time()
arr = np.arange(n)
for _ in range(task_count):
np.sum(arr ** 2)
print(f"NumPy: {time.time() - start:.2f}s")
Рекомендации по выбору
| Задача | Решение | Почему |
|---|---|---|
| Численные вычисления | NumPy/SciPy | Встроенный параллелизм BLAS |
| Batch обработка | multiprocessing.Pool | Простой API, хорошо масштабируется |
| Асинхронные приложения | asyncio + executor | Не блокирует event loop |
| Критичная производительность | Cython/NumPy | C скорость без GIL |
| Распределённые вычисления | Ray/Dask/Spark | Масштабирование на несколько машин |
Важный момент: IPC overhead
Одна из основных проблем multiprocessing — это Inter-Process Communication (IPC). Передача больших объектов между процессами медленнее, чем между потоками:
# Медленно: большой объект передаётся через IPC
data = [i for i in range(10_000_000)] # ~40MB
with Pool(4) as pool:
results = pool.map(process_data, [data] * 4) # 4 копии через IPC
# Быстрее: генерируем данные внутри worker процесса
with Pool(4) as pool:
results = pool.map(lambda x: process_data_with_seed(x), range(4))
Итог
Для CPU-bound задач в Python нужно выходить за пределы threading:
- multiprocessing — базовое решение для параллелизма
- NumPy — если работаешь с численными данными
- Cython — если нужна максимальная производительность
- asyncio + executor — если строишь асинхронное приложение