Какие знаешь механизмы параллельности в Python?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Механизмы параллельности в Python
Python имеет несколько способов достичь параллельного выполнения, каждый с собственными преимуществами и недостатками. Главный враг Python-разработчика — это GIL (Global Interpreter Lock).
GIL (Global Interpreter Lock) — главный враг
GIL — это мьютекс в CPython, который позволяет только одному потоку выполнять Python байт-код одновременно.
import time
import threading
def cpu_bound_task():
"""Задача, зависящая от CPU"""
total = 0
for i in range(100_000_000):
total += i
return total
# Однопоточный
start = time.time()
cpu_bound_task()
print(f"Single-threaded: {time.time() - start:.2f}s") # ~3s
# Двухпоточный — медленнее!
start = time.time()
t1 = threading.Thread(target=cpu_bound_task)
t2 = threading.Thread(target=cpu_bound_task)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Two threads: {time.time() - start:.2f}s") # ~6s (медленнее из-за GIL!)
GIL отпускается только во время I/O операций (сеть, файлы).
1. Threading — легкие потоки (но GIL!)
Используй для I/O-bound задач (сеть, файлы).
import threading
import time
from urllib.request import urlopen
def fetch_url(url):
"""Задача, зависящая от I/O"""
try:
response = urlopen(url, timeout=5)
return len(response.read())
except Exception as e:
return 0
urls = [
'https://example.com',
'https://google.com',
'https://github.com',
] * 5
# Однопоточно
start = time.time()
for url in urls:
fetch_url(url)
print(f"Single-threaded: {time.time() - start:.2f}s") # ~30s (блокирует)
# Многопоточно
start = time.time()
threads = []
for url in urls:
t = threading.Thread(target=fetch_url, args=(url,))
t.start()
threads.append(t)
for t in threads:
t.join()
print(f"Multi-threaded: {time.time() - start:.2f}s") # ~6s (параллельно)
Проблемы:
- GIL блокирует параллельное выполнение CPU-bound кода
- Race conditions если не синхронизировать
- Сложный debugging
Решение race conditions:
import threading
class Counter:
def __init__(self):
self.value = 0
self.lock = threading.Lock()
def increment(self):
with self.lock: # Критическая секция
self.value += 1
counter = Counter()
threads = []
for _ in range(100):
t = threading.Thread(target=counter.increment)
t.start()
threads.append(t)
for t in threads:
t.join()
print(counter.value) # 100 (без lock было бы случайное число < 100)
2. Multiprocessing — отдельные процессы (обходит GIL)
Для CPU-bound задач. Каждый процесс имеет собственный GIL.
import multiprocessing
import time
def cpu_bound_task(n):
"""Вычисления"""
total = 0
for i in range(n):
total += i
return total
if __name__ == '__main__':
# Pool — управляет процессами
with multiprocessing.Pool(processes=4) as pool:
start = time.time()
results = pool.map(cpu_bound_task, [100_000_000] * 4)
print(f"Multiprocessing: {time.time() - start:.2f}s") # ~3s (быстро!)
# Или вручную
if __name__ == '__main__':
processes = []
start = time.time()
for _ in range(4):
p = multiprocessing.Process(target=cpu_bound_task, args=(100_000_000,))
p.start()
processes.append(p)
for p in processes:
p.join()
print(f"Time: {time.time() - start:.2f}s")
Проблемы:
- Большие затраты на создание процесса
- Сложность обмена данными между процессами
- Требует pickle сериализации
Передача данных между процессами:
from multiprocessing import Queue, Process
def worker(q):
q.put("Hello from worker")
if __name__ == '__main__':
q = Queue()
p = Process(target=worker, args=(q,))
p.start()
msg = q.get() # Блокируется, пока не получит сообщение
print(msg)
p.join()
3. Asyncio — асинхронность (один поток, кооперативная мультизадачность)
Для тысяч одновременных I/O операций. Не параллель, а concurrency.
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as response:
return await response.text()
async def main():
urls = ['https://example.com'] * 20
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
print(f"Asyncio: {time.time() - start:.2f}s") # ~5s (20 параллельно)
asyncio.run(main())
Как это работает:
async def task1():
print("Task 1: Start")
await asyncio.sleep(2) # Отпускаю контроль
print("Task 1: End")
async def task2():
print("Task 2: Start")
await asyncio.sleep(1)
print("Task 2: End")
async def main():
# Выполняются параллельно!
await asyncio.gather(task1(), task2())
# Всего займет ~2s, не 3s
asyncio.run(main())
# Task 1: Start
# Task 2: Start
# Task 2: End
# Task 1: End
Преимущества asyncio:
- Масштабируется на 10k+ одновременных соединений
- Один поток — нет race conditions
- Меньше памяти чем потоки
Недостатки:
- Сложнее писать (нужен async/await синтаксис везде)
- Не помогает для CPU-bound задач
- Сложнее дебажить
4. Concurrent.futures — высокоуровневый API
Простой способ использовать потоки или процессы.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
def io_task(n):
time.sleep(1) # Имитация I/O
return n * 2
# ThreadPoolExecutor для I/O
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(io_task, range(10)))
print(f"Threads: {time.time() - start:.2f}s") # ~2.5s (10 параллельно)
# ProcessPoolExecutor для CPU
def cpu_task(n):
return sum(range(n))
start = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(cpu_task, [50_000_000] * 4))
print(f"Processes: {time.time() - start:.2f}s") # ~3s
5. Celery — распределённые задачи
Для асинхронных задач в production.
from celery import Celery
import time
app = Celery('tasks', broker='redis://localhost:6379')
@app.task
def long_running_task(x):
"""Выполнится в отдельном процессе/машине"""
time.sleep(10)
return x * 2
# Вызов (не блокирует)
result = long_running_task.delay(42)
print(result.id) # ID задачи
# Получить результат позже
result.get(timeout=30) # Блокирует, ждёт результата
# Проверить статус
if result.ready():
print(result.get())
Сравнительная таблица
Механизм | Тип | GIL | Масштаб | Use case
-----------------|-----------|-----|---------|------------------
Threading | Потоки | Да | 100s | I/O (сеть, файлы)
Multiprocessing | Процессы | Нет | 10s | CPU-bound вычисления
Asyncio | Asyncio | Да | 10k+ | Много I/O операций
Concurrent | Оба | Да | 100s | Простой I/O
Celery | Процессы | Нет | 1000s | Production задачи
Практический пример: веб-скрепер
import asyncio
import aiohttp
from time import time
async def scrape_page(session, url):
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
return await resp.text()
except Exception as e:
print(f"Error {url}: {e}")
return None
async def main():
urls = [f"https://example.com/page/{i}" for i in range(100)]
start = time()
async with aiohttp.ClientSession() as session:
tasks = [scrape_page(session, url) for url in urls]
results = await asyncio.gather(*tasks)
elapsed = time() - start
print(f"Scraped {len([r for r in results if r])} pages in {elapsed:.2f}s")
print(f"Speed: {len(urls) / elapsed:.0f} pages/sec")
asyncio.run(main())
Выбор механизма
Используй Threading если:
- I/O-bound задачи
- Нужна простота
- До 100 одновременных операций
Используй Multiprocessing если:
- CPU-bound вычисления
- Нужна реальная параллель
- До 10 параллельных процессов
Используй Asyncio если:
- Много I/O операций (1000+)
- Нужна масштабируемость
- Готов к async/await синтаксису
Используй Celery если:
- Production приложение
- Нужно распределение задач
- Нужна reliability и retry механизм