Что будет, если в event loop отправить CPU-bound задачу?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
CPU-bound задачи в event loop asyncio
Это одна из самых частых ошибок при работе с asyncio. Отправка CPU-bound задачи в event loop приведет к серьезным последствиям.
Что происходит
Когда вы отправляете CPU-bound задачу в event loop, происходит блокировка event loop. Это означает, что весь loop заморозится на время выполнения этой задачи, и другие coroutines не смогут выполняться.
Пример проблемы:
import asyncio
import time
async def cpu_bound_task():
# Вычислительно интенсивная задача
total = 0
for i in range(1_000_000_000):
total += i
return total
async def main():
start = time.time()
# Это заблокирует весь event loop!
result = await cpu_bound_task()
elapsed = time.time() - start
print(f"Выполнено за {elapsed:.2f} секунд")
asyncio.run(main())
В этом примере event loop будет заморожен на время выполнения вычисления.
Последствия
1. Полная деградация производительности
Все остальные async операции будут ждать завершения CPU-bound задачи:
import asyncio
import time
async def cpu_intensive():
for i in range(1_000_000_000):
_ = i ** 2
async def quick_io():
print(f"Начало: {time.time()}")
await asyncio.sleep(0.1) # I/O операция
print(f"Конец: {time.time()}")
async def main():
# quick_io должна завершиться через 0.1 сек,
# но будет ждать завершения cpu_intensive
await cpu_intensive()
await quick_io() # Выполнится намного позже!
asyncio.run(main())
2. Зависание приложения
В веб-приложениях это приводит к зависанию всех пользовательских запросов:
from fastapi import FastAPI
import asyncio
app = FastAPI()
@app.get("/slow")
async def slow_endpoint():
# Ошибка: CPU-bound в async endpoint
for i in range(1_000_000_000):
_ = i ** 2
return {"status": "done"}
@app.get("/fast")
async def fast_endpoint():
# Эта функция не будет отвечать, пока slow_endpoint не завершится
return {"status": "ready"}
3. Timeout ошибки
Для высоконагруженных систем это приводит к timeout'ам:
async def main():
try:
# Таймаут не сработает на время выполнения async функции
result = await asyncio.wait_for(cpu_bound_task(), timeout=2.0)
except asyncio.TimeoutError:
print("Таймаут!")
Правильные решения
1. Использование loop.run_in_executor() для CPU-bound работы
Это отправляет задачу в отдельный thread pool и освобождает event loop:
import asyncio
def cpu_bound_function(n: int) -> int:
total = 0
for i in range(n):
total += i
return total
async def main():
loop = asyncio.get_event_loop()
# Выполнится в отдельном потоке
result = await loop.run_in_executor(None, cpu_bound_function, 1_000_000_000)
print(f"Результат: {result}")
asyncio.run(main())
2. Использование ProcessPoolExecutor для CPU-intensive работы
Для действительно требовательных к CPU задач используйте процессы вместо потоков:
import asyncio
from concurrent.futures import ProcessPoolExecutor
def heavy_computation(n: int) -> int:
return sum(i ** 2 for i in range(n))
async def main():
loop = asyncio.get_event_loop()
with ProcessPoolExecutor() as executor:
result = await loop.run_in_executor(executor, heavy_computation, 10_000_000)
print(f"Результат: {result}")
asyncio.run(main())
3. Разбиение на небольшие части с yield'ом
Если возможно, разбейте вычисления на части и используйте await между ними:
async def batched_computation():
total = 0
for batch in range(10):
for i in range(100_000_000):
total += i
# Даем event loop возможность обработать другие задачи
await asyncio.sleep(0)
return total
4. Использование асинхронных альтернатив
Рассмотрите использование асинхронных версий библиотек (например, numpy-async, concurrent-futures).
Лучшие практики
- I/O-bound задачи → await/async (HTTP запросы, БД, файловая система)
- CPU-bound задачи → loop.run_in_executor() с ThreadPoolExecutor или ProcessPoolExecutor
- Профилирование → используйте asyncio.get_event_loop().set_debug(True) для отладки
- Тестирование → всегда проверяйте производительность с реальной нагрузкой