← Назад к вопросам

Какие знаешь механизмы параллельности в Python?

2.0 Middle🔥 231 комментариев
#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Механизмы параллельности в Python

Python имеет несколько способов достичь параллельного выполнения, каждый с собственными преимуществами и недостатками. Главный враг Python-разработчика — это GIL (Global Interpreter Lock).

GIL (Global Interpreter Lock) — главный враг

GIL — это мьютекс в CPython, который позволяет только одному потоку выполнять Python байт-код одновременно.

import time
import threading

def cpu_bound_task():
    """Задача, зависящая от CPU"""
    total = 0
    for i in range(100_000_000):
        total += i
    return total

# Однопоточный
start = time.time()
cpu_bound_task()
print(f"Single-threaded: {time.time() - start:.2f}s")  # ~3s

# Двухпоточный — медленнее!
start = time.time()
t1 = threading.Thread(target=cpu_bound_task)
t2 = threading.Thread(target=cpu_bound_task)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Two threads: {time.time() - start:.2f}s")  # ~6s (медленнее из-за GIL!)

GIL отпускается только во время I/O операций (сеть, файлы).

1. Threading — легкие потоки (но GIL!)

Используй для I/O-bound задач (сеть, файлы).

import threading
import time
from urllib.request import urlopen

def fetch_url(url):
    """Задача, зависящая от I/O"""
    try:
        response = urlopen(url, timeout=5)
        return len(response.read())
    except Exception as e:
        return 0

urls = [
    'https://example.com',
    'https://google.com',
    'https://github.com',
] * 5

# Однопоточно
start = time.time()
for url in urls:
    fetch_url(url)
print(f"Single-threaded: {time.time() - start:.2f}s")  # ~30s (блокирует)

# Многопоточно
start = time.time()
threads = []
for url in urls:
    t = threading.Thread(target=fetch_url, args=(url,))
    t.start()
    threads.append(t)
for t in threads:
    t.join()
print(f"Multi-threaded: {time.time() - start:.2f}s")  # ~6s (параллельно)

Проблемы:

  • GIL блокирует параллельное выполнение CPU-bound кода
  • Race conditions если не синхронизировать
  • Сложный debugging

Решение race conditions:

import threading

class Counter:
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()
    
    def increment(self):
        with self.lock:  # Критическая секция
            self.value += 1

counter = Counter()
threads = []

for _ in range(100):
    t = threading.Thread(target=counter.increment)
    t.start()
    threads.append(t)

for t in threads:
    t.join()

print(counter.value)  # 100 (без lock было бы случайное число < 100)

2. Multiprocessing — отдельные процессы (обходит GIL)

Для CPU-bound задач. Каждый процесс имеет собственный GIL.

import multiprocessing
import time

def cpu_bound_task(n):
    """Вычисления"""
    total = 0
    for i in range(n):
        total += i
    return total

if __name__ == '__main__':
    # Pool — управляет процессами
    with multiprocessing.Pool(processes=4) as pool:
        start = time.time()
        results = pool.map(cpu_bound_task, [100_000_000] * 4)
        print(f"Multiprocessing: {time.time() - start:.2f}s")  # ~3s (быстро!)

# Или вручную
if __name__ == '__main__':
    processes = []
    start = time.time()
    
    for _ in range(4):
        p = multiprocessing.Process(target=cpu_bound_task, args=(100_000_000,))
        p.start()
        processes.append(p)
    
    for p in processes:
        p.join()
    
    print(f"Time: {time.time() - start:.2f}s")

Проблемы:

  • Большие затраты на создание процесса
  • Сложность обмена данными между процессами
  • Требует pickle сериализации

Передача данных между процессами:

from multiprocessing import Queue, Process

def worker(q):
    q.put("Hello from worker")

if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    msg = q.get()  # Блокируется, пока не получит сообщение
    print(msg)
    p.join()

3. Asyncio — асинхронность (один поток, кооперативная мультизадачность)

Для тысяч одновременных I/O операций. Не параллель, а concurrency.

import asyncio
import aiohttp
import time

async def fetch_url(session, url):
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as response:
        return await response.text()

async def main():
    urls = ['https://example.com'] * 20
    
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    
    print(f"Asyncio: {time.time() - start:.2f}s")  # ~5s (20 параллельно)

asyncio.run(main())

Как это работает:

async def task1():
    print("Task 1: Start")
    await asyncio.sleep(2)  # Отпускаю контроль
    print("Task 1: End")

async def task2():
    print("Task 2: Start")
    await asyncio.sleep(1)
    print("Task 2: End")

async def main():
    # Выполняются параллельно!
    await asyncio.gather(task1(), task2())
    # Всего займет ~2s, не 3s

asyncio.run(main())
# Task 1: Start
# Task 2: Start
# Task 2: End
# Task 1: End

Преимущества asyncio:

  • Масштабируется на 10k+ одновременных соединений
  • Один поток — нет race conditions
  • Меньше памяти чем потоки

Недостатки:

  • Сложнее писать (нужен async/await синтаксис везде)
  • Не помогает для CPU-bound задач
  • Сложнее дебажить

4. Concurrent.futures — высокоуровневый API

Простой способ использовать потоки или процессы.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

def io_task(n):
    time.sleep(1)  # Имитация I/O
    return n * 2

# ThreadPoolExecutor для I/O
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(io_task, range(10)))
print(f"Threads: {time.time() - start:.2f}s")  # ~2.5s (10 параллельно)

# ProcessPoolExecutor для CPU
def cpu_task(n):
    return sum(range(n))

start = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(cpu_task, [50_000_000] * 4))
print(f"Processes: {time.time() - start:.2f}s")  # ~3s

5. Celery — распределённые задачи

Для асинхронных задач в production.

from celery import Celery
import time

app = Celery('tasks', broker='redis://localhost:6379')

@app.task
def long_running_task(x):
    """Выполнится в отдельном процессе/машине"""
    time.sleep(10)
    return x * 2

# Вызов (не блокирует)
result = long_running_task.delay(42)
print(result.id)  # ID задачи

# Получить результат позже
result.get(timeout=30)  # Блокирует, ждёт результата

# Проверить статус
if result.ready():
    print(result.get())

Сравнительная таблица

Механизм         | Тип       | GIL | Масштаб | Use case
-----------------|-----------|-----|---------|------------------
Threading        | Потоки    | Да  | 100s    | I/O (сеть, файлы)
Multiprocessing  | Процессы  | Нет | 10s     | CPU-bound вычисления
Asyncio          | Asyncio   | Да  | 10k+    | Много I/O операций
Concurrent       | Оба       | Да  | 100s    | Простой I/O
Celery           | Процессы  | Нет | 1000s   | Production задачи

Практический пример: веб-скрепер

import asyncio
import aiohttp
from time import time

async def scrape_page(session, url):
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
            return await resp.text()
    except Exception as e:
        print(f"Error {url}: {e}")
        return None

async def main():
    urls = [f"https://example.com/page/{i}" for i in range(100)]
    
    start = time()
    async with aiohttp.ClientSession() as session:
        tasks = [scrape_page(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    
    elapsed = time() - start
    print(f"Scraped {len([r for r in results if r])} pages in {elapsed:.2f}s")
    print(f"Speed: {len(urls) / elapsed:.0f} pages/sec")

asyncio.run(main())

Выбор механизма

Используй Threading если:

  • I/O-bound задачи
  • Нужна простота
  • До 100 одновременных операций

Используй Multiprocessing если:

  • CPU-bound вычисления
  • Нужна реальная параллель
  • До 10 параллельных процессов

Используй Asyncio если:

  • Много I/O операций (1000+)
  • Нужна масштабируемость
  • Готов к async/await синтаксису

Используй Celery если:

  • Production приложение
  • Нужно распределение задач
  • Нужна reliability и retry механизм