← Назад к вопросам
Как называется функция позволяющая выполнять блокирующие задачи в отдельном потоке или процессе?
2.0 Middle🔥 191 комментариев
#Python Core
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Выполнение блокирующих операций в async коде
Основная функция: run_in_executor()
Это основной инструмент в asyncio для выполнения блокирующих операций без блокирования event loop.
import asyncio
# Блокирующая функция (например, работа с файлом, CPU-интенсивная задача)
def blocking_operation(name: str, duration: int) -> str:
print(f"Начало блокирующей операции: {name}")
time.sleep(duration) # Это блокирует
return f"Завершено: {name}"
# Async функция
async def main():
loop = asyncio.get_event_loop()
# Выполняем блокирующую операцию в отдельном потоке
result = await loop.run_in_executor(None, blocking_operation, "task1", 5)
print(result) # "Завершено: task1"
Два типа executors
1. ThreadPoolExecutor (для I/O-блокирующих операций)
Для работы с файлами, БД, сетью (операции которые ждут):
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
def read_file(filename: str) -> str:
time.sleep(2) # Имитируем чтение файла
return f"Содержимое {filename}"
async def main():
loop = asyncio.get_event_loop()
# Используем ThreadPoolExecutor с 4 потоками
with ThreadPoolExecutor(max_workers=4) as executor:
result = await loop.run_in_executor(executor, read_file, "data.txt")
print(result)
asyncio.run(main())
Почему работает: пока один поток ждёт ответ от БД, другой поток может обработать другой запрос.
2. ProcessPoolExecutor (для CPU-интенсивных операций)
Для вычислений, которые требуют мощности процессора:
import asyncio
from concurrent.futures import ProcessPoolExecutor
import hashlib
def cpu_intensive(data: str) -> str:
# Вычисляем хеш 1 миллион раз (CPU-интенсивно)
result = data
for _ in range(1000000):
result = hashlib.sha256(result.encode()).hexdigest()
return result
async def main():
loop = asyncio.get_event_loop()
# Используем ProcessPoolExecutor (отдельные процессы)
with ProcessPoolExecutor(max_workers=4) as executor:
result = await loop.run_in_executor(executor, cpu_intensive, "data")
print(f"Результат: {result[:20]}...")
asyncio.run(main())
Важно: ProcessPoolExecutor создаёт отдельные процессы, не потоки. Это обходит GIL (Global Interpreter Lock).
Практический пример: FastAPI с блокирующей БД
from fastapi import FastAPI, HTTPException
import asyncio
from concurrent.futures import ThreadPoolExecutor
import sqlite3
from datetime import datetime
app = FastAPI()
executor = ThreadPoolExecutor(max_workers=10)
def get_user_from_db(user_id: int) -> dict:
"""Блокирующая функция (sqlite3 не имеет async драйвера)"""
conn = sqlite3.connect("users.db")
cursor = conn.cursor()
cursor.execute("SELECT id, name, email FROM users WHERE id = ?", (user_id,))
row = cursor.fetchone()
conn.close()
if not row:
return None
return {"id": row[0], "name": row[1], "email": row[2]}
@app.get("/users/{user_id}")
async def get_user(user_id: int):
loop = asyncio.get_event_loop()
# Выполняем блокирующую операцию в отдельном потоке
user = await loop.run_in_executor(executor, get_user_from_db, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
Альтернатива: asyncio.to_thread (Python 3.9+)
Удобный способ без явного создания executor:
import asyncio
def blocking_func():
time.sleep(5)
return "Done"
async def main():
# Вместо loop.run_in_executor()
result = await asyncio.to_thread(blocking_func)
print(result)
asyncio.run(main())
Правильный выбор
| Ситуация | Решение | Пример |
|---|---|---|
| Блокирующая БД (sync) | ThreadPoolExecutor | sqlite3, psycopg2 (sync) |
| Работа с файлами | ThreadPoolExecutor | open(), read(), write() |
| Обращение к внешнему API (sync) | ThreadPoolExecutor | requests.get() |
| CPU-интенсивные вычисления | ProcessPoolExecutor | Шифрование, сжатие данных |
| async БД/HTTP | Ничего не нужно! | asyncpg, httpx, aiohttp |
Сравнение подходов
Плохо: блокирующий код в async функции
@app.get("/users/{user_id}")
async def get_user(user_id: int):
# ПЛОХО! Блокирует весь event loop
user = get_user_from_db(user_id) # 5 секунд ждёт
return user
# Если приходит 100 запросов — все ждут 5 секунд = 500 секунд обработки
Хорошо: используем executor
@app.get("/users/{user_id}")
async def get_user(user_id: int):
loop = asyncio.get_event_loop()
# ХОРОШО! Блокирует только один поток, event loop свободен
user = await loop.run_in_executor(executor, get_user_from_db, user_id)
return user
# 100 запросов с 10 потоками = все обрабатываются параллельно
Лучшая практика: использовать async драйверы
# ЛУЧШЕ ВСЕГО! Не требует executor
import asyncpg
DATABASE_URL = "postgresql://user:password@localhost/mydb"
db = asyncpg.create_pool(DATABASE_URL)
@app.get("/users/{user_id}")
async def get_user(user_id: int):
async with db.acquire() as connection:
user = await connection.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
return user
Ошибки при использовании run_in_executor
❌ Передача sync функции в async контекст без executor
async def get_data():
return blocking_func() # НЕПРАВИЛЬНО!
❌ Слишком много потоков
executor = ThreadPoolExecutor(max_workers=1000) # НЕПРАВИЛЬНО! GIL
❌ Забыли await
result = loop.run_in_executor(executor, blocking_func) # НЕПРАВИЛЬНО!
# Должно быть:
result = await loop.run_in_executor(executor, blocking_func)
✅ Правильно
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, blocking_func) # None = default executor (ThreadPoolExecutor)
Итог
run_in_executor() — это функция для выполнения блокирующего кода в отдельном потоке/процессе без блокирования event loop.
- Для I/O: используй ThreadPoolExecutor
- Для CPU: используй ProcessPoolExecutor
- Для новых проектов: используй async драйверы (asyncpg, aiohttp)