← Назад к вопросам
Какие подходы используются в Python для реализации параллельной обработки запросов?
3.0 Senior🔥 231 комментариев
#Python Core#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Параллельная обработка запросов в Python
Параллельная обработка запросов — критическая оптимизация для веб-приложений, позволяющая обслуживать множество клиентов одновременно. Python предлагает несколько подходов.
1. Asyncio + ASGI (рекомендуется для веб)
Одна из самых популярных архитектур — асинхронный веб-фреймворк.
# FastAPI - современный, асинхронный фреймворк
from fastapi import FastAPI
import asyncio
app = FastAPI()
@app.get("/users/{user_id}")
async def get_user(user_id: int):
# Это может обработать 10000 одновременных запросов!
# Каждый в отдельной coroutine
user = await db.get_user(user_id)
return user
@app.get("/users/{user_id}/posts")
async def get_user_posts(user_id: int):
# Параллельные запросы к БД
user = await db.get_user(user_id)
posts = await db.get_posts(user_id)
# Если оба I/O, выполнятся параллельно
return {"user": user, "posts": posts}
# Запуск с Uvicorn (ASGI сервер)
# uvicorn main:app --workers 4
Как работает:
Запрос 1: User 100 -> await DB -> Запрос 2 вставляется
Запрос 2: User 200 -> await DB -> Запрос 1 продолжается
Запрос 1: <- DB ответ -> Отправить клиенту
Запрос 2: <- DB ответ -> Отправить клиенту
Плюсы:
- Масштабируемость: тысячи одновременных соединений
- Низкий overhead
- Идеально для I/O-bound (сеть, БД)
Минусы:
- Весь стек должен быть асинхронным
- Сложнее отлаживать
2. Threading + WSGI
Традиционный подход с потоками (менее эффективен, но проще).
# Flask/Django с ThreadPoolExecutor
from flask import Flask
from concurrent.futures import ThreadPoolExecutor
app = Flask(__name__)
executor = ThreadPoolExecutor(max_workers=10)
@app.route("/users/<int:user_id>")
def get_user(user_id):
# Каждый запрос в отдельном потоке
# max_workers = 10, значит макс 10 одновременных запросов
user = db.get_user(user_id)
return {"user": user}
@app.route("/process", methods=["POST"])
def process_async():
# Запустить долгую задачу в фоне
future = executor.submit(long_running_task)
return {"task_id": future}
def long_running_task():
# Это выполнится в отдельном потоке
time.sleep(10)
return "Done"
if __name__ == "__main__":
app.run(threaded=True) # Каждый запрос в потоке
Как работает:
Запрос 1 -> Поток 1 -> DB запрос -> Ждёт
Запрос 2 -> Поток 2 -> DB запрос -> Ждёт
Запрос 3 -> Поток 3 -> DB запрос -> Ждёт
Плюсы:
- Простая интеграция с синхронным кодом
- Проверенный подход
- Легче отлаживать
Минусы:
- Ограничение: ~10-100 потоков (memory)
- Context switch overhead
- Race conditions при доступе к данным
3. Gunicorn + Worker Pool
Производственный подход: несколько процессов с потоками.
# Запуск Flask/Django с Gunicorn
# gunicorn app:app -w 4 -k gevent --worker-connections 1000
from flask import Flask
app = Flask(__name__)
@app.route("/api/data")
def get_data():
# Каждый запрос может быть в разном процессе
return {"data": fetch_data()}
if __name__ == "__main__":
app.run()
Архитектура:
Gunicorn Master
├── Worker 1 (процесс) - 1000 greenlets
├── Worker 2 (процесс) - 1000 greenlets
├── Worker 3 (процесс) - 1000 greenlets
└── Worker 4 (процесс) - 1000 greenlets
Итого: 4 процесса × 1000 greenlets = 4000 одновременных запросов
Команды:
# 4 worker процесса, каждый с 10 потоками
gunicorn -w 4 -k threads -t 30 app:app
# Gevent (greenlets, как asyncio но для синхронного кода)
gunicorn -w 4 -k gevent -w-connections 1000 app:app
# Uvicorn с asyncio
uvicorn -w 4 --timeout 120 app:app
4. Celery + Message Queue
Для долгих фоновых задач.
# settings.py
BROKER_URL = 'redis://localhost:6379'
RESULT_BACKEND = 'redis://localhost:6379'
# tasks.py
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379')
@app.task
def long_running_task(user_id):
# Это выполнится в отдельном worker процессе
import time
time.sleep(30) # Долгая операция
send_email(user_id)
return f"Task completed for user {user_id}"
# views.py
from flask import Flask
from tasks import long_running_task
app = Flask(__name__)
@app.route("/send-email/<user_id>", methods=["POST"])
def send_email_async(user_id):
# Добавить задачу в очередь
task = long_running_task.delay(user_id)
return {"task_id": task.id, "status": "queued"}
@app.route("/task-status/<task_id>")
def get_task_status(task_id):
# Получить статус задачи
from celery.result import AsyncResult
task = AsyncResult(task_id)
return {
"status": task.status,
"result": task.result if task.ready() else None
}
Архитектура:
Добавить задачу
↓
FastAPI app → Redis queue ← Celery worker 1
← Celery worker 2
← Celery worker 3
← Celery worker 4
Запуск workers:
# 4 worker процесса
celery -A tasks worker --loglevel=info --concurrency=4
# 10 worker процессов для обработки больше задач
celery -A tasks worker --loglevel=info --concurrency=10
5. Kombinovaný přístup
Оптимальная архитектура для production:
from fastapi import FastAPI, BackgroundTasks
from sqlalchemy.ext.asyncio import AsyncSession
import asyncio
from celery import Celery
app = FastAPI()
celery = Celery('tasks')
# Быстрые операции: async
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession):
user = await db.execute(
select(User).where(User.id == user_id)
)
return user.scalar()
# Долгие операции: Celery
@app.post("/process-data")
async def process_data(data: dict):
# Запустить в фоне
task = celery.send_task(
'tasks.process_data_task',
args=(data,)
)
return {"task_id": task.id}
# Параллельные I/O операции
@app.get("/user/{user_id}/summary")
async def get_user_summary(user_id: int, db: AsyncSession):
# Запросить данные параллельно
user_task = asyncio.create_task(db.get_user(user_id))
posts_task = asyncio.create_task(db.get_user_posts(user_id))
comments_task = asyncio.create_task(db.get_user_comments(user_id))
user, posts, comments = await asyncio.gather(
user_task, posts_task, comments_task
)
return {
"user": user,
"posts_count": len(posts),
"comments_count": len(comments)
}
Сравнение подходов
| Подход | Одновременно | Memory | Простота | I/O | CPU |
|---|---|---|---|---|---|
| Asyncio | 10000+ | Низкая | Средняя | Отлично | Плохо |
| Threading | 100 | Средняя | Высокая | Хорошо | Плохо |
| Gunicorn | 1000+ | Средняя | Средняя | Хорошо | Среднее |
| Celery | ∞ задач | Средняя | Высокая | Среднее | Хорошо |
Выбор подхода
# Микросервис с HTTP API (10000+ RPS)
# -> FastAPI + Asyncio + PostgreSQL (async driver)
# Традиционное веб-приложение
# -> Flask/Django + Gunicorn + ThreadPoolExecutor
# Долгие задачи (отправка email, обработка видео)
# -> Celery + Redis/RabbitMQ
# Real-time приложение (чат, уведомления)
# -> FastAPI + Asyncio + WebSocket
# Batch обработка больших данных
# -> Multiprocessing + ProcessPoolExecutor
Практический пример: Production setup
# docker-compose.yml
version: '3.9'
services:
app:
build: .
command: uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4
ports:
- "8000:8000"
depends_on:
- db
- redis
db:
image: postgres:15
environment:
POSTGRES_PASSWORD: password
redis:
image: redis:7
worker:
build: .
command: celery -A tasks worker --loglevel=info --concurrency=4
depends_on:
- redis
- db
nginx:
image: nginx:latest
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
Эта архитектура позволяет обрабатывать тысячи одновременных запросов с масштабируемостью.