← Назад к вопросам

Как параллельно работать в CPU bound задачами в Python?

2.0 Middle🔥 161 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Параллельная работа с CPU-bound задачами в Python

Для CPU-bound операций в Python существует несколько подходов. Давайте разберёмся в каждом, включая их достоинства и недостатки.

1. multiprocessing — классический подход

multiprocessing создаёт отдельные процессы с независимыми интерпретаторами Python, полностью обходя GIL:

from multiprocessing import Pool, cpu_count
import time

def compute_task(n):
    total = 0
    for i in range(n):
        total += i ** 2
    return total

# Pool с количеством рабочих = количество ядер
if __name__ == "__main__":
    with Pool(cpu_count()) as pool:
        results = pool.map(compute_task, [100_000_000] * 4)
        print(results)

Достоинства:

  • Истинный параллелизм на многоядерных системах
  • Простой API

Недостатки:

  • Высокие накладные расходы на создание процессов
  • Inter-process communication медленнее, чем between-thread
  • Нельзя использовать с неп сериализуемыми объектами

2. concurrent.futures.ProcessPoolExecutor

Оболочка над multiprocessing с более удобным API:

from concurrent.futures import ProcessPoolExecutor
import time

def compute_task(n):
    return sum(i ** 2 for i in range(n))

start = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(compute_task, 100_000_000) for _ in range(4)]
    results = [f.result() for f in futures]

print(f"Время: {time.time() - start:.2f}s")

Преимущество: Future API удобнее для управления результатами и обработки исключений.

3. asyncio + CPU-offloading

Для асинхронных приложений можно отправить CPU-задачу в executor:

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_task(n):
    return sum(i ** 2 for i in range(n))

async def main():
    loop = asyncio.get_event_loop()
    
    # Запустить блокирующую задачу в отдельном потоке/процессе
    result = await loop.run_in_executor(None, cpu_task, 100_000_000)
    print(f"Результат: {result}")

asyncio.run(main())

4. C-расширения и Cython

Для критичного по производительности кода можно использовать Cython или чистый C:

# Cython код (вычисляется до компиляции)
# cython: boundscheck=False, wraparound=False, cdivision=True

def compute_cython(int n):
    cdef int i, total = 0
    for i in range(n):
        total += i ** 2
    return total

Преимущества: Скорость C + удобство Python синтаксиса. Может отпускать GIL явно.

5. NumPy и векторизация

Для численных вычислений NumPy работает параллельно благодаря BLAS/LAPACK:

import numpy as np

# NumPy операции не блокируются GIL
arr = np.arange(100_000_000)
result = np.sum(arr ** 2)  # Параллельно на нескольких ядрах

Сравнение производительности

import time
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor
import numpy as np

def compute_task(n):
    return sum(i ** 2 for i in range(n))

n = 100_000_000
task_count = 4

# 1. Последовательно (медленно)
start = time.time()
for _ in range(task_count):
    compute_task(n)
print(f"Последовательно: {time.time() - start:.2f}s")

# 2. multiprocessing.Pool (быстро)
start = time.time()
with Pool(4) as pool:
    pool.map(compute_task, [n] * task_count)
print(f"multiprocessing.Pool: {time.time() - start:.2f}s")

# 3. NumPy (самый быстрый)
start = time.time()
arr = np.arange(n)
for _ in range(task_count):
    np.sum(arr ** 2)
print(f"NumPy: {time.time() - start:.2f}s")

Рекомендации по выбору

ЗадачаРешениеПочему
Численные вычисленияNumPy/SciPyВстроенный параллелизм BLAS
Batch обработкаmultiprocessing.PoolПростой API, хорошо масштабируется
Асинхронные приложенияasyncio + executorНе блокирует event loop
Критичная производительностьCython/NumPyC скорость без GIL
Распределённые вычисленияRay/Dask/SparkМасштабирование на несколько машин

Важный момент: IPC overhead

Одна из основных проблем multiprocessing — это Inter-Process Communication (IPC). Передача больших объектов между процессами медленнее, чем между потоками:

# Медленно: большой объект передаётся через IPC
data = [i for i in range(10_000_000)]  # ~40MB
with Pool(4) as pool:
    results = pool.map(process_data, [data] * 4)  # 4 копии через IPC

# Быстрее: генерируем данные внутри worker процесса
with Pool(4) as pool:
    results = pool.map(lambda x: process_data_with_seed(x), range(4))

Итог

Для CPU-bound задач в Python нужно выходить за пределы threading:

  1. multiprocessing — базовое решение для параллелизма
  2. NumPy — если работаешь с численными данными
  3. Cython — если нужна максимальная производительность
  4. asyncio + executor — если строишь асинхронное приложение