← Назад к вопросам

Какие знаешь способы распараллеливания задач в Python?

2.0 Middle🔥 181 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Способы распараллеливания задач в Python

В Python есть три основных подхода к параллелизму, и выбор зависит от типа задачи. Разберу каждый с примерами.

1. Threading (Многопоточность)

Когда использовать: I/O-bound задачи (сетевые запросы, файловые операции).

import threading
import time
from concurrent.futures import ThreadPoolExecutor

# ❌ Наивный подход
threads = []
for url in urls:
    thread = threading.Thread(target=fetch_url, args=(url,))
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()

# ✅ Лучше: использовать ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(fetch_url, urls))

# ✅ Или с futures для большей гибкости
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(fetch_url, url) for url in urls]
    for future in futures:
        try:
            result = future.result(timeout=5)
            print(result)
        except Exception as e:
            print(f"Error: {e}")

Проблемы threading:

  • GIL (Global Interpreter Lock) — только один поток выполняется за раз в CPython
  • Race conditions — нужны lock'и при доступе к общему состоянию
  • Сложная отладка — deadlock'и, race condition'ы
# ❌ Race condition
counter = 0

def increment():
    global counter
    for _ in range(1000000):
        counter += 1

threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # Не будет 10000000, потому что race condition!

# ✅ Использовать Lock
import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(1000000):
        with lock:  # Защищаем доступ
            counter += 1

2. Multiprocessing (Многопроцессность)

Когда использовать: CPU-bound задачи (вычисления, обработка данных).

from multiprocessing import Pool, Process
import os

# ❌ Наивный подход
processes = []
for data in data_list:
    p = Process(target=process_data, args=(data,))
    p.start()
    processes.append(p)

for p in processes:
    p.join()

# ✅ Лучше: использовать Pool
with Pool(processes=os.cpu_count()) as pool:
    results = pool.map(process_data, data_list)

# ✅ Или с imap для потоковой обработки больших данных
with Pool(processes=4) as pool:
    for result in pool.imap(process_data, data_list, chunksize=100):
        print(result)  # Обработка по мере готовности

Проблемы multiprocessing:

  • Overhead — создание процесса дорого (нужно скопировать весь интерпретатор)
  • IPC (Inter-Process Communication) — медленнее, чем доступ к памяти
  • Сериализация — данные нужно pickled/unpickled
  • Нет GIL, но есть проблемы с состоянием
# Пример: обработка больших файлов
import multiprocessing

def process_chunk(chunk):
    return sum(x ** 2 for x in chunk)

if __name__ == '__main__':
    # Важно! multiprocessing требует if __name__ == '__main__'
    data = range(10000000)
    chunk_size = len(data) // 4
    chunks = [
        data[i:i+chunk_size]
        for i in range(0, len(data), chunk_size)
    ]
    
    with multiprocessing.Pool(4) as pool:
        results = pool.map(process_chunk, chunks)
        total = sum(results)

3. Asyncio (Асинхронное программирование)

Когда использовать: I/O-bound задачи с большим количеством одновременных операций (веб-скрепинг, микросервисы).

import asyncio
import aiohttp

# ❌ Синхронный подход (медленно)
import requests
for url in urls:
    response = requests.get(url)  # Ждем для каждого
    print(response.status_code)

# ✅ Асинхронный подход (быстро)
async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

results = asyncio.run(main())

# ✅ С ограничением concurrency
async def fetch_with_limit(session, url, semaphore):
    async with semaphore:
        return await fetch_url(session, url)

async def main():
    semaphore = asyncio.Semaphore(10)  # Максимум 10 одновременных
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_with_limit(session, url, semaphore)
            for url in urls
        ]
        results = await asyncio.gather(*tasks)
        return results

Преимущества asyncio:

  • Один поток, но много concurrent операций
  • Нет GIL проблем (по сути)
  • Низкий overhead по сравнению с процессами

Проблемы:

  • Только I/O-bound — не помогает с CPU вычислениями
  • "Async plague" — нужно async/await везде
  • Сложно отлаживать — стек вызовов может быть непредсказуем

4. Комбинированные подходы

Часто нужно сочетать несколько подходов:

# Пример: веб-краулер с обработкой данных
import asyncio
from concurrent.futures import ProcessPoolExecutor
import aiohttp

def cpu_intensive_parsing(html):
    # Тяжелая обработка HTML
    return parse_and_extract(html)

async def fetch_and_process(url, executor):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            html = await response.text()
    
    # Обработка в отдельном процессе (CPU-bound)
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(
        executor,
        cpu_intensive_parsing,
        html
    )
    return result

async def main():
    with ProcessPoolExecutor(max_workers=4) as executor:
        tasks = [
            fetch_and_process(url, executor)
            for url in urls
        ]
        results = await asyncio.gather(*tasks)
        return results

results = asyncio.run(main())

5. Практические примеры

I/O-bound: ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor
import requests

def fetch_url(url):
    response = requests.get(url)
    return response.status_code

urls = ["https://api.example.com/user/1"] * 100

with ThreadPoolExecutor(max_workers=20) as executor:
    results = list(executor.map(fetch_url, urls))

print(f"Completed: {len(results)}")

CPU-bound: ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor

def fibonacci(n):
    if n < 2:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

numbers = [30, 31, 32, 33, 34]

with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(fibonacci, numbers))

print(results)

I/O-bound (много одновременных): asyncio

import asyncio
import aiohttp
from datetime import datetime

async def fetch_many_urls(urls):
    async with aiohttp.ClientSession() as session:
        async def fetch(url):
            try:
                async with session.get(url, timeout=5) as resp:
                    return await resp.text()
            except Exception as e:
                return f"Error: {e}"
        
        tasks = [fetch(url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

urls = [f"https://httpbin.org/delay/{i}" for i in range(1, 6)]
start = datetime.now()
results = asyncio.run(fetch_many_urls(urls))
print(f"Time: {datetime.now() - start}")

Сравнительная таблица

ПодходТип задачOverheadGILСложность
ThreadingI/OНизкийДаСредняя
MultiprocessingCPUВысокийНетВысокая
AsyncioI/O (много)Очень низкийНетВысокая

Мой подход к выбору

  1. I/O-bound, мало операций → ThreadPoolExecutor
  2. I/O-bound, 100+ одновременных → asyncio + aiohttp
  3. CPU-bound → ProcessPoolExecutor
  4. CPU + I/O → asyncio + ProcessPoolExecutor
  5. Распределенные вычисления → Celery (Redis backend)
# Рекомендуемый шаблон
from concurrent.futures import ThreadPoolExecutor
import asyncio
from functools import wraps

class TaskExecutor:
    def __init__(self, thread_workers=10, process_workers=4):
        self.thread_executor = ThreadPoolExecutor(max_workers=thread_workers)
        # self.process_executor = ProcessPoolExecutor(max_workers=process_workers)
    
    def run_io_task(self, func, *args, **kwargs):
        """Для I/O операций"""
        loop = asyncio.get_event_loop()
        return loop.run_in_executor(self.thread_executor, func, *args)

Выбор метода — ключ к производительности. Профилируй реальные задачи, не гадай!

Какие знаешь способы распараллеливания задач в Python? | PrepBro