← Назад к вопросам

Как правильно проектировать код с использованием асинхронных операций?

2.7 Senior🔥 191 комментариев
#Архитектура и паттерны#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Принципы проектирования асинхронного кода

Проектирование асинхронных операций — критическая часть современной разработки, особенно при работе с I/O операциями, сетевыми запросами и обработкой больших объёмов данных. Вот ключевые принципы:

1. Выбор правильного инструмента

Python предоставляет несколько подходов:

asyncio — стандартный для асинхронного программирования threading — для блокирующих операций (deprecated в некоторых случаях) multiprocessing — для CPU-bound задач

import asyncio
from typing import Coroutine

# asyncio — отличный выбор для I/O-bound операций
async def fetch_data(url: str) -> str:
    # Имитация сетевого запроса
    await asyncio.sleep(1)
    return f"Data from {url}"

async def main():
    # Запуск нескольких задач одновременно
    results = await asyncio.gather(
        fetch_data("https://api1.com"),
        fetch_data("https://api2.com"),
        fetch_data("https://api3.com")
    )
    print(results)

asyncio.run(main())

2. Структурированная конкурентность (Structured Concurrency)

Это ключевой принцип: все асинхронные операции должны иметь чёткое начало и конец. TaskGroups помогают управлять жизненным циклом:

import asyncio
from contextlib import asynccontextmanager

async def process_with_timeout():
    try:
        async with asyncio.timeout(5):
            async with asyncio.TaskGroup() as tg:
                task1 = tg.create_task(long_operation())
                task2 = tg.create_task(another_operation())
    except asyncio.TimeoutError:
        print("Операция превысила timeout")
    except* ExceptionGroup as eg:
        print(f"Ошибки: {eg}")

3. Обработка ошибок

При работе с несколькими асинхронными задачами критически важно правильно обрабатывать ошибки:

async def robust_fetch(url: str) -> str | None:
    try:
        async with asyncio.timeout(10):
            # Вызов реального API
            return await fetch_data(url)
    except asyncio.TimeoutError:
        print(f"Timeout для {url}")
        return None
    except Exception as e:
        print(f"Ошибка при запросе {url}: {e}")
        return None

async def main():
    results = await asyncio.gather(
        robust_fetch("https://api1.com"),
        robust_fetch("https://api2.com"),
        robust_fetch("https://api3.com"),
        return_exceptions=True
    )

4. Правильное использование контекстных менеджеров

Для управления ресурсами (соединения, файлы) используй async контекстные менеджеры:

import aiohttp

async def fetch_with_session():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com") as resp:
            return await resp.json()

async def main():
    # Сессия автоматически закроется
    data = await fetch_with_session()

5. Избегание блокирующих операций

НЕ используй блокирующие I/O операции внутри async функций:

# ❌ НЕПРАВИЛЬНО — блокирует весь event loop
async def bad_example():
    time.sleep(1)  # ПЛОХО!
    return "done"

# ✅ ПРАВИЛЬНО — асинхронное ожидание
async def good_example():
    await asyncio.sleep(1)  # ХОРОШО!
    return "done"

Для блокирующих операций (файлы, CPU) используй to_thread:

async def process_file():
    # Запуск блокирующей функции в отдельном потоке
    result = await asyncio.to_thread(blocking_function)
    return result

6. Масштабируемость и производительность

Асинхронный код может обрабатывать тысячи соединений одновременно, но нужно учитывать:

  • Лимиты соединений: не открывай бесконечное количество одновременных запросов
  • Backpressure: контролируй скорость обработки данных
  • Connection pooling: переиспользуй соединения
async def controlled_concurrency():
    # Семафор ограничивает количество одновременных задач
    semaphore = asyncio.Semaphore(10)
    
    async def bounded_task(url: str):
        async with semaphore:
            return await fetch_data(url)
    
    urls = [f"https://api.com/{i}" for i in range(1000)]
    tasks = [bounded_task(url) for url in urls]
    
    results = await asyncio.gather(*tasks)
    return results

7. Тестирование асинхронного кода

Для тестирования используй pytest-asyncio:

import pytest

@pytest.mark.asyncio
async def test_async_function():
    result = await some_async_function()
    assert result == expected_value

Итоговые рекомендации

  1. Используй asyncio для I/O-bound операций
  2. Структурируй конкурентность с помощью TaskGroups
  3. Обрабатывай ошибки систематически
  4. Управляй ресурсами через контекстные менеджеры
  5. Избегай блокировок в async функциях
  6. Контролируй параллелизм с помощью семафоров
  7. Тестируй асинхронный код с правильными инструментами

Правильное проектирование асинхронного кода — это основа для масштабируемых и отзывчивых приложений.

Как правильно проектировать код с использованием асинхронных операций? | PrepBro