← Назад к вопросам

Для каких задач подходит многопроцессорность

2.0 Middle🔥 131 комментариев
#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Для каких задач подходит многопроцессорность в Python

Многопроцессорность (multiprocessing) - это инструмент параллельного выполнения кода на нескольких процессорных ядрах. Очень важно понимать, КОГДА её использовать, иначе можно получить замедление вместо ускорения.

Когда многопроцессорность эффективна

1. CPU-bound задачи (вычисления)

Это основная область применения. Когда программа активно использует процессор для математических расчётов, обработки данных, сжатия.

from multiprocessing import Pool
import time

def cpu_intensive_task(n):
    """Вычисления, требующие процессора"""
    result = 0
    for i in range(n):
        result += sum([j**2 for j in range(1000)])
    return result

# Последовательное выполнение
start = time.time()
results = [cpu_intensive_task(10000) for _ in range(4)]
print(f"Sequential: {time.time() - start:.2f}s")

# Параллельное выполнение на 4 ядрах
start = time.time()
with Pool(processes=4) as pool:
    results = pool.map(cpu_intensive_task, [10000] * 4)
print(f"Multiprocessing: {time.time() - start:.2f}s")

Результаты:

  • Sequential: ~4.5 сек
  • Multiprocessing: ~1.2 сек (почти 4x ускорение на 4 ядрах)

2. Обработка больших наборов данных

Когда нужно обработать огромный объём данных и можно разбить на батчи.

from multiprocessing import Pool

def process_chunk(data_chunk):
    """Обработка куска данных"""
    return [
        analyze_record(record)
        for record in data_chunk
    ]

def process_large_dataset(data, num_processes=4):
    chunk_size = len(data) // num_processes
    chunks = [
        data[i:i+chunk_size]
        for i in range(0, len(data), chunk_size)
    ]
    
    with Pool(processes=num_processes) as pool:
        results = pool.map(process_chunk, chunks)
    
    return [item for sublist in results for item in sublist]

3. Независимые долгие операции

Когда у вас есть несколько независимых длительных операций, которые можно запустить параллельно.

from multiprocessing import Process
import time

def compute_model_1():
    """Тренировка модели машинного обучения"""
    time.sleep(30)  # Долгая операция
    return "Model 1 trained"

def compute_model_2():
    """Вторая модель"""
    time.sleep(30)
    return "Model 2 trained"

# Последовательно: 60 сек
# compute_model_1()
# compute_model_2()

# Параллельно: 30 сек
if __name__ == '__main__':
    p1 = Process(target=compute_model_1)
    p2 = Process(target=compute_model_2)
    
    p1.start()
    p2.start()
    
    p1.join()
    p2.join()

Когда многопроцессорность НЕ подходит

1. I/O-bound задачи (сетевые запросы, чтение файлов)

Здесь нужна асинхронность, а не многопроцессорность!

import asyncio
import aiohttp

# ПЛОХО - использовать multiprocessing для I/O
from multiprocessing import Pool
def fetch_url(url):
    response = requests.get(url)
    return response.text

# Медленно! Много overhead на создание процессов
with Pool(4) as pool:
    results = pool.map(fetch_url, urls)

# ХОРОШО - использовать asyncio
async def fetch_url_async(session, url):
    async with session.get(url) as response:
        return await response.text()

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url_async(session, url) for url in urls]
        return await asyncio.gather(*tasks)

# Быстро! Минимум overhead
results = asyncio.run(fetch_all(urls))

2. Задачи с частыми обменами данными

Межпроцессный обмен - дорого! Каждый обмен требует сериализации и десериализации.

# ПЛОХО - много обменов между процессами
from multiprocessing import Queue

queue = Queue()

def producer(q):
    for i in range(1000000):  # Миллион передач - очень медленно!
        q.put(i)

def consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        process(item)

# ХОРОШО - использовать потоки
from queue import Queue
import threading

queue = Queue()
# ... тот же код, но намного быстрее

Сравнение инструментов параллелизма

ИнструментЛучше всего дляИзбегать приOverhead
ThreadingI/O-boundCPU-bound (GIL блокирует)Низкий
MultiprocessingCPU-boundЧастый обмен даннымиВысокий
AsyncioI/O-bound с async кодомБлокирующие операцииОчень низкий
Concurrent.futuresПростые параллельные задачиСложная координацияСредний

GIL - главная преграда

Global Interpreter Lock (GIL) в CPython позволяет только одному потоку выполнять Python код одновременно. Это означает:

import threading
import time

def cpu_task():
    result = sum([i**2 for i in range(10000000)])
    return result

# Threading БЕЗ ускорения (GIL блокирует)
start = time.time()
t1 = threading.Thread(target=cpu_task)
t2 = threading.Thread(target=cpu_task)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Threading: {time.time() - start:.2f}s")  # ~4.5 сек

# Multiprocessing С ускорением (нет GIL)
from multiprocessing import Process
start = time.time()
p1 = Process(target=cpu_task)
p2 = Process(target=cpu_task)
p1.start()
p2.start()
p1.join()
p2.join()
print(f"Multiprocessing: {time.time() - start:.2f}s")  # ~2.3 сек

Оптимальное число процессов

import os
from multiprocessing import cpu_count

# Оптимальное число = количество ядер процессора
opt_processes = cpu_count()
print(f"Available cores: {opt_processes}")

# На практике часто используют:
opt_processes = cpu_count() or 1

# Для I/O можно больше
io_processes = cpu_count() * 2

Практический пример: обработка больших файлов

from multiprocessing import Pool, cpu_count
import csv

def process_rows(rows):
    """Обработка куска данных"""
    results = []
    for row in rows:
        processed = {
            'name': row['name'].upper(),
            'score': float(row['score']) * 1.1
        }
        results.append(processed)
    return results

def process_large_csv(filename, num_processes=None):
    if num_processes is None:
        num_processes = cpu_count()
    
    # Читаем и разбиваем на батчи
    with open(filename) as f:
        reader = csv.DictReader(f)
        rows = list(reader)
    
    batch_size = len(rows) // num_processes
    batches = [
        rows[i:i+batch_size]
        for i in range(0, len(rows), batch_size)
    ]
    
    # Обрабатываем параллельно
    with Pool(num_processes) as pool:
        results = pool.map(process_rows, batches)
    
    return [item for sublist in results for item in sublist]

if __name__ == '__main__':
    processed = process_large_csv('huge_file.csv')
    print(f"Processed {len(processed)} rows")

Итоговые рекомендации

  • CPU-bound + большие объёмы → Multiprocessing ✓
  • I/O-bound → Asyncio или threading ✓
  • Частый обмен данными → Потоки (threading) ✓
  • Нужна простота → concurrent.futures.ProcessPoolExecutor
  • GIL вас душит? → multiprocessing или Cython/NumPy

Не забывайте: добавление параллелизма часто усложняет код. Сначала убедитесь, что это действительно нужно (профилируйте!), и только потом оптимизируйте.

Для каких задач подходит многопроцессорность | PrepBro