← Назад к вопросам
Почему для вычислительных задач в Python лучше использовать многопроцессорность?
2.2 Middle🔥 161 комментариев
#Python Core#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
# Многопроцессорность в Python для вычислительных задач
Суть проблемы: Global Interpreter Lock (GIL)
Python имеет Global Interpreter Lock (GIL) — это мьютекс в интерпретаторе CPython, который позволяет только одному потоку выполнять код Python одновременно, даже если на компьютере несколько ядер процессора.
┌─────────────────────────────────────────┐
│ CPython Interpreter (GIL) │
│ ┌─────────────────────────────────────┐│
│ │ Только ОДИН поток может выполнять ││
│ │ Python код одновременно ││
│ │ (даже на 4-ядерном процессоре) ││
│ └─────────────────────────────────────┘│
└─────────────────────────────────────────┘
Проблема: многопоточность НЕ ускоряет вычисления
import threading
import time
def cpu_intensive_task(n):
"""Вычисляем сумму квадратов (CPU-bound задача)"""
total = 0
for i in range(n):
total += i * i
return total
# Тест 1: Однопоточное выполнение
start = time.time()
result1 = cpu_intensive_task(100_000_000)
result2 = cpu_intensive_task(100_000_000)
single_thread_time = time.time() - start
print(f"Однопоточное: {single_thread_time:.2f} сек") # ~4 сек
# Тест 2: Многопоточное выполнение (ошибочный подход)
start = time.time()
threads = []
for _ in range(2):
t = threading.Thread(target=cpu_intensive_task, args=(100_000_000,))
threads.append(t)
t.start()
for t in threads:
t.join()
multi_thread_time = time.time() - start
print(f"Многопоточное: {multi_thread_time:.2f} сек") # ~8 сек МЕДЛЕННЕЕ!
# Многопоточность ЗАМЕДЛЯЕТ, потому что GIL + переключение контекста!
Почему это происходит
1. GIL блокирует параллельное выполнение
Однопоточное выполнение:
Поток A: [===============Task 1===============] [===============Task 2===============]
Время: 0 4 8 сек
Многопоточное выполнение с GIL:
Поток A: [==Task 1==] (yield) [==Task 1==] (yield) [==Task 1==]
Поток B: (wait) [==Task 2==] (yield) [==Task 2==] (yield) [==Task 2==]
GIL: ================================================= (держит оба потока)
Время: 0 8 сек
Есть overhead на переключение контекста!
2. Переключение контекста имеет стоимость
import threading
import time
def measure_context_switch():
"""Измеряем overhead контекст-свитча"""
# Без контекст-свитча (1 поток)
start = time.perf_counter()
for _ in range(1_000_000):
pass
no_switch = time.perf_counter() - start
# С контекст-свитчем (2 потока)
counter = 0
lock = threading.Lock()
def worker():
nonlocal counter
for _ in range(500_000):
with lock: # Много переключений
counter += 1
threads = [threading.Thread(target=worker) for _ in range(2)]
start = time.perf_counter()
for t in threads:
t.start()
for t in threads:
t.join()
with_switch = time.perf_counter() - start
print(f"Без свитча: {no_switch:.4f}c")
print(f"С свитчем: {with_switch:.4f}c")
print(f"Overhead: {with_switch / no_switch:.1f}x медленнее")
measure_context_switch()
# Без свитча: 0.0023c
# С свитчем: 2.3456c
# Overhead: 1000x медленнее!
Решение: многопроцессорность вместо многопоточности
Многопроцессорность создаёт отдельные Python процессы, каждый со своим GIL. Это позволяет истинному параллелизму на многоядерных системах.
from multiprocessing import Pool
import time
def cpu_intensive_task(n):
"""Вычисляем сумму квадратов"""
total = 0
for i in range(n):
total += i * i
return total
# Тест 1: Однопроцессное
start = time.time()
result1 = cpu_intensive_task(100_000_000)
result2 = cpu_intensive_task(100_000_000)
single_process_time = time.time() - start
print(f"Однопроцессное: {single_process_time:.2f} сек") # ~4 сек
# Тест 2: Многопроцессное (правильный подход)
start = time.time()
with Pool(processes=2) as pool:
# Каждый процесс имеет свой GIL - истинный параллелизм!
results = pool.map(cpu_intensive_task, [100_000_000, 100_000_000])
multi_process_time = time.time() - start
print(f"Многопроцессное (2 ядра): {multi_process_time:.2f} сек") # ~2 сек БЫСТРЕЕ!
print(f"Ускорение: {single_process_time / multi_process_time:.1f}x")
Когда использовать многопроцессорность
1. CPU-bound задачи (вычисления)
from multiprocessing import Pool
import hashlib
def calculate_hash(data):
"""CPU-bound: много вычислений"""
return hashlib.sha256(data.encode()).hexdigest()
# Генерируем много данных
data_list = [f"data_{i}" for i in range(1000)]
# Многопроцессное выполнение
with Pool(processes=4) as pool: # На 4-ядерном процессоре
results = pool.map(calculate_hash, data_list)
# Ускорение ~4x на 4-ядерном процессоре
2. Обработка больших файлов
from multiprocessing import Pool
import json
def process_line(line):
"""Обрабатываем каждую строку (CPU-bound)"""
data = json.loads(line)
# Сложная обработка
data['processed'] = data['value'] * 2
return data
def process_file_parallel(filename, num_processes=4):
"""Читаем файл и обрабатываем параллельно"""
with open(filename, 'r') as f:
lines = f.readlines()
with Pool(processes=num_processes) as pool:
# Распределяем работу между процессами
results = pool.map(process_line, lines)
return results
# Использование
results = process_file_parallel('large_file.jsonl', num_processes=8)
3. Batch обработка данных
from multiprocessing import Pool
import numpy as np
def calculate_statistics(data):
"""CPU-bound: расчёт статистики"""
return {
'mean': np.mean(data),
'std': np.std(data),
'min': np.min(data),
'max': np.max(data)
}
# Генерируем батчи данных
batches = [np.random.randn(1000000) for _ in range(10)]
# Параллельная обработка
with Pool(processes=4) as pool:
results = pool.map(calculate_statistics, batches)
# Результат: статистика для каждого батча
Когда использовать многопоточность (I/O-bound)
Многопоточность хороша для I/O-bound задач (ввод-вывод):
import threading
import requests
from concurrent.futures import ThreadPoolExecutor
import time
def fetch_url(url):
"""I/O-bound: ожидание сетевого ответа"""
response = requests.get(url)
return len(response.content)
urls = [
'https://example.com',
'https://google.com',
'https://github.com',
# ... ещё 100 URL
]
# Для I/O-bound многопоточность УСКОРЯЕТ!
# Пока один поток ждёт сетевого ответа, другой может работать
with ThreadPoolExecutor(max_workers=10) as executor:
results = executor.map(fetch_url, urls)
# Ускорение ~10x потому что потоки работают параллельно во время ожидания I/O
Практический пример: гибридный подход
from multiprocessing import Pool
from concurrent.futures import ThreadPoolExecutor
import requests
import json
def fetch_and_process(url):
"""Комбо: I/O-bound (fetch) + CPU-bound (process)"""
# I/O-bound: получаем данные
response = requests.get(url)
data = response.json()
# CPU-bound: обрабатываем
processed = {}
for key, value in data.items():
if isinstance(value, (int, float)):
processed[key] = value * 2
return json.dumps(processed)
# Стратегия:
# 1. ThreadPool для I/O (сетевые запросы)
# 2. ProcessPool для CPU (обработка)
urls = ['https://api.example.com/data/' + str(i) for i in range(100)]
# Сначала параллельно получаем данные (I/O)
with ThreadPoolExecutor(max_workers=20) as executor:
responses = list(executor.map(fetch_url, urls))
# Потом параллельно обрабатываем (CPU)
with Pool(processes=4) as pool:
results = pool.map(process_data, responses)
Сравнение производительности
Задача: 4 блока вычислений по 1 сек каждый
Однопоточное: [====][====][====][====] 4 сек
Многопоточное (GIL): [==][==][==][==][==][==] 4+ сек (медленнее из-за свитча)
Многопроцессное: [========]
[========] 1 сек (на 4-ядерном)
[========]
[========]
Когда НЕ использовать многопроцессорность
- Малые вычисления — overhead создания процесса больше, чем экономия
- I/O-bound задачи — используй asyncio или ThreadPoolExecutor
- Нужен общий state — процессы не делят память, нужна IPC (Inter-Process Communication)
# ❌ Плохо: много overhead для малых вычислений
with Pool() as pool:
results = pool.map(lambda x: x * 2, range(10)) # Медленнее чем обычный map
# ✅ Хорошо: значимые вычисления
with Pool() as pool:
results = pool.map(complicated_math_function, million_numbers)
Когда нужна общая память между процессами
from multiprocessing import Pool, Queue, Process
def worker(queue, value):
"""Процесс, который отправляет результат в queue"""
result = value * 2
queue.put(result)
# Используем Queue для IPC (Inter-Process Communication)
q = Queue()
processes = []
for i in range(4):
p = Process(target=worker, args=(q, i))
p.start()
processes.append(p)
# Собираем результаты
results = []
for _ in range(4):
results.append(q.get())
for p in processes:
p.join()
print(results) # [0, 2, 4, 6]
Альтернатива: Cython или NumPy
GIL можно обойти, используя специализированные библиотеки:
import numpy as np
# NumPy операции автоматически отпускают GIL!
# Это быстрее, чем многопроцессорность
arr = np.arange(100_000_000)
result = np.sum(arr * arr) # Быстро, без многопроцессорности!
Заключение
Используй многопроцессорность для CPU-bound задач, потому что:
- GIL ограничивает многопоточность — только один поток выполняет Python код
- Многопроцессорность = истинный параллелизм — каждый процесс имеет свой GIL
- Масштабируется с ядрами — на 4-ядерном процессоре ~4x ускорение
- Простая в использовании — Pool.map() в 3 строки кода
Для I/O-bound используй многопоточность или asyncio вместо многопроцессорности.