← Назад к вопросам
Какие знаешь способы распараллеливания задач в Python?
2.0 Middle🔥 181 комментариев
#Python Core#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Способы распараллеливания задач в Python
В Python есть три основных подхода к параллелизму, и выбор зависит от типа задачи. Разберу каждый с примерами.
1. Threading (Многопоточность)
Когда использовать: I/O-bound задачи (сетевые запросы, файловые операции).
import threading
import time
from concurrent.futures import ThreadPoolExecutor
# ❌ Наивный подход
threads = []
for url in urls:
thread = threading.Thread(target=fetch_url, args=(url,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
# ✅ Лучше: использовать ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(fetch_url, urls))
# ✅ Или с futures для большей гибкости
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(fetch_url, url) for url in urls]
for future in futures:
try:
result = future.result(timeout=5)
print(result)
except Exception as e:
print(f"Error: {e}")
Проблемы threading:
- GIL (Global Interpreter Lock) — только один поток выполняется за раз в CPython
- Race conditions — нужны lock'и при доступе к общему состоянию
- Сложная отладка — deadlock'и, race condition'ы
# ❌ Race condition
counter = 0
def increment():
global counter
for _ in range(1000000):
counter += 1
threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(counter) # Не будет 10000000, потому что race condition!
# ✅ Использовать Lock
import threading
counter = 0
lock = threading.Lock()
def increment():
global counter
for _ in range(1000000):
with lock: # Защищаем доступ
counter += 1
2. Multiprocessing (Многопроцессность)
Когда использовать: CPU-bound задачи (вычисления, обработка данных).
from multiprocessing import Pool, Process
import os
# ❌ Наивный подход
processes = []
for data in data_list:
p = Process(target=process_data, args=(data,))
p.start()
processes.append(p)
for p in processes:
p.join()
# ✅ Лучше: использовать Pool
with Pool(processes=os.cpu_count()) as pool:
results = pool.map(process_data, data_list)
# ✅ Или с imap для потоковой обработки больших данных
with Pool(processes=4) as pool:
for result in pool.imap(process_data, data_list, chunksize=100):
print(result) # Обработка по мере готовности
Проблемы multiprocessing:
- Overhead — создание процесса дорого (нужно скопировать весь интерпретатор)
- IPC (Inter-Process Communication) — медленнее, чем доступ к памяти
- Сериализация — данные нужно pickled/unpickled
- Нет GIL, но есть проблемы с состоянием
# Пример: обработка больших файлов
import multiprocessing
def process_chunk(chunk):
return sum(x ** 2 for x in chunk)
if __name__ == '__main__':
# Важно! multiprocessing требует if __name__ == '__main__'
data = range(10000000)
chunk_size = len(data) // 4
chunks = [
data[i:i+chunk_size]
for i in range(0, len(data), chunk_size)
]
with multiprocessing.Pool(4) as pool:
results = pool.map(process_chunk, chunks)
total = sum(results)
3. Asyncio (Асинхронное программирование)
Когда использовать: I/O-bound задачи с большим количеством одновременных операций (веб-скрепинг, микросервисы).
import asyncio
import aiohttp
# ❌ Синхронный подход (медленно)
import requests
for url in urls:
response = requests.get(url) # Ждем для каждого
print(response.status_code)
# ✅ Асинхронный подход (быстро)
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
results = asyncio.run(main())
# ✅ С ограничением concurrency
async def fetch_with_limit(session, url, semaphore):
async with semaphore:
return await fetch_url(session, url)
async def main():
semaphore = asyncio.Semaphore(10) # Максимум 10 одновременных
async with aiohttp.ClientSession() as session:
tasks = [
fetch_with_limit(session, url, semaphore)
for url in urls
]
results = await asyncio.gather(*tasks)
return results
Преимущества asyncio:
- Один поток, но много concurrent операций
- Нет GIL проблем (по сути)
- Низкий overhead по сравнению с процессами
Проблемы:
- Только I/O-bound — не помогает с CPU вычислениями
- "Async plague" — нужно async/await везде
- Сложно отлаживать — стек вызовов может быть непредсказуем
4. Комбинированные подходы
Часто нужно сочетать несколько подходов:
# Пример: веб-краулер с обработкой данных
import asyncio
from concurrent.futures import ProcessPoolExecutor
import aiohttp
def cpu_intensive_parsing(html):
# Тяжелая обработка HTML
return parse_and_extract(html)
async def fetch_and_process(url, executor):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
html = await response.text()
# Обработка в отдельном процессе (CPU-bound)
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
executor,
cpu_intensive_parsing,
html
)
return result
async def main():
with ProcessPoolExecutor(max_workers=4) as executor:
tasks = [
fetch_and_process(url, executor)
for url in urls
]
results = await asyncio.gather(*tasks)
return results
results = asyncio.run(main())
5. Практические примеры
I/O-bound: ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor
import requests
def fetch_url(url):
response = requests.get(url)
return response.status_code
urls = ["https://api.example.com/user/1"] * 100
with ThreadPoolExecutor(max_workers=20) as executor:
results = list(executor.map(fetch_url, urls))
print(f"Completed: {len(results)}")
CPU-bound: ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
def fibonacci(n):
if n < 2:
return n
return fibonacci(n-1) + fibonacci(n-2)
numbers = [30, 31, 32, 33, 34]
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(fibonacci, numbers))
print(results)
I/O-bound (много одновременных): asyncio
import asyncio
import aiohttp
from datetime import datetime
async def fetch_many_urls(urls):
async with aiohttp.ClientSession() as session:
async def fetch(url):
try:
async with session.get(url, timeout=5) as resp:
return await resp.text()
except Exception as e:
return f"Error: {e}"
tasks = [fetch(url) for url in urls]
results = await asyncio.gather(*tasks)
return results
urls = [f"https://httpbin.org/delay/{i}" for i in range(1, 6)]
start = datetime.now()
results = asyncio.run(fetch_many_urls(urls))
print(f"Time: {datetime.now() - start}")
Сравнительная таблица
| Подход | Тип задач | Overhead | GIL | Сложность |
|---|---|---|---|---|
| Threading | I/O | Низкий | Да | Средняя |
| Multiprocessing | CPU | Высокий | Нет | Высокая |
| Asyncio | I/O (много) | Очень низкий | Нет | Высокая |
Мой подход к выбору
- I/O-bound, мало операций → ThreadPoolExecutor
- I/O-bound, 100+ одновременных → asyncio + aiohttp
- CPU-bound → ProcessPoolExecutor
- CPU + I/O → asyncio + ProcessPoolExecutor
- Распределенные вычисления → Celery (Redis backend)
# Рекомендуемый шаблон
from concurrent.futures import ThreadPoolExecutor
import asyncio
from functools import wraps
class TaskExecutor:
def __init__(self, thread_workers=10, process_workers=4):
self.thread_executor = ThreadPoolExecutor(max_workers=thread_workers)
# self.process_executor = ProcessPoolExecutor(max_workers=process_workers)
def run_io_task(self, func, *args, **kwargs):
"""Для I/O операций"""
loop = asyncio.get_event_loop()
return loop.run_in_executor(self.thread_executor, func, *args)
Выбор метода — ключ к производительности. Профилируй реальные задачи, не гадай!