← Назад к вопросам
Как избежать блокировки асинхронности в случае CPU-bound задач?
3.0 Senior🔥 201 комментариев
#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Избежание блокировки асинхронности в CPU-bound задачах
Проблема: CPU-bound операции (вычисления, парсинг, обработка данных) блокируют event loop в асинхронном коде, даже если они выглядят неблокирующими.
Проблема: blocking CPU-bound code
import asyncio
async def process_large_file(filepath: str) -> int:
# Это БЛОКИРУЕТ event loop!
data = open(filepath).read() # I/O wait — OK
# Это БЛОКИРУЕТ event loop на долгое время!
result = sum(int(line) for line in data.split("\\n")) # CPU work
return result
async def main():
# Первый запрос будет заблокирован вторым на время CPU работы
task1 = asyncio.create_task(process_large_file("file1.txt")) # 5 сек CPU
task2 = asyncio.create_task(process_large_file("file2.txt")) # 5 сек CPU
# Вместо параллельного выполнения 5 сек будет 10 сек!
results = await asyncio.gather(task1, task2)
print(results)
Решение 1: run_in_executor
Запустить CPU-bound код в отдельном потоке:
import asyncio
from concurrent.futures import ThreadPoolExecutor
def cpu_intensive_task(n: int) -> int:
"""Синхронная CPU-bound функция"""
return sum(i * i for i in range(n))
async def async_wrapper():
loop = asyncio.get_event_loop()
# Запустить в потоке (по умолчанию используется ThreadPoolExecutor)
result = await loop.run_in_executor(None, cpu_intensive_task, 10**7)
return result
async def main():
# Теперь оба задания выполняются параллельно в разных потоках
task1 = asyncio.create_task(async_wrapper())
task2 = asyncio.create_task(async_wrapper())
results = await asyncio.gather(task1, task2)
print(f"Результаты: {results}") # Выполнится за ~2 сек вместо 4
Решение 2: ProcessPoolExecutor для истинного parallelism
Для тяжёлых вычислений используй процессы (обходит GIL):
import asyncio
from concurrent.futures import ProcessPoolExecutor
def heavy_computation(n: int) -> int:
"""Очень тяжёлый расчёт"""
return sum(i * i for i in range(n)) ** 2
async def main():
loop = asyncio.get_event_loop()
# ProcessPoolExecutor — запустит в отдельном процессе (обходит GIL)
with ProcessPoolExecutor(max_workers=4) as pool:
result = await loop.run_in_executor(pool, heavy_computation, 10**8)
print(f"Результат: {result}")
Когда использовать:
- ThreadPoolExecutor: блокирующий I/O (sleep, HTTP без aiohttp)
- ProcessPoolExecutor: чистые вычисления, обход GIL
Решение 3: Разбить на chunks с yield
Делить работу на части и отдавать управление event loop:
import asyncio
async def process_data_chunked(data: list, chunk_size: int = 1000) -> list:
"""Обрабатывать данные кусками"""
result = []
for i in range(0, len(data), chunk_size):
chunk = data[i:i + chunk_size]
# Обработать кусок
processed = [process(item) for item in chunk]
result.extend(processed)
# Отдать управление event loop
await asyncio.sleep(0) # Yield point
return result
async def main():
data = list(range(10**6))
result = await process_data_chunked(data)
print(f"Обработано {len(result)} элементов")
Решение 4: Асинхронная версия с asyncio.to_thread (Python 3.9+)
Удобный способ запустить блокирующий код в потоке:
import asyncio
def slow_cpu_task(n: int) -> int:
"""Синхронная CPU-bound функция"""
total = 0
for i in range(n):
total += i ** 2
return total
async def main():
# asyncio.to_thread — встроенный способ (Python 3.9+)
result = await asyncio.to_thread(slow_cpu_task, 10**7)
print(f"Результат: {result}")
async def concurrent_main():
# Параллельное выполнение в потоках
results = await asyncio.gather(
asyncio.to_thread(slow_cpu_task, 10**7),
asyncio.to_thread(slow_cpu_task, 10**7),
asyncio.to_thread(slow_cpu_task, 10**7),
)
print(f"Все результаты: {results}")
Решение 5: Использовать правильные асинхронные библиотеки
Некоторые операции выглядят CPU-bound, но имеют асинхронные версии:
# ❌ Синхронный JSON (блокирует)
import json
data = json.loads(heavy_json_string) # Может заблокировать на долгое время
# ✅ Асинхронный JSON
import orjson # Очень быстрый JSON парсер
data = orjson.loads(heavy_json_string)
# ❌ Синхронный HTTP (блокирует)
import requests
response = requests.get("http://api.example.com") # Блокирует
# ✅ Асинхронный HTTP
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get("http://api.example.com") as response:
data = await response.json()
Реальный пример: FastAPI с CPU-bound обработкой
from fastapi import FastAPI, BackgroundTasks
from concurrent.futures import ThreadPoolExecutor
import asyncio
app = FastAPI()
executor = ThreadPoolExecutor(max_workers=4)
def analyze_image(image_path: str) -> dict:
"""Синхронный анализ изображения (CPU-bound)"""
import time
time.sleep(5) # Имитация тяжёлого расчёта
return {"objects": 10, "confidence": 0.95}
@app.post("/analyze")
async def analyze(image_path: str):
loop = asyncio.get_event_loop()
# Не блокируем event loop
result = await loop.run_in_executor(executor, analyze_image, image_path)
return {"status": "completed", "result": result}
@app.on_event("shutdown")
async def shutdown():
executor.shutdown(wait=True)
Сравнение подходов
| Подход | Когда использовать | Плюсы | Минусы |
|---|---|---|---|
run_in_executor(None) | Блокирующий I/O | Просто, встроено | GIL блокирует CPU |
ProcessPoolExecutor | Чистые вычисления | Обходит GIL | Overhead по памяти |
Chunking + await asyncio.sleep(0) | Обрабатываем свои данные | Контролируемо | Сложнее |
asyncio.to_thread() | Python 3.9+ | Удобно | Только для I/O |
| Асинхронные библиотеки | Всегда когда есть | Лучше всего | Не все имеют async вариант |
Вывод
Основные правила:
- Используй асинхронные I/O операции (aiohttp, asyncpg, aiomysql)
- CPU-bound код →
run_in_executor()с ProcessPoolExecutor - Блокирующий I/O → ThreadPoolExecutor
- Разбивай тяжёлые вычисления на chunks с
await asyncio.sleep(0) - Всегда профилируй — GIL может быть узким местом
Так асинхронный код останется действительно неблокирующим и отзывчивым.