← Назад к вопросам

Как называется функция позволяющая выполнять блокирующие задачи в отдельном потоке или процессе?

2.0 Middle🔥 191 комментариев
#Python Core

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Выполнение блокирующих операций в async коде

Основная функция: run_in_executor()

Это основной инструмент в asyncio для выполнения блокирующих операций без блокирования event loop.

import asyncio

# Блокирующая функция (например, работа с файлом, CPU-интенсивная задача)
def blocking_operation(name: str, duration: int) -> str:
    print(f"Начало блокирующей операции: {name}")
    time.sleep(duration)  # Это блокирует
    return f"Завершено: {name}"

# Async функция
async def main():
    loop = asyncio.get_event_loop()
    
    # Выполняем блокирующую операцию в отдельном потоке
    result = await loop.run_in_executor(None, blocking_operation, "task1", 5)
    print(result)  # "Завершено: task1"

Два типа executors

1. ThreadPoolExecutor (для I/O-блокирующих операций)

Для работы с файлами, БД, сетью (операции которые ждут):

import asyncio
from concurrent.futures import ThreadPoolExecutor
import time

def read_file(filename: str) -> str:
    time.sleep(2)  # Имитируем чтение файла
    return f"Содержимое {filename}"

async def main():
    loop = asyncio.get_event_loop()
    
    # Используем ThreadPoolExecutor с 4 потоками
    with ThreadPoolExecutor(max_workers=4) as executor:
        result = await loop.run_in_executor(executor, read_file, "data.txt")
        print(result)

asyncio.run(main())

Почему работает: пока один поток ждёт ответ от БД, другой поток может обработать другой запрос.

2. ProcessPoolExecutor (для CPU-интенсивных операций)

Для вычислений, которые требуют мощности процессора:

import asyncio
from concurrent.futures import ProcessPoolExecutor
import hashlib

def cpu_intensive(data: str) -> str:
    # Вычисляем хеш 1 миллион раз (CPU-интенсивно)
    result = data
    for _ in range(1000000):
        result = hashlib.sha256(result.encode()).hexdigest()
    return result

async def main():
    loop = asyncio.get_event_loop()
    
    # Используем ProcessPoolExecutor (отдельные процессы)
    with ProcessPoolExecutor(max_workers=4) as executor:
        result = await loop.run_in_executor(executor, cpu_intensive, "data")
        print(f"Результат: {result[:20]}...")

asyncio.run(main())

Важно: ProcessPoolExecutor создаёт отдельные процессы, не потоки. Это обходит GIL (Global Interpreter Lock).

Практический пример: FastAPI с блокирующей БД

from fastapi import FastAPI, HTTPException
import asyncio
from concurrent.futures import ThreadPoolExecutor
import sqlite3
from datetime import datetime

app = FastAPI()
executor = ThreadPoolExecutor(max_workers=10)

def get_user_from_db(user_id: int) -> dict:
    """Блокирующая функция (sqlite3 не имеет async драйвера)"""
    conn = sqlite3.connect("users.db")
    cursor = conn.cursor()
    cursor.execute("SELECT id, name, email FROM users WHERE id = ?", (user_id,))
    row = cursor.fetchone()
    conn.close()
    
    if not row:
        return None
    return {"id": row[0], "name": row[1], "email": row[2]}

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    loop = asyncio.get_event_loop()
    
    # Выполняем блокирующую операцию в отдельном потоке
    user = await loop.run_in_executor(executor, get_user_from_db, user_id)
    
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

Альтернатива: asyncio.to_thread (Python 3.9+)

Удобный способ без явного создания executor:

import asyncio

def blocking_func():
    time.sleep(5)
    return "Done"

async def main():
    # Вместо loop.run_in_executor()
    result = await asyncio.to_thread(blocking_func)
    print(result)

asyncio.run(main())

Правильный выбор

СитуацияРешениеПример
Блокирующая БД (sync)ThreadPoolExecutorsqlite3, psycopg2 (sync)
Работа с файламиThreadPoolExecutoropen(), read(), write()
Обращение к внешнему API (sync)ThreadPoolExecutorrequests.get()
CPU-интенсивные вычисленияProcessPoolExecutorШифрование, сжатие данных
async БД/HTTPНичего не нужно!asyncpg, httpx, aiohttp

Сравнение подходов

Плохо: блокирующий код в async функции

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    # ПЛОХО! Блокирует весь event loop
    user = get_user_from_db(user_id)  # 5 секунд ждёт
    return user

# Если приходит 100 запросов — все ждут 5 секунд = 500 секунд обработки

Хорошо: используем executor

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    loop = asyncio.get_event_loop()
    # ХОРОШО! Блокирует только один поток, event loop свободен
    user = await loop.run_in_executor(executor, get_user_from_db, user_id)
    return user

# 100 запросов с 10 потоками = все обрабатываются параллельно

Лучшая практика: использовать async драйверы

# ЛУЧШЕ ВСЕГО! Не требует executor
import asyncpg

DATABASE_URL = "postgresql://user:password@localhost/mydb"
db = asyncpg.create_pool(DATABASE_URL)

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    async with db.acquire() as connection:
        user = await connection.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
        return user

Ошибки при использовании run_in_executor

Передача sync функции в async контекст без executor

async def get_data():
    return blocking_func()  # НЕПРАВИЛЬНО!

Слишком много потоков

executor = ThreadPoolExecutor(max_workers=1000)  # НЕПРАВИЛЬНО! GIL

Забыли await

result = loop.run_in_executor(executor, blocking_func)  # НЕПРАВИЛЬНО!
# Должно быть:
result = await loop.run_in_executor(executor, blocking_func)

Правильно

loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, blocking_func)  # None = default executor (ThreadPoolExecutor)

Итог

run_in_executor() — это функция для выполнения блокирующего кода в отдельном потоке/процессе без блокирования event loop.

  • Для I/O: используй ThreadPoolExecutor
  • Для CPU: используй ProcessPoolExecutor
  • Для новых проектов: используй async драйверы (asyncpg, aiohttp)