← Назад к вопросам
Переключится ли Event Loop, если одна из корутин занята CPU bound задачей
3.0 Senior🔥 71 комментариев
#Python Core#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Ответ: Нет, Event Loop не переключится
Если корутина занята CPU-bound задачей, Event Loop не сможет переключиться на другие корутины, пока текущая не закончит работу. Это ключевая особенность асинхронного программирования в Python.
Почему происходит блокировка
Event Loop в asyncio работает в одном потоке и работает в режиме кооперативной многозадачности:
- Кооперативность — корутины сами отдают управление Event Loop-у через
await - CPU-bound задача блокирует — если код выполняет интенсивные вычисления без
await, Event Loop не может переключиться - Нет прерываний — asyncio не прерывает выполняющийся код, в отличие от потоков
import asyncio
import time
async def cpu_bound_task():
"""Эта корутина занята CPU-bound работой"""
# Без await — Event Loop не может переключиться
for i in range(100_000_000):
_ = i ** 2
print("CPU task finished")
async def io_bound_task():
"""Эта корутина никогда не запустится во время cpu_bound_task"""
print("IO task started")
await asyncio.sleep(0.1)
print("IO task finished")
async def main():
# cpu_bound_task заблокирует весь Event Loop
await asyncio.gather(
cpu_bound_task(),
io_bound_task()
)
# io_bound_task дождётся завершения cpu_bound_task
# Вывод:
# CPU task finished
# IO task started
# IO task finished
Решение: Как избежать блокировки
1. Использовать loop.run_in_executor() — для CPU-bound в отдельном потоке
import asyncio
from concurrent.futures import ThreadPoolExecutor
def cpu_intensive():
"""Обычная функция (не корутина)"""
for i in range(100_000_000):
_ = i ** 2
return "Done"
async def main():
loop = asyncio.get_event_loop()
# Запускаем CPU-bound в отдельном потоке
result = await loop.run_in_executor(None, cpu_intensive)
print(result)
asyncio.run(main())
2. Разбить CPU-bound на части с await asyncio.sleep(0) — пустой sleep для переключения
async def cpu_bound_with_yields():
"""CPU-bound задача с переключениями Event Loop-а"""
for chunk in range(10):
# Вычисления
for i in range(10_000_000):
_ = i ** 2
# Даём Event Loop-у шанс переключиться
await asyncio.sleep(0)
print(f"Chunk {chunk} done")
async def io_task():
print("IO started")
await asyncio.sleep(0.1)
print("IO finished")
async def main():
# Теперь io_task сможет выполняться параллельно
await asyncio.gather(
cpu_bound_with_yields(),
io_task()
)
3. Использовать asyncio.to_thread() (Python 3.9+)
import asyncio
def cpu_intensive():
for i in range(100_000_000):
_ = i ** 2
return "Done"
async def main():
# Аналог run_in_executor, но более удобный
result = await asyncio.to_thread(cpu_intensive)
print(result)
asyncio.run(main())
Почему это критично
Проблема: Если в async приложении (например, FastAPI, Telegram бот) хоть одна корутина выполнит CPU-bound задачу без прерываний, всё приложение зависнет для других пользователей.
Пример: если в Telegram боте на одного пользователя потратить 5 секунд на вычисления, остальные 10000 пользователей ждут эти 5 секунд.
Итог
- Event Loop не переключится, пока CPU-bound корутина не вернёт управление
- Это ломает asyncio — используй
run_in_executor()илиto_thread() - Правило: async — для I/O, threading/multiprocessing — для CPU
- Помни:
await= точка переключения, безawait= блокировка