← Назад к вопросам

Что использовал для работы с асинхронностью?

2.0 Middle🔥 191 комментариев
#Python Core

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Асинхронность в Python: мой опыт

Краткий ответ

За 10+ лет использовал разные подходы: от Celery для задач до asyncio для web приложений. Выбор инструмента зависит от задачи — нет один-для-всех решения.

1. Celery (асинхронные задачи)

Основной инструмент для фоновых задач:

from celery import Celery, shared_task
from django.core.mail import send_mail
import time

app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')

# Простая задача
@shared_task
def send_email_task(email, subject, message):
    send_mail(
        subject,
        message,
        'from@example.com',
        [email],
        fail_silently=False,
    )
    return f"Email sent to {email}"

# Долгая задача
@shared_task(bind=True)
def process_file(self, file_id):
    """Процесс с прогрессом"""
    try:
        for i in range(100):
            # Обновляем прогресс
            self.update_state(
                state='PROGRESS',
                meta={'current': i, 'total': 100}
            )
            time.sleep(1)
        return 'Processing complete'
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)  # Retry через 60 сек

# Использование
from .tasks import send_email_task, process_file

# Запустить асинхронно
send_email_task.delay(email, subject, message)

# Запустить и получить result
result = process_file.apply_async(args=[file_id])
print(result.id)  # abc123...

# Проверить статус
from celery.result import AsyncResult
result = AsyncResult('abc123...')
print(result.state)  # 'PROGRESS'
print(result.result)  # {'current': 45, 'total': 100}

Что использовал Celery для:

  • Отправка писем (не блокировать HTTP request)
  • Генерация отчётов (долгие операции)
  • Синхронизация с внешними API
  • Обработка изображений/видео
  • Batch операции

Конфигурация:

# settings/production.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'

# Периодические задачи
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'send-daily-report': {
        'task': 'myapp.tasks.send_daily_report',
        'schedule': crontab(hour=9, minute=0),  # Каждый день в 9:00
    },
    'cleanup-cache': {
        'task': 'myapp.tasks.cleanup_cache',
        'schedule': crontab(hour='*/2'),  # Каждые 2 часа
    },
}

2. asyncio (современный подход)

Для высоконагруженных веб-приложений:

import asyncio
import aiohttp
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

app = FastAPI()

# Асинхронное подключение к БД
engine = create_async_engine(
    "postgresql+asyncpg://user:password@localhost/dbname",
    echo=True,
)

# Простая асинхронная функция
async def fetch_user_from_api(user_id: int) -> dict:
    """Асинхронно получить пользователя из внешнего API"""
    async with aiohttp.ClientSession() as session:
        async with session.get(f'https://api.example.com/users/{user_id}') as resp:
            return await resp.json()

# Параллельные запросы
async def fetch_multiple_users(user_ids: list[int]) -> list[dict]:
    """Получить несколько пользователей параллельно"""
    tasks = [fetch_user_from_api(uid) for uid in user_ids]
    return await asyncio.gather(*tasks)

# FastAPI endpoint
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    user = await fetch_user_from_api(user_id)
    return user

# Кэширование с асинхронностью
from functools import lru_cache

class AsyncCache:
    def __init__(self):
        self.cache = {}
    
    async def get_or_fetch(self, key: str, fetch_func):
        if key in self.cache:
            return self.cache[key]
        
        value = await fetch_func()
        self.cache[key] = value
        return value

cache = AsyncCache()
result = await cache.get_or_fetch(
    'user_123',
    lambda: fetch_user_from_api(123)
)

asyncio patterns:

# 1. Timeout для долгих операций
try:
    result = await asyncio.wait_for(
        fetch_data(),
        timeout=5.0
    )
except asyncio.TimeoutError:
    print("Request timeout")

# 2. Retry логика
async def fetch_with_retry(url: str, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as resp:
                    return await resp.json()
        except aiohttp.ClientError as e:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff

# 3. Rate limiting
from aiolimiter import AsyncLimiter

limiter = AsyncLimiter(max_rate=10, time_period=1)  # 10 req/sec

async def rate_limited_request(url: str):
    async with limiter:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                return await resp.json()

3. Threads и Multiprocessing

Для CPU-bound операций:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import Pool
import time

# CPU-bound функция
def heavy_computation(n: int) -> int:
    """Долгая вычислительная операция"""
    total = 0
    for i in range(n * 10_000_000):
        total += i
    return total

# ThreadPoolExecutor (для I/O-bound)
def io_bound_example():
    with ThreadPoolExecutor(max_workers=10) as executor:
        # Параллельно загрузить 10 файлов
        futures = [
            executor.submit(download_file, url)
            for url in urls
        ]
        results = [f.result() for f in futures]
    return results

# ProcessPoolExecutor (для CPU-bound)
def cpu_bound_example():
    with ProcessPoolExecutor(max_workers=4) as executor:
        # Использует несколько CPU cores
        results = list(executor.map(heavy_computation, [100, 200, 300]))
    return results

# Multiprocessing для самых тяжёлых задач
def parallel_processing():
    with Pool(processes=4) as pool:
        # map сохраняет порядок результатов
        results = pool.map(heavy_computation, [100, 200, 300, 400])
    return results

Когда использовать:

use_case = {
    "Threads": {
        "Когда": "I/O операции (HTTP, DB, файлы)",
        "Пример": "Одновременно скачать 100 файлов",
        "Плюсы": "Просто, малая память",
        "Минусы": "GIL блокирует CPU",
    },
    "asyncio": {
        "Когда": "Много I/O операций, web framework",
        "Пример": "Обработать 10k одновременных connections",
        "Плюсы": "Эффективнее, чем threads",
        "Минусы": "Сложнее писать, нужна支持 из libs",
    },
    "Multiprocessing": {
        "Когда": "CPU-bound операции",
        "Пример": "Обработать изображения в параллель",
        "Плюсы": "Использует все CPU cores",
        "Минусы": "Оверхед на создание процессов, сложнее debug",
    },
    "Celery": {
        "Когда": "Распределённые долгие задачи",
        "Пример": "Отправить 1000 писем",
        "Плюсы": "Масштабируется горизонтально",
        "Минусы": "Нужен broker (Redis, RabbitMQ)",
    },
}

4. Гибридный подход

Реальный пример из production:

from fastapi import FastAPI, BackgroundTasks
from celery import shared_task
import asyncio

app = FastAPI()

@shared_task
def send_email_async(email: str, subject: str, body: str):
    """Celery для долгих операций"""
    # Отправить письмо
    send_email(email, subject, body)

@app.post("/user/register")
async def register_user(
    email: str,
    name: str,
    background_tasks: BackgroundTasks
):
    """API endpoint с асинхронной обработкой"""
    # 1. Быстро сохраняем в БД (asyncio)
    user = await db.create_user(email, name)
    
    # 2. Запускаем долгие операции через Celery
    send_email_async.delay(email, "Welcome", f"Hi {name}!")
    
    # 3. Возвращаем ответ сразу (не ждём email)
    return {"user_id": user.id, "status": "registered"}

# Внутри Celery задачи можем использовать asyncio
@shared_task
def sync_data_with_external_api(user_id: int):
    """Синхронизировать данные с внешним API"""
    # Используем asyncio внутри sync контекста
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(
        fetch_from_api(user_id)
    )
    return result

5. Инструменты для мониторинга

# Flower для мониторинга Celery
# celery -A myapp worker --loglevel=info
# celery -A myapp events
# flower --port=5555

# prometheus_client для метрик
from prometheus_client import Counter, Histogram, start_http_server

task_counter = Counter(
    'celery_tasks_total',
    'Total tasks',
    ['task_name', 'status']
)

task_duration = Histogram(
    'celery_task_duration_seconds',
    'Task duration',
    ['task_name']
)

@shared_task
def my_task():
    with task_duration.labels(task_name='my_task').time():
        # Выполняем задачу
        pass
    task_counter.labels(task_name='my_task', status='success').inc()

6. Основные выводы

best_practices = {
    "Выбор инструмента": [
        "I/O операции → asyncio (web) или Threads",
        "Долгие задачи → Celery + Redis/RabbitMQ",
        "CPU операции → Multiprocessing",
        "Периодические задачи → Celery Beat",
    ],
    "Gotchas": [
        "GIL блокирует многопоточность в CPU ops",
        "asyncio требует async-aware libraries",
        "Celery требует running worker процесса",
        "Deadlocks возможны с неправильным sync/async",
    ],
    "Optimization": [
        "Connection pooling (asyncpg, psycopg2-pool)",
        "Rate limiting для API calls",
        "Retry с exponential backoff",
        "Timeout для всех network requests",
    ],
}

Вывод: Асинхронность в Python это не просто инструмент — это архитектурный выбор. Я выбираю в зависимости от задачи: asyncio для modern веб-приложений, Celery для распределённых систем, threading для простых параллельных операций. Комбинирование подходов даёт лучший результат.

Что использовал для работы с асинхронностью? | PrepBro