← Назад к вопросам

Чем запускать параллельно CPU bound задачи в Python?

2.0 Middle🔥 101 комментариев
#Python Core

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Параллельное выполнение CPU-bound задач в Python

Это критический вопрос для высоконагруженных приложений. В Python есть несколько подходов, и нужно выбрать правильный для своей задачи.

Проблема: Global Interpreter Lock (GIL)

В CPython есть GIL — механизм, который позволяет выполняться только одному потоку в один момент времени. Это означает:

  • Threading неэффективна для CPU-bound задач — потоки не выполняются одновременно
  • Multiprocessing работает, потому что каждый процесс имеет свой интерпретатор
  • Async не помогает для CPU-bound (она для I/O-bound)

Решение 1: multiprocessing (классический подход)

Для CPU-bound задач используем отдельные процессы:

from multiprocessing import Pool
import time

def cpu_bound_task(n):
    """Тяжелая вычислительная задача"""
    total = 0
    for i in range(n):
        total += i ** 2
    return total

# Использование Pool
if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # map применяет функцию к каждому элементу
        results = pool.map(cpu_bound_task, [10_000_000, 20_000_000, 15_000_000])
        print(results)

Решение 2: concurrent.futures.ProcessPoolExecutor (современный подход)

Более гибкое и удобное API:

from concurrent.futures import ProcessPoolExecutor
import time

def cpu_bound_task(n):
    return sum(i ** 2 for i in range(n))

# Способ 1: map (для простых случаев)
with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(cpu_bound_task, [10_000_000, 20_000_000]))
    print(results)

# Способ 2: submit (для более сложных сценариев)
with ProcessPoolExecutor(max_workers=4) as executor:
    futures = {
        executor.submit(cpu_bound_task, n): n 
        for n in [10_000_000, 20_000_000, 15_000_000]
    }
    
    for future in futures:
        try:
            result = future.result(timeout=30)
            print(f"Result: {result}")
        except Exception as e:
            print(f"Error: {e}")

Решение 3: joblib (для machine learning)

Отличный инструмент, особенно для ML задач:

from joblib import Parallel, delayed

def cpu_bound_task(n):
    return sum(i ** 2 for i in range(n))

# n_jobs=-1 использует все ядра
results = Parallel(n_jobs=-1)(
    delayed(cpu_bound_task)(n) 
    for n in [10_000_000, 20_000_000, 15_000_000]
)
print(results)

Сравнение подходов

ПодходИспользованиеПлюсыМинусы
multiprocessing.PoolПростые параллельные задачиСтандартная библиотека, стабильнаСтарое API, сложнее обработка ошибок
ProcessPoolExecutorБольшинство случаевСовременное API, гибкость, обработка ошибокНебольшой оверхед
joblibML, heavy computationОптимизирована для CPU-bound, прогресс-барТретья библиотека
asyncio❌ НЕ для CPU-boundДля I/O-bound толькоНе поможет с GIL
threading❌ НЕ для CPU-boundДля I/O-bound толькоGIL блокирует параллелизм

Реальный пример: обработка больших данных

from concurrent.futures import ProcessPoolExecutor, as_completed
import time
from typing import List

def process_chunk(chunk: List[int]) -> int:
    """Обработка одного куска данных"""
    time.sleep(1)  # имитация тяжелого вычисления
    return sum(i ** 2 for i in chunk)

def process_data_parallel(data: List[int], chunk_size: int = 1000) -> int:
    # Разбиваем данные на части
    chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]
    
    results = []
    with ProcessPoolExecutor(max_workers=4) as executor:
        # Отправляем все задачи
        futures = {executor.submit(process_chunk, chunk): chunk for chunk in chunks}
        
        # Обработаем результаты по мере готовности
        for future in as_completed(futures):
            try:
                result = future.result(timeout=60)
                results.append(result)
            except TimeoutError:
                print("Task timeout")
            except Exception as e:
                print(f"Task failed: {e}")
    
    return sum(results)

# Использование
data = list(range(100_000))
result = process_data_parallel(data)
print(f"Total: {result}")

Оптимизация: сколько процессов выбрать?

import os
from concurrent.futures import ProcessPoolExecutor

# Количество CPU ядер
num_cores = os.cpu_count()
print(f"CPU cores: {num_cores}")

# Рекомендации:
# - Для чистого CPU-bound: num_cores
# - Для I/O + CPU: num_cores + 2
# - Для сетевых операций: больше

with ProcessPoolExecutor(max_workers=num_cores) as executor:
    # ...

Python 3.13+ улучшения

В новых версиях Python есть попытки улучшить ситуацию с GIL:

# Экспериментальная функция для снятия GIL
import sys
if hasattr(sys, 'free_threading'):
    print("Free-threading Python detected")

Когда НЕ использовать multiprocessing

  • Для I/O-bound задач — используй asyncio или threading
  • Для простых операций — оверхед создания процесса больше, чем польза
  • Когда нужна общая память между задачами — используй threading

Итоговые рекомендации

  1. Для большинства CPU-bound задач: ProcessPoolExecutor
  2. Для ML/Data Science: joblib
  3. Для простого случая с быстрой обработкой: multiprocessing.Pool
  4. Если много I/O: asyncio или threading
  5. Если нужна гибкость: ProcessPoolExecutor с as_completed()