← Назад к вопросам
Как работает асинхронность на верхних уровнях?
1.8 Middle🔥 171 комментариев
#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Асинхронность на верхних уровнях
Асинхронность на верхних уровнях (application и presentation слои) отличается от низкоуровневого I/O. Здесь речь идёт об управлении потоками выполнения, конкурентностью и координацией между компонентами.
Уровни асинхронности в архитектуре
1. I/O уровень (низший)
- asyncio, aiohttp, aiopg
- Неблокирующие запросы к БД
- Параллельные HTTP запросы
2. Application уровень (средний)
- Управление бизнес-логикой
- Координация операций
- Use Cases и Service Layer
3. Presentation уровень (верхний)
- API endpoints (FastAPI, Django)
- Telegram Bot handlers (Aiogram)
- WebSocket connections
Асинхронность в Application Layer
Пример: Регистрация пользователя
# domain/user.py
from dataclasses import dataclass
from datetime import datetime
from uuid import UUID
@dataclass
class User:
id: UUID
email: str
username: str
created_at: datetime
# application/use_cases/register_user.py
from abc import ABC, abstractmethod
class UserRepository(ABC):
@abstractmethod
async def find_by_email(self, email: str) -> User | None:
pass
@abstractmethod
async def save(self, user: User) -> None:
pass
class RegisterUserUseCase:
def __init__(self, repo: UserRepository, email_service):
self.repo = repo
self.email_service = email_service
async def execute(self, email: str, username: str) -> User:
# Проверяем, не существует ли уже такой пользователь
existing_user = await self.repo.find_by_email(email)
if existing_user:
raise UserAlreadyExistsError(email)
# Создаём нового пользователя
user = User(
id=uuid4(),
email=email,
username=username,
created_at=datetime.now(UTC)
)
# Сохраняем в БД
await self.repo.save(user)
# Отправляем email параллельно (не ждём)
# Но сначала сохраняем, потом отправляем
await self.email_service.send_welcome_email(user)
return user
Асинхронность в Presentation Layer
FastAPI endpoint
# presentation/api/users.py
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, EmailStr
router = APIRouter(prefix="/users")
class RegisterRequest(BaseModel):
email: EmailStr
username: str
class UserResponse(BaseModel):
id: str
email: str
username: str
@router.post("/register", response_model=UserResponse)
async def register_user(
request: RegisterRequest,
use_case: RegisterUserUseCase
) -> UserResponse:
try:
user = await use_case.execute(
email=request.email,
username=request.username
)
return UserResponse(
id=str(user.id),
email=user.email,
username=user.username
)
except UserAlreadyExistsError as e:
raise HTTPException(status_code=409, detail=str(e))
Параллельная обработка нескольких операций
# Когда нужно выполнить несколько операций параллельно
import asyncio
class GetUserDashboardUseCase:
def __init__(self, user_repo, post_repo, stats_service):
self.user_repo = user_repo
self.post_repo = post_repo
self.stats_service = stats_service
async def execute(self, user_id: UUID):
# Запускаем все операции параллельно
user, posts, stats = await asyncio.gather(
self.user_repo.find_by_id(user_id),
self.post_repo.find_by_user_id(user_id),
self.stats_service.get_user_stats(user_id)
)
if not user:
raise UserNotFoundError(user_id)
return {
"user": user,
"posts": posts,
"stats": stats
}
Обработка ошибок в асинхронном коде
from typing import Optional
class SafeUserService:
async def get_user_safe(self, user_id: UUID) -> Optional[User]:
try:
return await self.repo.find_by_id(user_id)
except DatabaseError as e:
logger.error(f"DB error: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error: {e}")
raise
# Или с использованием asyncio.gather
async def fetch_multiple_users(user_ids: list[UUID]):
tasks = [self.repo.find_by_id(uid) for uid in user_ids]
# Если один завалится, остальные продолжат
results = await asyncio.gather(*tasks, return_exceptions=True)
# Фильтруем ошибки
users = [r for r in results if isinstance(r, User)]
errors = [r for r in results if isinstance(r, Exception)]
if errors:
logger.warning(f"Failed to fetch {len(errors)} users")
return users
Telegram Bot с асинхронностью (Aiogram)
# presentation/bot/handlers.py
from aiogram import Router, F
from aiogram.types import Message
router = Router()
class UserHandler:
def __init__(self, register_use_case: RegisterUserUseCase):
self.register_use_case = register_use_case
@router.message(Command("register"))
async def start_registration(self, message: Message, state: FSMContext):
# Асинхронный вызов use case
await state.set_state(RegistrationStates.waiting_email)
await message.answer("Введите email:")
@router.message(RegistrationStates.waiting_email)
async def handle_email(self, message: Message, state: FSMContext):
email = message.text
# Проверяем почту параллельно с отправкой сообщения
check_task = self.email_service.verify_email(email)
message_task = message.answer("Проверяю email...")
# Ждём обе операции
is_valid, msg = await asyncio.gather(check_task, message_task)
if is_valid:
await state.update_data(email=email)
await state.set_state(RegistrationStates.waiting_password)
await message.answer("Email подтверждён. Введите пароль:")
else:
await message.answer("Email невалиден. Попробуйте ещё раз.")
Управление timeouts
import asyncio
class UserService:
async def get_user_with_timeout(self, user_id: UUID, timeout: int = 5):
try:
return await asyncio.wait_for(
self.repo.find_by_id(user_id),
timeout=timeout
)
except asyncio.TimeoutError:
logger.error(f"Timeout fetching user {user_id}")
raise UserFetchTimeoutError(user_id)
Очереди задач (Task Queue)
Для долгоживущих операций используй очереди вместо простого await:
# Для email, анализа больших данных и т.д.
from celery import Celery
app = Celery('tasks')
@app.task
def send_welcome_email(user_id: str, email: str):
# Долгая операция
email_service.send(email, "Welcome!")
# В use case
class RegisterUserUseCase:
async def execute(self, email: str, username: str):
user = User(...)
await self.repo.save(user)
# Отправляем в фоновую очередь, не ждём
send_welcome_email.delay(str(user.id), user.email)
return user
Лучшие практики
- Не смешивай sync и async — выбери один подход
- Параллели для независимых операций — используй asyncio.gather
- Очереди для долгих задач — Celery, RabbitMQ, Redis
- Ограничивай одновременные запросы — используй семафоры
- Логируй асинхронные операции — сложнее отследить
- Тестируй асинхронный код — используй pytest-asyncio
- Используй context managers — async with для ресурсов
Асинхронность на верхних уровнях — это про координацию операций, управление параллелизмом и оптимизацию пропускной способности приложения.