← Назад к вопросам
Какие знаешь виды параллелизма в Python?
2.0 Middle🔥 171 комментариев
#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Виды параллелизма в Python
Python предлагает несколько способов реализации параллелизма, каждый с разными характеристиками и применением:
1. Multithreading (многопоточность)
Несколько потоков в одном процессе. Ограничено GIL (Global Interpreter Lock).
import threading
import time
from queue import Queue
# Простой пример многопоточности
def worker(name: str, queue: Queue):
while True:
item = queue.get()
if item is None:
break
print(f"{name} обрабатывает {item}")
time.sleep(0.5)
queue.task_done()
# Создание потоков
queue = Queue()
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(f"Worker-{i}", queue))
t.start()
threads.append(t)
# Добавляем работу
for item in range(10):
queue.put(item)
# Ждём завершения
queue.join()
# Останавливаем потоки
for _ in range(3):
queue.put(None)
for t in threads:
t.join()
print("Все работы завершены")
Лучше всего для:
- IO-bound операции (сетевые запросы, работа с БД)
- GUI приложения
- Простые фоновые задачи
Проблемы:
- GIL не позволяет запускать Python код параллельно
- Race conditions (конфликты между потоками)
- Deadlocks (взаимная блокировка)
2. Multiprocessing (многопроцессность)
Несколько отдельных процессов, каждый с собственным интерпретатором Python.
import multiprocessing
from multiprocessing import Pool, Process, Queue
import time
# CPU-bound функция
def cpu_intensive_task(n: int) -> int:
"""Вычисляет факториал"""
result = 1
for i in range(1, n + 1):
result *= i
return result
# 1. Использование Pool
def example_pool():
with Pool(processes=4) as pool:
numbers = [100, 200, 300, 400]
results = pool.map(cpu_intensive_task, numbers)
print(f"Pool результаты: {results}")
# 2. Использование Process
def worker(name: str, queue: Queue):
for i in range(5):
result = cpu_intensive_task(100 + i * 10)
queue.put({
"worker": name,
"result": result,
"iteration": i
})
def example_process():
queue = Queue()
processes = []
for i in range(3):
p = Process(target=worker, args=(f"Process-{i}", queue))
p.start()
processes.append(p)
# Собираем результаты
results = []
for _ in range(3 * 5):
results.append(queue.get())
for p in processes:
p.join()
print(f"Process результаты: {results[:3]}")
example_pool()
example_process()
Лучше всего для:
- CPU-bound операции (вычисления, обработка данных)
- Обход GIL
- Независимые параллельные задачи
Проблемы:
- Overhead на создание процесса (дорого по памяти)
- Сложнее обмен данными между процессами
- Не подходит для множества задач
3. Asyncio (асинхронное программирование)
Один поток, но множество корутин. Максимально эффективно для IO-bound.
import asyncio
import aiohttp
import time
# Асинхронная функция (корутина)
async def fetch_url(session: aiohttp.ClientSession, url: str) -> str:
async with session.get(url) as response:
return await response.text()
async def fetch_multiple_urls(urls: list) -> list:
"""Одновременно загружает несколько URL'ов"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# Асинхронная обработка
async def async_processing():
urls = [
"https://example.com",
"https://google.com",
"https://github.com",
]
start = time.time()
results = await fetch_multiple_urls(urls)
elapsed = time.time() - start
print(f"Загруженные {len(results)} страниц за {elapsed:.2f} сек")
return results
# Запуск асинхронного кода
# asyncio.run(async_processing())
# Более сложный пример с queue
async def async_worker(name: str, queue: asyncio.Queue):
while True:
item = await queue.get()
if item is None:
break
print(f"{name} обрабатывает {item}")
await asyncio.sleep(0.5) # Имитируем IO операцию
queue.task_done()
async def async_queue_example():
queue = asyncio.Queue()
# Создаём workers
workers = [
asyncio.create_task(async_worker(f"Worker-{i}", queue))
for i in range(3)
]
# Добавляем работу
for item in range(10):
await queue.put(item)
# Ждём обработки
await queue.join()
# Останавливаем workers
for _ in range(3):
await queue.put(None)
await asyncio.gather(*workers)
# asyncio.run(async_queue_example())
Лучше всего для:
- IO-bound операции (HTTP запросы, БД)
- Множество одновременных задач (500+)
- Real-time приложения
Преимущества:
- Минимальный overhead
- Максимальная производительность для IO-bound
- Легче отладка (один поток)
Проблемы:
- Нельзя использовать обычные блокирующие функции
- Нужна поддержка async в библиотеках
- Сложнее код
4. Сравнение
import time
import threading
import asyncio
from concurrent.futures import ThreadPoolExecutor
# Функция для примера
def blocking_io(seconds: float = 1):
"""Имитирует IO операцию"""
time.sleep(seconds)
return f"Done after {seconds}s"
async def async_io(seconds: float = 1):
"""Асинхронная IO операция"""
await asyncio.sleep(seconds)
return f"Done after {seconds}s"
# Multithreading (11 секунд)
def test_threading():
start = time.time()
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(blocking_io, 1) for _ in range(10)]
results = [f.result() for f in futures]
print(f"Threading: {time.time() - start:.2f}s")
# Asyncio (2 секунды)
async def test_asyncio():
start = time.time()
tasks = [async_io(1) for _ in range(10)]
results = await asyncio.gather(*tasks)
print(f"Asyncio: {time.time() - start:.2f}s")
# test_threading() # 11s (5 потоков, 10 задач)
# asyncio.run(test_asyncio()) # 2s
| Характеристика | Threading | Multiprocessing | Asyncio |
|---|---|---|---|
| GIL | Есть, ограничивает | Нет | Нет |
| Для IO-bound | Хорошо | Плохо | Отлично |
| Для CPU-bound | Плохо | Отлично | Плохо |
| Overhead | Низкий | Высокий | Минимальный |
| Простота | Средняя | Сложная | Сложная |
| Масштабируемость | До 100 потоков | До 50 процессов | 1000+ корутин |
| Обмен данными | Сложно (Race conditions) | Сложно (Serialization) | Легко (Queue) |
5. Практические примеры
# CPU-bound: используй multiprocessing
from multiprocessing import Pool
def calculate(n):
return sum(i * i for i in range(n))
numbers = [1000000, 2000000, 3000000, 4000000]
with Pool(4) as p:
results = p.map(calculate, numbers)
# IO-bound: используй asyncio
import asyncio
import aiohttp
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
urls = ['http://example.com'] * 100
results = asyncio.run(asyncio.gather(*[fetch(url) for url in urls]))
# GUI/Long-lived connections: используй threading
import threading
def background_task():
# Работает в фоне
pass
thread = threading.Thread(target=background_task, daemon=True)
thread.start()
Выбор правильного инструмента
- CPU-bound: multiprocessing
- IO-bound (простой код): threading
- IO-bound (высокая нагрузка): asyncio
- GUI приложения: threading
- Веб-приложения: asyncio (FastAPI, aiohttp)
- Data processing: multiprocessing (pandas, numpy)