← Назад к вопросам

В каких операциях можно добиться асинхронного выполнения процесса

2.0 Middle🔥 191 комментариев
#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# Асинхронное выполнение процессов в Python

Асинхронное выполнение позволяет программе не блокироваться в ожидании результата операции. Это критично для масштабирования приложений. Разберём все методы и когда их использовать.

1. Асинхронный IO (async/await)

Оптимален для операций, которые ждут внешних ресурсов (сеть, диск).

HTTP запросы

import asyncio
import httpx

# Синхронный код — блокирует
def fetch_sync(urls):
    results = []
    for url in urls:
        response = requests.get(url)  # Ждёт 5 сек на каждый URL
        results.append(response.json())
    return results
    # Общее время: 5 * 3 = 15 секунд

# Асинхронный код — параллельно
async def fetch_async(urls):
    async with httpx.AsyncClient() as client:
        tasks = [client.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)  # Все одновременно
        return [r.json() for r in responses]

# Использование
async def main():
    urls = ["https://api.github.com/users/torvalds",
            "https://api.github.com/users/gvanrossum",
            "https://api.github.com/users/guido"]
    results = await fetch_async(urls)
    print(results)

asyncio.run(main())
# Общее время: ~5 секунд (вместо 15)

Запросы к БД (Database IO)

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import selectinload

# Асинхронный PostgreSQL драйвер
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    echo=False
)

# Синхронный запрос
def get_users_sync():
    session = Session()
    users = session.query(User).limit(1000).all()  # Блокирует на 200ms
    for user in users:
        user.posts  # N+1 problem, ещё 200ms
    return users

# Асинхронный запрос
async def get_users_async():
    async with AsyncSession(engine) as session:
        result = await session.execute(
            select(User).options(selectinload(User.posts)).limit(1000)
        )
        return result.scalars().all()  # Одновременно всё

# Использование в FastAPI
from fastapi import FastAPI

app = FastAPI()

@app.get("/users")
async def list_users():
    return await get_users_async()  # Не блокирует thread pool

Файловые операции (Disk IO)

import aiofiles
import asyncio

# Синхронный код
def read_files_sync(files):
    results = []
    for file_path in files:
        with open(file_path, 'r') as f:
            results.append(f.read())  # Блокирует
    return results

# Асинхронный код
async def read_files_async(files):
    tasks = []
    for file_path in files:
        tasks.append(read_file(file_path))
    return await asyncio.gather(*tasks)

async def read_file(file_path):
    async with aiofiles.open(file_path, 'r') as f:
        return await f.read()

# FastAPI endpoint
@app.post("/process-files")
async def process_files(file_paths: list):
    contents = await read_files_async(file_paths)
    return {"files": len(contents)}

2. Celery — асинхронные задачи (Distributed Tasks)

Для длительных операций, которые должны выполняться в фоне.

Основы

from celery import Celery
from celery.result import AsyncResult

app = Celery(
    'tasks',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379'
)

# Определяем асинхронную задачу
@app.task
def send_email(to_email: str, subject: str):
    """Эта функция выполнится в отдельном процессе"""
    import smtplib
    
    msg = f"Subject: {subject}\n\nHello!"
    smtplib.SMTP('localhost').sendmail(
        'from@example.com',
        to_email,
        msg
    )
    return f"Email sent to {to_email}"

# Использование в FastAPI
from fastapi import FastAPI
from fastapi.responses import JSONResponse

app_api = FastAPI()

@app_api.post("/send-email")
async def send_email_endpoint(email: str, subject: str):
    # Отправляем задачу в очередь, не ждём ответа
    task = send_email.delay(email, subject)  # Немедленно возвращает
    
    return {
        "task_id": task.id,
        "status": "Email is being sent"
    }

@app_api.get("/task-status/{task_id}")
async def check_task_status(task_id: str):
    task_result = AsyncResult(task_id, app=app)
    return {
        "task_id": task_id,
        "status": task_result.state,
        "result": task_result.result if task_result.ready() else None
    }

Примеры задач

# 1. Обработка больших файлов
@app.task
def process_csv(file_path: str):
    import pandas as pd
    df = pd.read_csv(file_path)  # Может быть медленным
    # Обрабатываем...
    return f"Processed {len(df)} rows"

# 2. Отправка множество уведомлений
@app.task
def send_bulk_notifications(user_ids: list):
    for user_id in user_ids:
        notify_user(user_id)  # Может быть медленным
    return f"Notified {len(user_ids)} users"

# 3. Генерация отчётов
@app.task
def generate_report(report_type: str, date_range: dict):
    # Может занять минуты
    data = fetch_analytics_data(report_type, date_range)
    pdf = generate_pdf(data)
    save_to_s3(pdf, f"reports/{report_type}_{time.time()}.pdf")
    return {"status": "Report ready", "url": "..."}

# 4. Периодические задачи (Cron-like)
from celery.schedules import crontab

app.conf.beat_schedule = {
    'cleanup-old-sessions': {
        'task': 'tasks.cleanup_sessions',
        'schedule': crontab(hour=2, minute=0),  # Каждую ночь в 2:00
    },
    'send-daily-digest': {
        'task': 'tasks.send_digest',
        'schedule': crontab(hour=9, minute=0, day_of_week='1-5'),  # Пн-пт 9:00
    },
}

@app.task
def cleanup_sessions():
    # Удаляем старые сессии
    pass

@app.task
def send_digest():
    # Отправляем daily digest
    pass

3. RQ (Redis Queue) — более простой аналог Celery

from redis import Redis
from rq import Queue

redis_conn = Redis()
queue = Queue(connection=redis_conn)

def long_running_task(n):
    # Может быть медленным
    import time
    time.sleep(n)
    return f"Slept for {n} seconds"

# В эндпоинте
@app.post("/start-task")
async def start_task():
    job = queue.enqueue(long_running_task, 30)  # Запускает в фоне
    return {"job_id": job.id}

@app.get("/task/{job_id}")
async def get_task_result(job_id: str):
    job = queue.fetch_job(job_id)
    return {
        "status": job.get_status(),
        "result": job.result if job.is_finished else None,
        "error": job.exc_info if job.is_failed else None
    }

4. Threading — для CPU-bound операций (осторожно!)

from concurrent.futures import ThreadPoolExecutor
import threading

# В FastAPI
from fastapi import BackgroundTasks

app = FastAPI()

def cpu_intensive_task(n: int):
    # Факториал (CPU-bound)
    result = 1
    for i in range(1, n + 1):
        result *= i
    return result

@app.post("/calculate")
async def calculate(n: int, background_tasks: BackgroundTasks):
    # Запускаем в фоновом потоке
    background_tasks.add_task(cpu_intensive_task, n)
    return {"status": "Calculation started"}

# Или с ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=4) as executor:
    future = executor.submit(cpu_intensive_task, 100)
    result = future.result()  # Ждёт результата

5. Multiprocessing — для истинного параллелизма

from multiprocessing import Process, Pool

# CPU-intensive функция
def fibonacci(n):
    if n < 2:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

# Параллельно
if __name__ == "__main__":
    with Pool(processes=4) as pool:
        results = pool.map(fibonacci, [30, 31, 32, 33])
    print(results)

Матрица выбора

┌─────────────────────────┬─────────────────┬────────────────┐
│ Тип операции            │ Длительность    │ Рекомендуется  │
├─────────────────────────┼─────────────────┼────────────────┤
│ HTTP запросы            │ 100-5000ms      │ async/await    │
│ Database queries        │ 10-500ms        │ async/await    │
│ File I/O                │ 1-1000ms        │ async/await    │
│ Email/SMS отправка      │ 100ms-30s       │ Celery         │
│ Report generation       │ 10-60 сек       │ Celery         │
│ CSV processing          │ 5-300 сек       │ Celery         │
│ Синтез видео            │ 1-60 мин        │ Celery + Beats │
│ CPU вычисления (факториал) │ любое       │ multiprocessing│
└─────────────────────────┴─────────────────┴────────────────┘

Полный пример: Email рассылка

from fastapi import FastAPI
from celery import Celery
import asyncio
import httpx

app = FastAPI()
celery_app = Celery('tasks', broker='redis://localhost')

# Асинхронный запрос к API
async def get_user_emails(user_ids: list):
    async with httpx.AsyncClient() as client:
        tasks = [client.get(f"https://api.example.com/users/{uid}") for uid in user_ids]
        responses = await asyncio.gather(*tasks)
        return [r.json()['email'] for r in responses]

# Асинхронная задача — отправка email
@celery_app.task
def send_email(email: str, template: str):
    # Имитируем SMTP
    import smtplib
    msg = f"Template: {template}"
    # smtplib.SMTP().sendmail(...)
    return f"Sent to {email}"

# Эндпоинт — собираем всё вместе
@app.post("/send-emails")
async def send_emails_endpoint(user_ids: list, template: str):
    # 1. Асинхронно получаем emails
    emails = await get_user_emails(user_ids)
    
    # 2. Отправляем каждый email в фоне через Celery
    tasks = []
    for email in emails:
        task = send_email.delay(email, template)
        tasks.append(task.id)
    
    return {
        "total_emails": len(emails),
        "task_ids": tasks,
        "status": "Emails queued"
    }

Ключевые выводы

  • async/await — для I/O операций (сеть, БД, файлы), встроено в Python
  • Celery — для heavy lifting (отправка emails, отчёты, видео), с Redis/RabbitMQ
  • Background Tasks (FastAPI) — для простых фоновых задач
  • ThreadPoolExecutor — для синхронного кода в параллели (ограничено GIL)
  • multiprocessing — для CPU-bound операций (истинный параллелизм)

Выбирайте в зависимости от типа операции и требований к масштабированию.

В каких операциях можно добиться асинхронного выполнения процесса | PrepBro