Для каких задач подходит многопроцессорность
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Для каких задач подходит многопроцессорность в Python
Многопроцессорность (multiprocessing) - это инструмент параллельного выполнения кода на нескольких процессорных ядрах. Очень важно понимать, КОГДА её использовать, иначе можно получить замедление вместо ускорения.
Когда многопроцессорность эффективна
1. CPU-bound задачи (вычисления)
Это основная область применения. Когда программа активно использует процессор для математических расчётов, обработки данных, сжатия.
from multiprocessing import Pool
import time
def cpu_intensive_task(n):
"""Вычисления, требующие процессора"""
result = 0
for i in range(n):
result += sum([j**2 for j in range(1000)])
return result
# Последовательное выполнение
start = time.time()
results = [cpu_intensive_task(10000) for _ in range(4)]
print(f"Sequential: {time.time() - start:.2f}s")
# Параллельное выполнение на 4 ядрах
start = time.time()
with Pool(processes=4) as pool:
results = pool.map(cpu_intensive_task, [10000] * 4)
print(f"Multiprocessing: {time.time() - start:.2f}s")
Результаты:
- Sequential: ~4.5 сек
- Multiprocessing: ~1.2 сек (почти 4x ускорение на 4 ядрах)
2. Обработка больших наборов данных
Когда нужно обработать огромный объём данных и можно разбить на батчи.
from multiprocessing import Pool
def process_chunk(data_chunk):
"""Обработка куска данных"""
return [
analyze_record(record)
for record in data_chunk
]
def process_large_dataset(data, num_processes=4):
chunk_size = len(data) // num_processes
chunks = [
data[i:i+chunk_size]
for i in range(0, len(data), chunk_size)
]
with Pool(processes=num_processes) as pool:
results = pool.map(process_chunk, chunks)
return [item for sublist in results for item in sublist]
3. Независимые долгие операции
Когда у вас есть несколько независимых длительных операций, которые можно запустить параллельно.
from multiprocessing import Process
import time
def compute_model_1():
"""Тренировка модели машинного обучения"""
time.sleep(30) # Долгая операция
return "Model 1 trained"
def compute_model_2():
"""Вторая модель"""
time.sleep(30)
return "Model 2 trained"
# Последовательно: 60 сек
# compute_model_1()
# compute_model_2()
# Параллельно: 30 сек
if __name__ == '__main__':
p1 = Process(target=compute_model_1)
p2 = Process(target=compute_model_2)
p1.start()
p2.start()
p1.join()
p2.join()
Когда многопроцессорность НЕ подходит
1. I/O-bound задачи (сетевые запросы, чтение файлов)
Здесь нужна асинхронность, а не многопроцессорность!
import asyncio
import aiohttp
# ПЛОХО - использовать multiprocessing для I/O
from multiprocessing import Pool
def fetch_url(url):
response = requests.get(url)
return response.text
# Медленно! Много overhead на создание процессов
with Pool(4) as pool:
results = pool.map(fetch_url, urls)
# ХОРОШО - использовать asyncio
async def fetch_url_async(session, url):
async with session.get(url) as response:
return await response.text()
async def fetch_all(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch_url_async(session, url) for url in urls]
return await asyncio.gather(*tasks)
# Быстро! Минимум overhead
results = asyncio.run(fetch_all(urls))
2. Задачи с частыми обменами данными
Межпроцессный обмен - дорого! Каждый обмен требует сериализации и десериализации.
# ПЛОХО - много обменов между процессами
from multiprocessing import Queue
queue = Queue()
def producer(q):
for i in range(1000000): # Миллион передач - очень медленно!
q.put(i)
def consumer(q):
while True:
item = q.get()
if item is None:
break
process(item)
# ХОРОШО - использовать потоки
from queue import Queue
import threading
queue = Queue()
# ... тот же код, но намного быстрее
Сравнение инструментов параллелизма
| Инструмент | Лучше всего для | Избегать при | Overhead |
|---|---|---|---|
| Threading | I/O-bound | CPU-bound (GIL блокирует) | Низкий |
| Multiprocessing | CPU-bound | Частый обмен данными | Высокий |
| Asyncio | I/O-bound с async кодом | Блокирующие операции | Очень низкий |
| Concurrent.futures | Простые параллельные задачи | Сложная координация | Средний |
GIL - главная преграда
Global Interpreter Lock (GIL) в CPython позволяет только одному потоку выполнять Python код одновременно. Это означает:
import threading
import time
def cpu_task():
result = sum([i**2 for i in range(10000000)])
return result
# Threading БЕЗ ускорения (GIL блокирует)
start = time.time()
t1 = threading.Thread(target=cpu_task)
t2 = threading.Thread(target=cpu_task)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Threading: {time.time() - start:.2f}s") # ~4.5 сек
# Multiprocessing С ускорением (нет GIL)
from multiprocessing import Process
start = time.time()
p1 = Process(target=cpu_task)
p2 = Process(target=cpu_task)
p1.start()
p2.start()
p1.join()
p2.join()
print(f"Multiprocessing: {time.time() - start:.2f}s") # ~2.3 сек
Оптимальное число процессов
import os
from multiprocessing import cpu_count
# Оптимальное число = количество ядер процессора
opt_processes = cpu_count()
print(f"Available cores: {opt_processes}")
# На практике часто используют:
opt_processes = cpu_count() or 1
# Для I/O можно больше
io_processes = cpu_count() * 2
Практический пример: обработка больших файлов
from multiprocessing import Pool, cpu_count
import csv
def process_rows(rows):
"""Обработка куска данных"""
results = []
for row in rows:
processed = {
'name': row['name'].upper(),
'score': float(row['score']) * 1.1
}
results.append(processed)
return results
def process_large_csv(filename, num_processes=None):
if num_processes is None:
num_processes = cpu_count()
# Читаем и разбиваем на батчи
with open(filename) as f:
reader = csv.DictReader(f)
rows = list(reader)
batch_size = len(rows) // num_processes
batches = [
rows[i:i+batch_size]
for i in range(0, len(rows), batch_size)
]
# Обрабатываем параллельно
with Pool(num_processes) as pool:
results = pool.map(process_rows, batches)
return [item for sublist in results for item in sublist]
if __name__ == '__main__':
processed = process_large_csv('huge_file.csv')
print(f"Processed {len(processed)} rows")
Итоговые рекомендации
- CPU-bound + большие объёмы → Multiprocessing ✓
- I/O-bound → Asyncio или threading ✓
- Частый обмен данными → Потоки (threading) ✓
- Нужна простота → concurrent.futures.ProcessPoolExecutor
- GIL вас душит? → multiprocessing или Cython/NumPy
Не забывайте: добавление параллелизма часто усложняет код. Сначала убедитесь, что это действительно нужно (профилируйте!), и только потом оптимизируйте.