← Назад к вопросам

Как избежать блокировки асинхронности в случае CPU-bound задач?

3.0 Senior🔥 201 комментариев
#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Избежание блокировки асинхронности в CPU-bound задачах

Проблема: CPU-bound операции (вычисления, парсинг, обработка данных) блокируют event loop в асинхронном коде, даже если они выглядят неблокирующими.

Проблема: blocking CPU-bound code

import asyncio

async def process_large_file(filepath: str) -> int:
    # Это БЛОКИРУЕТ event loop!
    data = open(filepath).read()  # I/O wait — OK
    
    # Это БЛОКИРУЕТ event loop на долгое время!
    result = sum(int(line) for line in data.split("\\n"))  # CPU work
    
    return result

async def main():
    # Первый запрос будет заблокирован вторым на время CPU работы
    task1 = asyncio.create_task(process_large_file("file1.txt"))  # 5 сек CPU
    task2 = asyncio.create_task(process_large_file("file2.txt"))  # 5 сек CPU
    
    # Вместо параллельного выполнения 5 сек будет 10 сек!
    results = await asyncio.gather(task1, task2)
    print(results)

Решение 1: run_in_executor

Запустить CPU-bound код в отдельном потоке:

import asyncio
from concurrent.futures import ThreadPoolExecutor

def cpu_intensive_task(n: int) -> int:
    """Синхронная CPU-bound функция"""
    return sum(i * i for i in range(n))

async def async_wrapper():
    loop = asyncio.get_event_loop()
    
    # Запустить в потоке (по умолчанию используется ThreadPoolExecutor)
    result = await loop.run_in_executor(None, cpu_intensive_task, 10**7)
    return result

async def main():
    # Теперь оба задания выполняются параллельно в разных потоках
    task1 = asyncio.create_task(async_wrapper())
    task2 = asyncio.create_task(async_wrapper())
    
    results = await asyncio.gather(task1, task2)
    print(f"Результаты: {results}")  # Выполнится за ~2 сек вместо 4

Решение 2: ProcessPoolExecutor для истинного parallelism

Для тяжёлых вычислений используй процессы (обходит GIL):

import asyncio
from concurrent.futures import ProcessPoolExecutor

def heavy_computation(n: int) -> int:
    """Очень тяжёлый расчёт"""
    return sum(i * i for i in range(n)) ** 2

async def main():
    loop = asyncio.get_event_loop()
    
    # ProcessPoolExecutor — запустит в отдельном процессе (обходит GIL)
    with ProcessPoolExecutor(max_workers=4) as pool:
        result = await loop.run_in_executor(pool, heavy_computation, 10**8)
    
    print(f"Результат: {result}")

Когда использовать:

  • ThreadPoolExecutor: блокирующий I/O (sleep, HTTP без aiohttp)
  • ProcessPoolExecutor: чистые вычисления, обход GIL

Решение 3: Разбить на chunks с yield

Делить работу на части и отдавать управление event loop:

import asyncio

async def process_data_chunked(data: list, chunk_size: int = 1000) -> list:
    """Обрабатывать данные кусками"""
    result = []
    
    for i in range(0, len(data), chunk_size):
        chunk = data[i:i + chunk_size]
        
        # Обработать кусок
        processed = [process(item) for item in chunk]
        result.extend(processed)
        
        # Отдать управление event loop
        await asyncio.sleep(0)  # Yield point
    
    return result

async def main():
    data = list(range(10**6))
    result = await process_data_chunked(data)
    print(f"Обработано {len(result)} элементов")

Решение 4: Асинхронная версия с asyncio.to_thread (Python 3.9+)

Удобный способ запустить блокирующий код в потоке:

import asyncio

def slow_cpu_task(n: int) -> int:
    """Синхронная CPU-bound функция"""
    total = 0
    for i in range(n):
        total += i ** 2
    return total

async def main():
    # asyncio.to_thread — встроенный способ (Python 3.9+)
    result = await asyncio.to_thread(slow_cpu_task, 10**7)
    print(f"Результат: {result}")

async def concurrent_main():
    # Параллельное выполнение в потоках
    results = await asyncio.gather(
        asyncio.to_thread(slow_cpu_task, 10**7),
        asyncio.to_thread(slow_cpu_task, 10**7),
        asyncio.to_thread(slow_cpu_task, 10**7),
    )
    print(f"Все результаты: {results}")

Решение 5: Использовать правильные асинхронные библиотеки

Некоторые операции выглядят CPU-bound, но имеют асинхронные версии:

# ❌ Синхронный JSON (блокирует)
import json
data = json.loads(heavy_json_string)  # Может заблокировать на долгое время

# ✅ Асинхронный JSON
import orjson  # Очень быстрый JSON парсер
data = orjson.loads(heavy_json_string)

# ❌ Синхронный HTTP (блокирует)
import requests
response = requests.get("http://api.example.com")  # Блокирует

# ✅ Асинхронный HTTP
import aiohttp
async with aiohttp.ClientSession() as session:
    async with session.get("http://api.example.com") as response:
        data = await response.json()

Реальный пример: FastAPI с CPU-bound обработкой

from fastapi import FastAPI, BackgroundTasks
from concurrent.futures import ThreadPoolExecutor
import asyncio

app = FastAPI()
executor = ThreadPoolExecutor(max_workers=4)

def analyze_image(image_path: str) -> dict:
    """Синхронный анализ изображения (CPU-bound)"""
    import time
    time.sleep(5)  # Имитация тяжёлого расчёта
    return {"objects": 10, "confidence": 0.95}

@app.post("/analyze")
async def analyze(image_path: str):
    loop = asyncio.get_event_loop()
    
    # Не блокируем event loop
    result = await loop.run_in_executor(executor, analyze_image, image_path)
    
    return {"status": "completed", "result": result}

@app.on_event("shutdown")
async def shutdown():
    executor.shutdown(wait=True)

Сравнение подходов

ПодходКогда использоватьПлюсыМинусы
run_in_executor(None)Блокирующий I/OПросто, встроеноGIL блокирует CPU
ProcessPoolExecutorЧистые вычисленияОбходит GILOverhead по памяти
Chunking + await asyncio.sleep(0)Обрабатываем свои данныеКонтролируемоСложнее
asyncio.to_thread()Python 3.9+УдобноТолько для I/O
Асинхронные библиотекиВсегда когда естьЛучше всегоНе все имеют async вариант

Вывод

Основные правила:

  1. Используй асинхронные I/O операции (aiohttp, asyncpg, aiomysql)
  2. CPU-bound код → run_in_executor() с ProcessPoolExecutor
  3. Блокирующий I/O → ThreadPoolExecutor
  4. Разбивай тяжёлые вычисления на chunks с await asyncio.sleep(0)
  5. Всегда профилируй — GIL может быть узким местом

Так асинхронный код останется действительно неблокирующим и отзывчивым.