← Назад к вопросам

Какие знаешь виды параллелизма в Python?

2.0 Middle🔥 171 комментариев
#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Виды параллелизма в Python

Python предлагает несколько способов реализации параллелизма, каждый с разными характеристиками и применением:

1. Multithreading (многопоточность)

Несколько потоков в одном процессе. Ограничено GIL (Global Interpreter Lock).

import threading
import time
from queue import Queue

# Простой пример многопоточности
def worker(name: str, queue: Queue):
    while True:
        item = queue.get()
        if item is None:
            break
        
        print(f"{name} обрабатывает {item}")
        time.sleep(0.5)
        queue.task_done()

# Создание потоков
queue = Queue()
threads = []

for i in range(3):
    t = threading.Thread(target=worker, args=(f"Worker-{i}", queue))
    t.start()
    threads.append(t)

# Добавляем работу
for item in range(10):
    queue.put(item)

# Ждём завершения
queue.join()

# Останавливаем потоки
for _ in range(3):
    queue.put(None)
for t in threads:
    t.join()

print("Все работы завершены")

Лучше всего для:

  • IO-bound операции (сетевые запросы, работа с БД)
  • GUI приложения
  • Простые фоновые задачи

Проблемы:

  • GIL не позволяет запускать Python код параллельно
  • Race conditions (конфликты между потоками)
  • Deadlocks (взаимная блокировка)

2. Multiprocessing (многопроцессность)

Несколько отдельных процессов, каждый с собственным интерпретатором Python.

import multiprocessing
from multiprocessing import Pool, Process, Queue
import time

# CPU-bound функция
def cpu_intensive_task(n: int) -> int:
    """Вычисляет факториал"""
    result = 1
    for i in range(1, n + 1):
        result *= i
    return result

# 1. Использование Pool
def example_pool():
    with Pool(processes=4) as pool:
        numbers = [100, 200, 300, 400]
        results = pool.map(cpu_intensive_task, numbers)
    
    print(f"Pool результаты: {results}")

# 2. Использование Process
def worker(name: str, queue: Queue):
    for i in range(5):
        result = cpu_intensive_task(100 + i * 10)
        queue.put({
            "worker": name,
            "result": result,
            "iteration": i
        })

def example_process():
    queue = Queue()
    processes = []
    
    for i in range(3):
        p = Process(target=worker, args=(f"Process-{i}", queue))
        p.start()
        processes.append(p)
    
    # Собираем результаты
    results = []
    for _ in range(3 * 5):
        results.append(queue.get())
    
    for p in processes:
        p.join()
    
    print(f"Process результаты: {results[:3]}")

example_pool()
example_process()

Лучше всего для:

  • CPU-bound операции (вычисления, обработка данных)
  • Обход GIL
  • Независимые параллельные задачи

Проблемы:

  • Overhead на создание процесса (дорого по памяти)
  • Сложнее обмен данными между процессами
  • Не подходит для множества задач

3. Asyncio (асинхронное программирование)

Один поток, но множество корутин. Максимально эффективно для IO-bound.

import asyncio
import aiohttp
import time

# Асинхронная функция (корутина)
async def fetch_url(session: aiohttp.ClientSession, url: str) -> str:
    async with session.get(url) as response:
        return await response.text()

async def fetch_multiple_urls(urls: list) -> list:
    """Одновременно загружает несколько URL'ов"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    return results

# Асинхронная обработка
async def async_processing():
    urls = [
        "https://example.com",
        "https://google.com",
        "https://github.com",
    ]
    
    start = time.time()
    results = await fetch_multiple_urls(urls)
    elapsed = time.time() - start
    
    print(f"Загруженные {len(results)} страниц за {elapsed:.2f} сек")
    return results

# Запуск асинхронного кода
# asyncio.run(async_processing())

# Более сложный пример с queue
async def async_worker(name: str, queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        
        print(f"{name} обрабатывает {item}")
        await asyncio.sleep(0.5)  # Имитируем IO операцию
        queue.task_done()

async def async_queue_example():
    queue = asyncio.Queue()
    
    # Создаём workers
    workers = [
        asyncio.create_task(async_worker(f"Worker-{i}", queue))
        for i in range(3)
    ]
    
    # Добавляем работу
    for item in range(10):
        await queue.put(item)
    
    # Ждём обработки
    await queue.join()
    
    # Останавливаем workers
    for _ in range(3):
        await queue.put(None)
    await asyncio.gather(*workers)

# asyncio.run(async_queue_example())

Лучше всего для:

  • IO-bound операции (HTTP запросы, БД)
  • Множество одновременных задач (500+)
  • Real-time приложения

Преимущества:

  • Минимальный overhead
  • Максимальная производительность для IO-bound
  • Легче отладка (один поток)

Проблемы:

  • Нельзя использовать обычные блокирующие функции
  • Нужна поддержка async в библиотеках
  • Сложнее код

4. Сравнение

import time
import threading
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Функция для примера
def blocking_io(seconds: float = 1):
    """Имитирует IO операцию"""
    time.sleep(seconds)
    return f"Done after {seconds}s"

async def async_io(seconds: float = 1):
    """Асинхронная IO операция"""
    await asyncio.sleep(seconds)
    return f"Done after {seconds}s"

# Multithreading (11 секунд)
def test_threading():
    start = time.time()
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(blocking_io, 1) for _ in range(10)]
        results = [f.result() for f in futures]
    print(f"Threading: {time.time() - start:.2f}s")

# Asyncio (2 секунды)
async def test_asyncio():
    start = time.time()
    tasks = [async_io(1) for _ in range(10)]
    results = await asyncio.gather(*tasks)
    print(f"Asyncio: {time.time() - start:.2f}s")

# test_threading()  # 11s (5 потоков, 10 задач)
# asyncio.run(test_asyncio())  # 2s
ХарактеристикаThreadingMultiprocessingAsyncio
GILЕсть, ограничиваетНетНет
Для IO-boundХорошоПлохоОтлично
Для CPU-boundПлохоОтличноПлохо
OverheadНизкийВысокийМинимальный
ПростотаСредняяСложнаяСложная
МасштабируемостьДо 100 потоковДо 50 процессов1000+ корутин
Обмен даннымиСложно (Race conditions)Сложно (Serialization)Легко (Queue)

5. Практические примеры

# CPU-bound: используй multiprocessing
from multiprocessing import Pool

def calculate(n):
    return sum(i * i for i in range(n))

numbers = [1000000, 2000000, 3000000, 4000000]
with Pool(4) as p:
    results = p.map(calculate, numbers)

# IO-bound: используй asyncio
import asyncio
import aiohttp

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

urls = ['http://example.com'] * 100
results = asyncio.run(asyncio.gather(*[fetch(url) for url in urls]))

# GUI/Long-lived connections: используй threading
import threading

def background_task():
    # Работает в фоне
    pass

thread = threading.Thread(target=background_task, daemon=True)
thread.start()

Выбор правильного инструмента

  1. CPU-bound: multiprocessing
  2. IO-bound (простой код): threading
  3. IO-bound (высокая нагрузка): asyncio
  4. GUI приложения: threading
  5. Веб-приложения: asyncio (FastAPI, aiohttp)
  6. Data processing: multiprocessing (pandas, numpy)