← Назад к вопросам
Что использовал для работы с асинхронностью?
2.0 Middle🔥 191 комментариев
#Python Core
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Асинхронность в Python: мой опыт
Краткий ответ
За 10+ лет использовал разные подходы: от Celery для задач до asyncio для web приложений. Выбор инструмента зависит от задачи — нет один-для-всех решения.
1. Celery (асинхронные задачи)
Основной инструмент для фоновых задач:
from celery import Celery, shared_task
from django.core.mail import send_mail
import time
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
# Простая задача
@shared_task
def send_email_task(email, subject, message):
send_mail(
subject,
message,
'from@example.com',
[email],
fail_silently=False,
)
return f"Email sent to {email}"
# Долгая задача
@shared_task(bind=True)
def process_file(self, file_id):
"""Процесс с прогрессом"""
try:
for i in range(100):
# Обновляем прогресс
self.update_state(
state='PROGRESS',
meta={'current': i, 'total': 100}
)
time.sleep(1)
return 'Processing complete'
except Exception as exc:
raise self.retry(exc=exc, countdown=60) # Retry через 60 сек
# Использование
from .tasks import send_email_task, process_file
# Запустить асинхронно
send_email_task.delay(email, subject, message)
# Запустить и получить result
result = process_file.apply_async(args=[file_id])
print(result.id) # abc123...
# Проверить статус
from celery.result import AsyncResult
result = AsyncResult('abc123...')
print(result.state) # 'PROGRESS'
print(result.result) # {'current': 45, 'total': 100}
Что использовал Celery для:
- Отправка писем (не блокировать HTTP request)
- Генерация отчётов (долгие операции)
- Синхронизация с внешними API
- Обработка изображений/видео
- Batch операции
Конфигурация:
# settings/production.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
# Периодические задачи
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
'send-daily-report': {
'task': 'myapp.tasks.send_daily_report',
'schedule': crontab(hour=9, minute=0), # Каждый день в 9:00
},
'cleanup-cache': {
'task': 'myapp.tasks.cleanup_cache',
'schedule': crontab(hour='*/2'), # Каждые 2 часа
},
}
2. asyncio (современный подход)
Для высоконагруженных веб-приложений:
import asyncio
import aiohttp
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
app = FastAPI()
# Асинхронное подключение к БД
engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost/dbname",
echo=True,
)
# Простая асинхронная функция
async def fetch_user_from_api(user_id: int) -> dict:
"""Асинхронно получить пользователя из внешнего API"""
async with aiohttp.ClientSession() as session:
async with session.get(f'https://api.example.com/users/{user_id}') as resp:
return await resp.json()
# Параллельные запросы
async def fetch_multiple_users(user_ids: list[int]) -> list[dict]:
"""Получить несколько пользователей параллельно"""
tasks = [fetch_user_from_api(uid) for uid in user_ids]
return await asyncio.gather(*tasks)
# FastAPI endpoint
@app.get("/users/{user_id}")
async def get_user(user_id: int):
user = await fetch_user_from_api(user_id)
return user
# Кэширование с асинхронностью
from functools import lru_cache
class AsyncCache:
def __init__(self):
self.cache = {}
async def get_or_fetch(self, key: str, fetch_func):
if key in self.cache:
return self.cache[key]
value = await fetch_func()
self.cache[key] = value
return value
cache = AsyncCache()
result = await cache.get_or_fetch(
'user_123',
lambda: fetch_user_from_api(123)
)
asyncio patterns:
# 1. Timeout для долгих операций
try:
result = await asyncio.wait_for(
fetch_data(),
timeout=5.0
)
except asyncio.TimeoutError:
print("Request timeout")
# 2. Retry логика
async def fetch_with_retry(url: str, max_retries: int = 3):
for attempt in range(max_retries):
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.json()
except aiohttp.ClientError as e:
if attempt == max_retries - 1:
raise
await asyncio.sleep(2 ** attempt) # Exponential backoff
# 3. Rate limiting
from aiolimiter import AsyncLimiter
limiter = AsyncLimiter(max_rate=10, time_period=1) # 10 req/sec
async def rate_limited_request(url: str):
async with limiter:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.json()
3. Threads и Multiprocessing
Для CPU-bound операций:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import Pool
import time
# CPU-bound функция
def heavy_computation(n: int) -> int:
"""Долгая вычислительная операция"""
total = 0
for i in range(n * 10_000_000):
total += i
return total
# ThreadPoolExecutor (для I/O-bound)
def io_bound_example():
with ThreadPoolExecutor(max_workers=10) as executor:
# Параллельно загрузить 10 файлов
futures = [
executor.submit(download_file, url)
for url in urls
]
results = [f.result() for f in futures]
return results
# ProcessPoolExecutor (для CPU-bound)
def cpu_bound_example():
with ProcessPoolExecutor(max_workers=4) as executor:
# Использует несколько CPU cores
results = list(executor.map(heavy_computation, [100, 200, 300]))
return results
# Multiprocessing для самых тяжёлых задач
def parallel_processing():
with Pool(processes=4) as pool:
# map сохраняет порядок результатов
results = pool.map(heavy_computation, [100, 200, 300, 400])
return results
Когда использовать:
use_case = {
"Threads": {
"Когда": "I/O операции (HTTP, DB, файлы)",
"Пример": "Одновременно скачать 100 файлов",
"Плюсы": "Просто, малая память",
"Минусы": "GIL блокирует CPU",
},
"asyncio": {
"Когда": "Много I/O операций, web framework",
"Пример": "Обработать 10k одновременных connections",
"Плюсы": "Эффективнее, чем threads",
"Минусы": "Сложнее писать, нужна支持 из libs",
},
"Multiprocessing": {
"Когда": "CPU-bound операции",
"Пример": "Обработать изображения в параллель",
"Плюсы": "Использует все CPU cores",
"Минусы": "Оверхед на создание процессов, сложнее debug",
},
"Celery": {
"Когда": "Распределённые долгие задачи",
"Пример": "Отправить 1000 писем",
"Плюсы": "Масштабируется горизонтально",
"Минусы": "Нужен broker (Redis, RabbitMQ)",
},
}
4. Гибридный подход
Реальный пример из production:
from fastapi import FastAPI, BackgroundTasks
from celery import shared_task
import asyncio
app = FastAPI()
@shared_task
def send_email_async(email: str, subject: str, body: str):
"""Celery для долгих операций"""
# Отправить письмо
send_email(email, subject, body)
@app.post("/user/register")
async def register_user(
email: str,
name: str,
background_tasks: BackgroundTasks
):
"""API endpoint с асинхронной обработкой"""
# 1. Быстро сохраняем в БД (asyncio)
user = await db.create_user(email, name)
# 2. Запускаем долгие операции через Celery
send_email_async.delay(email, "Welcome", f"Hi {name}!")
# 3. Возвращаем ответ сразу (не ждём email)
return {"user_id": user.id, "status": "registered"}
# Внутри Celery задачи можем использовать asyncio
@shared_task
def sync_data_with_external_api(user_id: int):
"""Синхронизировать данные с внешним API"""
# Используем asyncio внутри sync контекста
loop = asyncio.get_event_loop()
result = loop.run_until_complete(
fetch_from_api(user_id)
)
return result
5. Инструменты для мониторинга
# Flower для мониторинга Celery
# celery -A myapp worker --loglevel=info
# celery -A myapp events
# flower --port=5555
# prometheus_client для метрик
from prometheus_client import Counter, Histogram, start_http_server
task_counter = Counter(
'celery_tasks_total',
'Total tasks',
['task_name', 'status']
)
task_duration = Histogram(
'celery_task_duration_seconds',
'Task duration',
['task_name']
)
@shared_task
def my_task():
with task_duration.labels(task_name='my_task').time():
# Выполняем задачу
pass
task_counter.labels(task_name='my_task', status='success').inc()
6. Основные выводы
best_practices = {
"Выбор инструмента": [
"I/O операции → asyncio (web) или Threads",
"Долгие задачи → Celery + Redis/RabbitMQ",
"CPU операции → Multiprocessing",
"Периодические задачи → Celery Beat",
],
"Gotchas": [
"GIL блокирует многопоточность в CPU ops",
"asyncio требует async-aware libraries",
"Celery требует running worker процесса",
"Deadlocks возможны с неправильным sync/async",
],
"Optimization": [
"Connection pooling (asyncpg, psycopg2-pool)",
"Rate limiting для API calls",
"Retry с exponential backoff",
"Timeout для всех network requests",
],
}
Вывод: Асинхронность в Python это не просто инструмент — это архитектурный выбор. Я выбираю в зависимости от задачи: asyncio для modern веб-приложений, Celery для распределённых систем, threading для простых параллельных операций. Комбинирование подходов даёт лучший результат.