← Назад к вопросам
Будешь ли делать асинхронной CPU-bound задачу
2.0 Middle🔥 141 комментариев
#Python Core#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
# Будешь ли делать асинхронной CPU-bound задачу?
Отличный вопрос о понимании асинхронного программирования! Это проверяет знание когда async/await уместен.
Короткий ответ
НЕТ, не буду. CPU-bound задачи асинхронного программирования не помогают. Это наоборот, замедлит выполнение из-за overhead на контекстные переключения.
Почему не буду
1. Python имеет GIL (Global Interpreter Lock)
В CPython одновременно может выполняться только одно потокобезопасное выражение:
# ❌ Async НЕ ПОМОЖЕТ для CPU-bound!
async def compute_heavy():
# Вычисляем сложную функцию (1 миллиард операций)
result = sum(i**2 for i in range(1_000_000_000))
return result
async def main():
# Даже если запустим параллельно, GIL позволит выполняться только одной задаче
tasks = [compute_heavy() for _ in range(4)]
results = await asyncio.gather(*tasks)
# Это медленнее, чем просто sequential!
# Async добавляет overhead без выигрыша
2. Async предназначен для I/O операций
Async работает хорошо для операций, которые занимают время вне Python:
# ✅ Async ПОМОЖЕТ для I/O-bound!
async def fetch_from_api():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/data') as resp:
# Пока ждём ответ от сервера, event loop выполняет другие корутины
return await resp.json()
async def main():
# 4 запроса параллельно, общее время ~1 сек (вместо 4 сек)
tasks = [fetch_from_api() for _ in range(4)]
results = await asyncio.gather(*tasks)
Сравнение: CPU-bound vs I/O-bound
import asyncio
import time
from aiohttp import ClientSession
# CPU-BOUND: Вычисления
def cpu_task(n):
"""Вычисляем числа Фибоначчи (медленно)"""
def fib(x):
if x < 2:
return x
return fib(x-1) + fib(x-2)
return fib(n)
# I/O-BOUND: Сетевые запросы
async def io_task():
"""Запрашиваем данные с сервера"""
async with ClientSession() as session:
async with session.get('https://httpbin.org/delay/1') as resp:
return await resp.text()
# ТЕСТ 1: CPU-bound sequential (хорошо)
start = time.time()
for _ in range(4):
cpu_task(35) # ~5 сек
print(f"CPU sequential: {time.time() - start:.1f}s")
# ТЕСТ 2: CPU-bound async (ПЛОХО!)
start = time.time()
async def cpu_async():
tasks = [asyncio.to_thread(cpu_task, 35) for _ in range(4)]
await asyncio.gather(*tasks) # ~5 сек + overhead
asyncio.run(cpu_async())
print(f"CPU async: {time.time() - start:.1f}s")
# ТЕСТ 3: I/O-bound sequential (ПЛОХО)
start = time.time()
for _ in range(4):
requests.get('https://httpbin.org/delay/1') # ~4 сек
print(f"I/O sequential: {time.time() - start:.1f}s")
# ТЕСТ 4: I/O-bound async (ОТЛИЧНО!)
start = time.time()
async def io_async():
tasks = [io_task() for _ in range(4)]
await asyncio.gather(*tasks) # ~1 сек
asyncio.run(io_async())
print(f"I/O async: {time.time() - start:.1f}s")
Результаты:
CPU sequential: 5.2s ✅ хорошо
CPU async: 5.8s ❌ хуже (из-за overhead)
I/O sequential: 4.1s ❌ плохо
I/O async: 1.0s ✅ отлично!
Решения для CPU-bound
1. multiprocessing (несколько процессов, обходит GIL)
from multiprocessing import Pool
import time
def cpu_task(n):
"""Вычисление (медленно)"""
return sum(i**2 for i in range(n))
if __name__ == '__main__':
# Каждый процесс имеет собственный GIL
with Pool(processes=4) as pool:
start = time.time()
results = pool.map(cpu_task, [10_000_000] * 4)
print(f"multiprocessing: {time.time() - start:.1f}s") # ~1.5s (4 ядра)
2. asyncio.to_thread() для асинхронного обёртывания
import asyncio
def heavy_computation(n):
return sum(i**2 for i in range(n))
async def main():
# Запускаем CPU-bound в отдельном потоке
# НЕ рекомендуется! Это просто добавляет overhead
result = await asyncio.to_thread(heavy_computation, 10_000_000)
print(result)
asyncio.run(main())
3. ThreadPoolExecutor для параллельных вычислений
from concurrent.futures import ThreadPoolExecutor
import time
def cpu_task(n):
return sum(i**2 for i in range(n))
with ThreadPoolExecutor(max_workers=4) as executor:
start = time.time()
futures = [executor.submit(cpu_task, 10_000_000) for _ in range(4)]
results = [f.result() for f in futures]
print(f"ThreadPoolExecutor: {time.time() - start:.1f}s") # ~5s (из-за GIL)
4. Используй библиотеку на C (Cython, NumPy)
import numpy as np
# NumPy работает параллельно (обходит GIL)
arr = np.arange(10_000_000)
result = np.sum(arr**2) # Выполняется параллельно на C уровне
Когда использовать async
✅ ИСПОЛЬЗУЙ async:
- HTTP запросы (aiohttp, httpx)
- Database запросы (asyncpg, asyncmy)
- WebSocket соединения
- Одновременное ожидание нескольких I/O операций
❌ НЕ ИСПОЛЬЗУЙ async:
- Тяжёлые вычисления (sum, sort, algorithm)
- Обработка больших массивов данных
- Machine Learning обучение
- Криптография
Практический пример: Правильный выбор
import asyncio
from concurrent.futures import ProcessPoolExecutor
import aiohttp
# Задача 1: Вычисление (CPU-bound) → multiprocessing
def compute(data):
return sum(x**2 for x in data)
# Задача 2: Запрос API (I/O-bound) → async
async def fetch_data(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.json()
# Задача 3: Комбинированная
async def process_urls_with_computation(urls):
# Получаем данные асинхронно
tasks = [fetch_data(url) for url in urls]
results = await asyncio.gather(*tasks)
# Вычисляем в отдельных процессах
with ProcessPoolExecutor(max_workers=4) as executor:
loop = asyncio.get_event_loop()
computed = await loop.run_in_executor(
executor,
compute,
[item for sublist in results for item in sublist]
)
return computed
Вывод
Для CPU-bound задач async бесполезен (или вреден). Лучшие альтернативы:
- multiprocessing — для настоящего параллелизма
- asyncio.to_thread() — только если нет выбора
- Оптимизированные библиотеки (NumPy, Cython)
- Async только для I/O — это то, для чего он был создан
Правило простое: async для I/O, multiprocessing для CPU.