← Назад к вопросам
В каких операциях можно добиться асинхронного выполнения процесса
2.0 Middle🔥 191 комментариев
#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
# Асинхронное выполнение процессов в Python
Асинхронное выполнение позволяет программе не блокироваться в ожидании результата операции. Это критично для масштабирования приложений. Разберём все методы и когда их использовать.
1. Асинхронный IO (async/await)
Оптимален для операций, которые ждут внешних ресурсов (сеть, диск).
HTTP запросы
import asyncio
import httpx
# Синхронный код — блокирует
def fetch_sync(urls):
results = []
for url in urls:
response = requests.get(url) # Ждёт 5 сек на каждый URL
results.append(response.json())
return results
# Общее время: 5 * 3 = 15 секунд
# Асинхронный код — параллельно
async def fetch_async(urls):
async with httpx.AsyncClient() as client:
tasks = [client.get(url) for url in urls]
responses = await asyncio.gather(*tasks) # Все одновременно
return [r.json() for r in responses]
# Использование
async def main():
urls = ["https://api.github.com/users/torvalds",
"https://api.github.com/users/gvanrossum",
"https://api.github.com/users/guido"]
results = await fetch_async(urls)
print(results)
asyncio.run(main())
# Общее время: ~5 секунд (вместо 15)
Запросы к БД (Database IO)
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import selectinload
# Асинхронный PostgreSQL драйвер
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
echo=False
)
# Синхронный запрос
def get_users_sync():
session = Session()
users = session.query(User).limit(1000).all() # Блокирует на 200ms
for user in users:
user.posts # N+1 problem, ещё 200ms
return users
# Асинхронный запрос
async def get_users_async():
async with AsyncSession(engine) as session:
result = await session.execute(
select(User).options(selectinload(User.posts)).limit(1000)
)
return result.scalars().all() # Одновременно всё
# Использование в FastAPI
from fastapi import FastAPI
app = FastAPI()
@app.get("/users")
async def list_users():
return await get_users_async() # Не блокирует thread pool
Файловые операции (Disk IO)
import aiofiles
import asyncio
# Синхронный код
def read_files_sync(files):
results = []
for file_path in files:
with open(file_path, 'r') as f:
results.append(f.read()) # Блокирует
return results
# Асинхронный код
async def read_files_async(files):
tasks = []
for file_path in files:
tasks.append(read_file(file_path))
return await asyncio.gather(*tasks)
async def read_file(file_path):
async with aiofiles.open(file_path, 'r') as f:
return await f.read()
# FastAPI endpoint
@app.post("/process-files")
async def process_files(file_paths: list):
contents = await read_files_async(file_paths)
return {"files": len(contents)}
2. Celery — асинхронные задачи (Distributed Tasks)
Для длительных операций, которые должны выполняться в фоне.
Основы
from celery import Celery
from celery.result import AsyncResult
app = Celery(
'tasks',
broker='redis://localhost:6379',
backend='redis://localhost:6379'
)
# Определяем асинхронную задачу
@app.task
def send_email(to_email: str, subject: str):
"""Эта функция выполнится в отдельном процессе"""
import smtplib
msg = f"Subject: {subject}\n\nHello!"
smtplib.SMTP('localhost').sendmail(
'from@example.com',
to_email,
msg
)
return f"Email sent to {to_email}"
# Использование в FastAPI
from fastapi import FastAPI
from fastapi.responses import JSONResponse
app_api = FastAPI()
@app_api.post("/send-email")
async def send_email_endpoint(email: str, subject: str):
# Отправляем задачу в очередь, не ждём ответа
task = send_email.delay(email, subject) # Немедленно возвращает
return {
"task_id": task.id,
"status": "Email is being sent"
}
@app_api.get("/task-status/{task_id}")
async def check_task_status(task_id: str):
task_result = AsyncResult(task_id, app=app)
return {
"task_id": task_id,
"status": task_result.state,
"result": task_result.result if task_result.ready() else None
}
Примеры задач
# 1. Обработка больших файлов
@app.task
def process_csv(file_path: str):
import pandas as pd
df = pd.read_csv(file_path) # Может быть медленным
# Обрабатываем...
return f"Processed {len(df)} rows"
# 2. Отправка множество уведомлений
@app.task
def send_bulk_notifications(user_ids: list):
for user_id in user_ids:
notify_user(user_id) # Может быть медленным
return f"Notified {len(user_ids)} users"
# 3. Генерация отчётов
@app.task
def generate_report(report_type: str, date_range: dict):
# Может занять минуты
data = fetch_analytics_data(report_type, date_range)
pdf = generate_pdf(data)
save_to_s3(pdf, f"reports/{report_type}_{time.time()}.pdf")
return {"status": "Report ready", "url": "..."}
# 4. Периодические задачи (Cron-like)
from celery.schedules import crontab
app.conf.beat_schedule = {
'cleanup-old-sessions': {
'task': 'tasks.cleanup_sessions',
'schedule': crontab(hour=2, minute=0), # Каждую ночь в 2:00
},
'send-daily-digest': {
'task': 'tasks.send_digest',
'schedule': crontab(hour=9, minute=0, day_of_week='1-5'), # Пн-пт 9:00
},
}
@app.task
def cleanup_sessions():
# Удаляем старые сессии
pass
@app.task
def send_digest():
# Отправляем daily digest
pass
3. RQ (Redis Queue) — более простой аналог Celery
from redis import Redis
from rq import Queue
redis_conn = Redis()
queue = Queue(connection=redis_conn)
def long_running_task(n):
# Может быть медленным
import time
time.sleep(n)
return f"Slept for {n} seconds"
# В эндпоинте
@app.post("/start-task")
async def start_task():
job = queue.enqueue(long_running_task, 30) # Запускает в фоне
return {"job_id": job.id}
@app.get("/task/{job_id}")
async def get_task_result(job_id: str):
job = queue.fetch_job(job_id)
return {
"status": job.get_status(),
"result": job.result if job.is_finished else None,
"error": job.exc_info if job.is_failed else None
}
4. Threading — для CPU-bound операций (осторожно!)
from concurrent.futures import ThreadPoolExecutor
import threading
# В FastAPI
from fastapi import BackgroundTasks
app = FastAPI()
def cpu_intensive_task(n: int):
# Факториал (CPU-bound)
result = 1
for i in range(1, n + 1):
result *= i
return result
@app.post("/calculate")
async def calculate(n: int, background_tasks: BackgroundTasks):
# Запускаем в фоновом потоке
background_tasks.add_task(cpu_intensive_task, n)
return {"status": "Calculation started"}
# Или с ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=4) as executor:
future = executor.submit(cpu_intensive_task, 100)
result = future.result() # Ждёт результата
5. Multiprocessing — для истинного параллелизма
from multiprocessing import Process, Pool
# CPU-intensive функция
def fibonacci(n):
if n < 2:
return n
return fibonacci(n-1) + fibonacci(n-2)
# Параллельно
if __name__ == "__main__":
with Pool(processes=4) as pool:
results = pool.map(fibonacci, [30, 31, 32, 33])
print(results)
Матрица выбора
┌─────────────────────────┬─────────────────┬────────────────┐
│ Тип операции │ Длительность │ Рекомендуется │
├─────────────────────────┼─────────────────┼────────────────┤
│ HTTP запросы │ 100-5000ms │ async/await │
│ Database queries │ 10-500ms │ async/await │
│ File I/O │ 1-1000ms │ async/await │
│ Email/SMS отправка │ 100ms-30s │ Celery │
│ Report generation │ 10-60 сек │ Celery │
│ CSV processing │ 5-300 сек │ Celery │
│ Синтез видео │ 1-60 мин │ Celery + Beats │
│ CPU вычисления (факториал) │ любое │ multiprocessing│
└─────────────────────────┴─────────────────┴────────────────┘
Полный пример: Email рассылка
from fastapi import FastAPI
from celery import Celery
import asyncio
import httpx
app = FastAPI()
celery_app = Celery('tasks', broker='redis://localhost')
# Асинхронный запрос к API
async def get_user_emails(user_ids: list):
async with httpx.AsyncClient() as client:
tasks = [client.get(f"https://api.example.com/users/{uid}") for uid in user_ids]
responses = await asyncio.gather(*tasks)
return [r.json()['email'] for r in responses]
# Асинхронная задача — отправка email
@celery_app.task
def send_email(email: str, template: str):
# Имитируем SMTP
import smtplib
msg = f"Template: {template}"
# smtplib.SMTP().sendmail(...)
return f"Sent to {email}"
# Эндпоинт — собираем всё вместе
@app.post("/send-emails")
async def send_emails_endpoint(user_ids: list, template: str):
# 1. Асинхронно получаем emails
emails = await get_user_emails(user_ids)
# 2. Отправляем каждый email в фоне через Celery
tasks = []
for email in emails:
task = send_email.delay(email, template)
tasks.append(task.id)
return {
"total_emails": len(emails),
"task_ids": tasks,
"status": "Emails queued"
}
Ключевые выводы
- async/await — для I/O операций (сеть, БД, файлы), встроено в Python
- Celery — для heavy lifting (отправка emails, отчёты, видео), с Redis/RabbitMQ
- Background Tasks (FastAPI) — для простых фоновых задач
- ThreadPoolExecutor — для синхронного кода в параллели (ограничено GIL)
- multiprocessing — для CPU-bound операций (истинный параллелизм)
Выбирайте в зависимости от типа операции и требований к масштабированию.