← Назад к вопросам

Как работает асинхронность на верхних уровнях?

1.8 Middle🔥 171 комментариев
#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Асинхронность на верхних уровнях

Асинхронность на верхних уровнях (application и presentation слои) отличается от низкоуровневого I/O. Здесь речь идёт об управлении потоками выполнения, конкурентностью и координацией между компонентами.

Уровни асинхронности в архитектуре

1. I/O уровень (низший)

  • asyncio, aiohttp, aiopg
  • Неблокирующие запросы к БД
  • Параллельные HTTP запросы

2. Application уровень (средний)

  • Управление бизнес-логикой
  • Координация операций
  • Use Cases и Service Layer

3. Presentation уровень (верхний)

  • API endpoints (FastAPI, Django)
  • Telegram Bot handlers (Aiogram)
  • WebSocket connections

Асинхронность в Application Layer

Пример: Регистрация пользователя

# domain/user.py
from dataclasses import dataclass
from datetime import datetime
from uuid import UUID

@dataclass
class User:
    id: UUID
    email: str
    username: str
    created_at: datetime

# application/use_cases/register_user.py
from abc import ABC, abstractmethod

class UserRepository(ABC):
    @abstractmethod
    async def find_by_email(self, email: str) -> User | None:
        pass
    
    @abstractmethod
    async def save(self, user: User) -> None:
        pass

class RegisterUserUseCase:
    def __init__(self, repo: UserRepository, email_service):
        self.repo = repo
        self.email_service = email_service
    
    async def execute(self, email: str, username: str) -> User:
        # Проверяем, не существует ли уже такой пользователь
        existing_user = await self.repo.find_by_email(email)
        if existing_user:
            raise UserAlreadyExistsError(email)
        
        # Создаём нового пользователя
        user = User(
            id=uuid4(),
            email=email,
            username=username,
            created_at=datetime.now(UTC)
        )
        
        # Сохраняем в БД
        await self.repo.save(user)
        
        # Отправляем email параллельно (не ждём)
        # Но сначала сохраняем, потом отправляем
        await self.email_service.send_welcome_email(user)
        
        return user

Асинхронность в Presentation Layer

FastAPI endpoint

# presentation/api/users.py
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, EmailStr

router = APIRouter(prefix="/users")

class RegisterRequest(BaseModel):
    email: EmailStr
    username: str

class UserResponse(BaseModel):
    id: str
    email: str
    username: str

@router.post("/register", response_model=UserResponse)
async def register_user(
    request: RegisterRequest,
    use_case: RegisterUserUseCase
) -> UserResponse:
    try:
        user = await use_case.execute(
            email=request.email,
            username=request.username
        )
        return UserResponse(
            id=str(user.id),
            email=user.email,
            username=user.username
        )
    except UserAlreadyExistsError as e:
        raise HTTPException(status_code=409, detail=str(e))

Параллельная обработка нескольких операций

# Когда нужно выполнить несколько операций параллельно
import asyncio

class GetUserDashboardUseCase:
    def __init__(self, user_repo, post_repo, stats_service):
        self.user_repo = user_repo
        self.post_repo = post_repo
        self.stats_service = stats_service
    
    async def execute(self, user_id: UUID):
        # Запускаем все операции параллельно
        user, posts, stats = await asyncio.gather(
            self.user_repo.find_by_id(user_id),
            self.post_repo.find_by_user_id(user_id),
            self.stats_service.get_user_stats(user_id)
        )
        
        if not user:
            raise UserNotFoundError(user_id)
        
        return {
            "user": user,
            "posts": posts,
            "stats": stats
        }

Обработка ошибок в асинхронном коде

from typing import Optional

class SafeUserService:
    async def get_user_safe(self, user_id: UUID) -> Optional[User]:
        try:
            return await self.repo.find_by_id(user_id)
        except DatabaseError as e:
            logger.error(f"DB error: {e}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            raise

# Или с использованием asyncio.gather
async def fetch_multiple_users(user_ids: list[UUID]):
    tasks = [self.repo.find_by_id(uid) for uid in user_ids]
    
    # Если один завалится, остальные продолжат
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # Фильтруем ошибки
    users = [r for r in results if isinstance(r, User)]
    errors = [r for r in results if isinstance(r, Exception)]
    
    if errors:
        logger.warning(f"Failed to fetch {len(errors)} users")
    
    return users

Telegram Bot с асинхронностью (Aiogram)

# presentation/bot/handlers.py
from aiogram import Router, F
from aiogram.types import Message

router = Router()

class UserHandler:
    def __init__(self, register_use_case: RegisterUserUseCase):
        self.register_use_case = register_use_case
    
    @router.message(Command("register"))
    async def start_registration(self, message: Message, state: FSMContext):
        # Асинхронный вызов use case
        await state.set_state(RegistrationStates.waiting_email)
        await message.answer("Введите email:")
    
    @router.message(RegistrationStates.waiting_email)
    async def handle_email(self, message: Message, state: FSMContext):
        email = message.text
        
        # Проверяем почту параллельно с отправкой сообщения
        check_task = self.email_service.verify_email(email)
        message_task = message.answer("Проверяю email...")
        
        # Ждём обе операции
        is_valid, msg = await asyncio.gather(check_task, message_task)
        
        if is_valid:
            await state.update_data(email=email)
            await state.set_state(RegistrationStates.waiting_password)
            await message.answer("Email подтверждён. Введите пароль:")
        else:
            await message.answer("Email невалиден. Попробуйте ещё раз.")

Управление timeouts

import asyncio

class UserService:
    async def get_user_with_timeout(self, user_id: UUID, timeout: int = 5):
        try:
            return await asyncio.wait_for(
                self.repo.find_by_id(user_id),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            logger.error(f"Timeout fetching user {user_id}")
            raise UserFetchTimeoutError(user_id)

Очереди задач (Task Queue)

Для долгоживущих операций используй очереди вместо простого await:

# Для email, анализа больших данных и т.д.
from celery import Celery

app = Celery('tasks')

@app.task
def send_welcome_email(user_id: str, email: str):
    # Долгая операция
    email_service.send(email, "Welcome!")

# В use case
class RegisterUserUseCase:
    async def execute(self, email: str, username: str):
        user = User(...)
        await self.repo.save(user)
        
        # Отправляем в фоновую очередь, не ждём
        send_welcome_email.delay(str(user.id), user.email)
        
        return user

Лучшие практики

  • Не смешивай sync и async — выбери один подход
  • Параллели для независимых операций — используй asyncio.gather
  • Очереди для долгих задач — Celery, RabbitMQ, Redis
  • Ограничивай одновременные запросы — используй семафоры
  • Логируй асинхронные операции — сложнее отследить
  • Тестируй асинхронный код — используй pytest-asyncio
  • Используй context managers — async with для ресурсов

Асинхронность на верхних уровнях — это про координацию операций, управление параллелизмом и оптимизацию пропускной способности приложения.

Как работает асинхронность на верхних уровнях? | PrepBro