← Назад к вопросам

Какие подходы используются в Python для реализации параллельной обработки запросов?

3.0 Senior🔥 231 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Параллельная обработка запросов в Python

Параллельная обработка запросов — критическая оптимизация для веб-приложений, позволяющая обслуживать множество клиентов одновременно. Python предлагает несколько подходов.

1. Asyncio + ASGI (рекомендуется для веб)

Одна из самых популярных архитектур — асинхронный веб-фреймворк.

# FastAPI - современный, асинхронный фреймворк
from fastapi import FastAPI
import asyncio

app = FastAPI()

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    # Это может обработать 10000 одновременных запросов!
    # Каждый в отдельной coroutine
    user = await db.get_user(user_id)
    return user

@app.get("/users/{user_id}/posts")
async def get_user_posts(user_id: int):
    # Параллельные запросы к БД
    user = await db.get_user(user_id)
    posts = await db.get_posts(user_id)
    # Если оба I/O, выполнятся параллельно
    return {"user": user, "posts": posts}

# Запуск с Uvicorn (ASGI сервер)
# uvicorn main:app --workers 4

Как работает:

Запрос 1: User 100  -> await DB -> Запрос 2 вставляется
Запрос 2: User 200  -> await DB -> Запрос 1 продолжается
Запрос 1:           <- DB ответ -> Отправить клиенту
Запрос 2:           <- DB ответ -> Отправить клиенту

Плюсы:

  • Масштабируемость: тысячи одновременных соединений
  • Низкий overhead
  • Идеально для I/O-bound (сеть, БД)

Минусы:

  • Весь стек должен быть асинхронным
  • Сложнее отлаживать

2. Threading + WSGI

Традиционный подход с потоками (менее эффективен, но проще).

# Flask/Django с ThreadPoolExecutor
from flask import Flask
from concurrent.futures import ThreadPoolExecutor

app = Flask(__name__)
executor = ThreadPoolExecutor(max_workers=10)

@app.route("/users/<int:user_id>")
def get_user(user_id):
    # Каждый запрос в отдельном потоке
    # max_workers = 10, значит макс 10 одновременных запросов
    user = db.get_user(user_id)
    return {"user": user}

@app.route("/process", methods=["POST"])
def process_async():
    # Запустить долгую задачу в фоне
    future = executor.submit(long_running_task)
    return {"task_id": future}

def long_running_task():
    # Это выполнится в отдельном потоке
    time.sleep(10)
    return "Done"

if __name__ == "__main__":
    app.run(threaded=True)  # Каждый запрос в потоке

Как работает:

Запрос 1 -> Поток 1 -> DB запрос -> Ждёт
Запрос 2 -> Поток 2 -> DB запрос -> Ждёт  
Запрос 3 -> Поток 3 -> DB запрос -> Ждёт

Плюсы:

  • Простая интеграция с синхронным кодом
  • Проверенный подход
  • Легче отлаживать

Минусы:

  • Ограничение: ~10-100 потоков (memory)
  • Context switch overhead
  • Race conditions при доступе к данным

3. Gunicorn + Worker Pool

Производственный подход: несколько процессов с потоками.

# Запуск Flask/Django с Gunicorn
# gunicorn app:app -w 4 -k gevent --worker-connections 1000

from flask import Flask

app = Flask(__name__)

@app.route("/api/data")
def get_data():
    # Каждый запрос может быть в разном процессе
    return {"data": fetch_data()}

if __name__ == "__main__":
    app.run()

Архитектура:

Gunicorn Master
├── Worker 1 (процесс) - 1000 greenlets
├── Worker 2 (процесс) - 1000 greenlets
├── Worker 3 (процесс) - 1000 greenlets
└── Worker 4 (процесс) - 1000 greenlets

Итого: 4 процесса × 1000 greenlets = 4000 одновременных запросов

Команды:

# 4 worker процесса, каждый с 10 потоками
gunicorn -w 4 -k threads -t 30 app:app

# Gevent (greenlets, как asyncio но для синхронного кода)
gunicorn -w 4 -k gevent -w-connections 1000 app:app

# Uvicorn с asyncio
uvicorn -w 4 --timeout 120 app:app

4. Celery + Message Queue

Для долгих фоновых задач.

# settings.py
BROKER_URL = 'redis://localhost:6379'
RESULT_BACKEND = 'redis://localhost:6379'

# tasks.py
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379')

@app.task
def long_running_task(user_id):
    # Это выполнится в отдельном worker процессе
    import time
    time.sleep(30)  # Долгая операция
    send_email(user_id)
    return f"Task completed for user {user_id}"

# views.py
from flask import Flask
from tasks import long_running_task

app = Flask(__name__)

@app.route("/send-email/<user_id>", methods=["POST"])
def send_email_async(user_id):
    # Добавить задачу в очередь
    task = long_running_task.delay(user_id)
    return {"task_id": task.id, "status": "queued"}

@app.route("/task-status/<task_id>")
def get_task_status(task_id):
    # Получить статус задачи
    from celery.result import AsyncResult
    task = AsyncResult(task_id)
    return {
        "status": task.status,
        "result": task.result if task.ready() else None
    }

Архитектура:

Добавить задачу
      ↓
FastAPI app → Redis queue ← Celery worker 1
                            ← Celery worker 2
                            ← Celery worker 3
                            ← Celery worker 4

Запуск workers:

# 4 worker процесса
celery -A tasks worker --loglevel=info --concurrency=4

# 10 worker процессов для обработки больше задач
celery -A tasks worker --loglevel=info --concurrency=10

5. Kombinovaný přístup

Оптимальная архитектура для production:

from fastapi import FastAPI, BackgroundTasks
from sqlalchemy.ext.asyncio import AsyncSession
import asyncio
from celery import Celery

app = FastAPI()
celery = Celery('tasks')

# Быстрые операции: async
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession):
    user = await db.execute(
        select(User).where(User.id == user_id)
    )
    return user.scalar()

# Долгие операции: Celery
@app.post("/process-data")
async def process_data(data: dict):
    # Запустить в фоне
    task = celery.send_task(
        'tasks.process_data_task',
        args=(data,)
    )
    return {"task_id": task.id}

# Параллельные I/O операции
@app.get("/user/{user_id}/summary")
async def get_user_summary(user_id: int, db: AsyncSession):
    # Запросить данные параллельно
    user_task = asyncio.create_task(db.get_user(user_id))
    posts_task = asyncio.create_task(db.get_user_posts(user_id))
    comments_task = asyncio.create_task(db.get_user_comments(user_id))
    
    user, posts, comments = await asyncio.gather(
        user_task, posts_task, comments_task
    )
    
    return {
        "user": user,
        "posts_count": len(posts),
        "comments_count": len(comments)
    }

Сравнение подходов

ПодходОдновременноMemoryПростотаI/OCPU
Asyncio10000+НизкаяСредняяОтличноПлохо
Threading100СредняяВысокаяХорошоПлохо
Gunicorn1000+СредняяСредняяХорошоСреднее
Celery∞ задачСредняяВысокаяСреднееХорошо

Выбор подхода

# Микросервис с HTTP API (10000+ RPS)
# -> FastAPI + Asyncio + PostgreSQL (async driver)

# Традиционное веб-приложение
# -> Flask/Django + Gunicorn + ThreadPoolExecutor

# Долгие задачи (отправка email, обработка видео)
# -> Celery + Redis/RabbitMQ

# Real-time приложение (чат, уведомления)
# -> FastAPI + Asyncio + WebSocket

# Batch обработка больших данных
# -> Multiprocessing + ProcessPoolExecutor

Практический пример: Production setup

# docker-compose.yml
version: '3.9'
services:
  app:
    build: .
    command: uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4
    ports:
      - "8000:8000"
    depends_on:
      - db
      - redis
  
  db:
    image: postgres:15
    environment:
      POSTGRES_PASSWORD: password
  
  redis:
    image: redis:7
  
  worker:
    build: .
    command: celery -A tasks worker --loglevel=info --concurrency=4
    depends_on:
      - redis
      - db
  
  nginx:
    image: nginx:latest
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf

Эта архитектура позволяет обрабатывать тысячи одновременных запросов с масштабируемостью.