← Назад к вопросам
Чем запускать параллельно CPU bound задачи в Python?
2.0 Middle🔥 101 комментариев
#Python Core
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Параллельное выполнение CPU-bound задач в Python
Это критический вопрос для высоконагруженных приложений. В Python есть несколько подходов, и нужно выбрать правильный для своей задачи.
Проблема: Global Interpreter Lock (GIL)
В CPython есть GIL — механизм, который позволяет выполняться только одному потоку в один момент времени. Это означает:
- Threading неэффективна для CPU-bound задач — потоки не выполняются одновременно
- Multiprocessing работает, потому что каждый процесс имеет свой интерпретатор
- Async не помогает для CPU-bound (она для I/O-bound)
Решение 1: multiprocessing (классический подход)
Для CPU-bound задач используем отдельные процессы:
from multiprocessing import Pool
import time
def cpu_bound_task(n):
"""Тяжелая вычислительная задача"""
total = 0
for i in range(n):
total += i ** 2
return total
# Использование Pool
if __name__ == '__main__':
with Pool(processes=4) as pool:
# map применяет функцию к каждому элементу
results = pool.map(cpu_bound_task, [10_000_000, 20_000_000, 15_000_000])
print(results)
Решение 2: concurrent.futures.ProcessPoolExecutor (современный подход)
Более гибкое и удобное API:
from concurrent.futures import ProcessPoolExecutor
import time
def cpu_bound_task(n):
return sum(i ** 2 for i in range(n))
# Способ 1: map (для простых случаев)
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(cpu_bound_task, [10_000_000, 20_000_000]))
print(results)
# Способ 2: submit (для более сложных сценариев)
with ProcessPoolExecutor(max_workers=4) as executor:
futures = {
executor.submit(cpu_bound_task, n): n
for n in [10_000_000, 20_000_000, 15_000_000]
}
for future in futures:
try:
result = future.result(timeout=30)
print(f"Result: {result}")
except Exception as e:
print(f"Error: {e}")
Решение 3: joblib (для machine learning)
Отличный инструмент, особенно для ML задач:
from joblib import Parallel, delayed
def cpu_bound_task(n):
return sum(i ** 2 for i in range(n))
# n_jobs=-1 использует все ядра
results = Parallel(n_jobs=-1)(
delayed(cpu_bound_task)(n)
for n in [10_000_000, 20_000_000, 15_000_000]
)
print(results)
Сравнение подходов
| Подход | Использование | Плюсы | Минусы |
|---|---|---|---|
| multiprocessing.Pool | Простые параллельные задачи | Стандартная библиотека, стабильна | Старое API, сложнее обработка ошибок |
| ProcessPoolExecutor | Большинство случаев | Современное API, гибкость, обработка ошибок | Небольшой оверхед |
| joblib | ML, heavy computation | Оптимизирована для CPU-bound, прогресс-бар | Третья библиотека |
| asyncio | ❌ НЕ для CPU-bound | Для I/O-bound только | Не поможет с GIL |
| threading | ❌ НЕ для CPU-bound | Для I/O-bound только | GIL блокирует параллелизм |
Реальный пример: обработка больших данных
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
from typing import List
def process_chunk(chunk: List[int]) -> int:
"""Обработка одного куска данных"""
time.sleep(1) # имитация тяжелого вычисления
return sum(i ** 2 for i in chunk)
def process_data_parallel(data: List[int], chunk_size: int = 1000) -> int:
# Разбиваем данные на части
chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]
results = []
with ProcessPoolExecutor(max_workers=4) as executor:
# Отправляем все задачи
futures = {executor.submit(process_chunk, chunk): chunk for chunk in chunks}
# Обработаем результаты по мере готовности
for future in as_completed(futures):
try:
result = future.result(timeout=60)
results.append(result)
except TimeoutError:
print("Task timeout")
except Exception as e:
print(f"Task failed: {e}")
return sum(results)
# Использование
data = list(range(100_000))
result = process_data_parallel(data)
print(f"Total: {result}")
Оптимизация: сколько процессов выбрать?
import os
from concurrent.futures import ProcessPoolExecutor
# Количество CPU ядер
num_cores = os.cpu_count()
print(f"CPU cores: {num_cores}")
# Рекомендации:
# - Для чистого CPU-bound: num_cores
# - Для I/O + CPU: num_cores + 2
# - Для сетевых операций: больше
with ProcessPoolExecutor(max_workers=num_cores) as executor:
# ...
Python 3.13+ улучшения
В новых версиях Python есть попытки улучшить ситуацию с GIL:
# Экспериментальная функция для снятия GIL
import sys
if hasattr(sys, 'free_threading'):
print("Free-threading Python detected")
Когда НЕ использовать multiprocessing
- Для I/O-bound задач — используй asyncio или threading
- Для простых операций — оверхед создания процесса больше, чем польза
- Когда нужна общая память между задачами — используй threading
Итоговые рекомендации
- Для большинства CPU-bound задач: ProcessPoolExecutor
- Для ML/Data Science: joblib
- Для простого случая с быстрой обработкой: multiprocessing.Pool
- Если много I/O: asyncio или threading
- Если нужна гибкость: ProcessPoolExecutor с as_completed()