← Назад к вопросам

Расскажите о своём опыте работы с асинхронным кодом

1.3 Junior🔥 151 комментариев
#Soft Skills#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Мой опыт работы с асинхронным кодом

Работаю с асинхронным кодом уже более 10 лет, и он стал неотъемлемой частью архитектуры моих систем. За это время я прошёл путь от простых потокочных решений к современным async/await паттернам.

Переход на асинхронность

В начале карьеры использовал традиционный синхронный код:

# 2010s — синхронный подход
import requests
import time

def fetch_data_sync(urls: list) -> list:
    """Загружает 100 URL последовательно"""
    results = []
    start = time.time()
    
    for url in urls:
        response = requests.get(url)  # БЛОКИРУЕТ на 1 секунду
        results.append(response.json())
    
    elapsed = time.time() - start
    print(f"Загруженo за {elapsed:.1f} сек")
    # Результат: 100 сек (100 URL × 1 сек)
    return results

fetch_data_sync([f"https://api.example.com/data/{i}" for i in range(100)])

Этот код был неэффективен. 100 URL загружались 100 секунд. Пока один запрос обрабатывается, процессор просто ждёт. Это потраты.

Threading — первое решение

Потом попробовал потоки:

from concurrent.futures import ThreadPoolExecutor
import requests
import time

def fetch_data_threading(urls: list) -> list:
    """Загружает URL параллельно с помощью потоков"""
    start = time.time()
    results = []
    
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(requests.get, url) for url in urls]
        for future in futures:
            response = future.result()
            results.append(response.json())
    
    elapsed = time.time() - start
    print(f"Загруженo за {elapsed:.1f} сек")
    # Результат: ~10 сек (100 URL / 10 потоков × 1 сек)
    return results

Это было лучше (10 сек вместо 100), но потоки имели проблемы:

  • Context switching overhead
  • Race conditions
  • GIL (Global Interpreter Lock) в Python блокирует истинный параллелизм
  • Сложная отладка

Asyncio — современный подход

Затем перешёл на asyncio — это революция:

import asyncio
import aiohttp
import time
from typing import List

async def fetch_single(session: aiohttp.ClientSession, url: str) -> dict:
    """Загружает один URL асинхронно"""
    async with session.get(url) as response:
        return await response.json()

async def fetch_data_async(urls: List[str]) -> List[dict]:
    """Загружает все URL одновременно"""
    start = time.time()
    
    async with aiohttp.ClientSession() as session:
        # Создаём все корутины
        tasks = [fetch_single(session, url) for url in urls]
        # Выполняем их одновременно (не последовательно!)
        results = await asyncio.gather(*tasks)
    
    elapsed = time.time() - start
    print(f"Загруженo за {elapsed:.1f} сек")
    # Результат: ~1 сек (один раунд, т.к. все запросы идут параллельно)
    return results

# Запуск
if __name__ == "__main__":
    urls = [f"https://api.example.com/data/{i}" for i in range(100)]
    asyncio.run(fetch_data_async(urls))

Это была игра-менеджер. Все 100 запросов выполняются одновременно в одном потоке. Результат: 1 сек вместо 100!

Архитектурные паттерны

1. Event Loop — сердце асинхронности

import asyncio

async def main():
    print("Начало")
    await asyncio.sleep(1)  # Отдаём контроль event loop
    print("Конец")

# Event loop управляет выполнением
asyncio.run(main())  # Создаёт event loop, выполняет main, закрывает loop

2. Async/Await синтаксис

async def process_order(order_id: int) -> dict:
    """Async функция — может использовать await"""
    # Эти операции выполняются асинхронно
    customer = await fetch_customer(order_id)
    items = await fetch_items(order_id)
    payment = await process_payment(customer, items)
    
    return {
        "customer": customer,
        "items": items,
        "payment": payment
    }

# Вызов async функции — всегда возвращает корутину
coroutine = process_order(123)
await coroutine  # Нужен await внутри async функции

3. Concurrent.futures для блокирующих операций

import asyncio
from concurrent.futures import ThreadPoolExecutor
import cpu_intensive_lib

async def async_with_blocking():
    loop = asyncio.get_event_loop()
    
    # Блокирующая операция (например, тяжёлый вычисления)
    result = await loop.run_in_executor(
        ThreadPoolExecutor(max_workers=1),
        cpu_intensive_lib.calculate,
        1000000
    )
    
    print(f"Результат: {result}")
    return result

await async_with_blocking()

Реальные проекты

1. Telegram Bot (aiogram)

Мой бот обрабатывает тысячи сообщений одновременно:

from aiogram import Bot, Dispatcher, types
from aiogram.filters import Command
import asyncio

bot = Bot(token="YOUR_TOKEN")
dp = Dispatcher()

@dp.message(Command("start"))
async def start(message: types.Message):
    """Асинхронный обработчик"""
    user_data = await fetch_user_data(message.from_user.id)  # Асинхронная БД
    await message.answer(f"Привет, {user_data['name']}!")  # Асинхронная отправка

@dp.message()
async def process_message(message: types.Message):
    # Все обработчики выполняются асинхронно
    # 1000 пользователей одновременно — без проблем
    response = await ai_service.generate_response(message.text)
    await message.answer(response)

async def main():
    await dp.start_polling(bot)

asyncio.run(main())

2. FastAPI микросервис

from fastapi import FastAPI, BackgroundTasks
from sqlalchemy.ext.asyncio import AsyncSession
import asyncio

app = FastAPI()

@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession):
    """Асинхронный запрос в БД"""
    user = await db.execute(
        select(User).where(User.id == user_id)
    )
    return user.scalar()

@app.post("/orders")
async def create_order(order_data: OrderSchema, background_tasks: BackgroundTasks):
    """Создаём заказ и отправляем письмо в фоне"""
    order = await save_order(order_data)
    
    # Асинхронная фоновая задача
    background_tasks.add_task(send_email, order.customer_email)
    
    return order

async def send_email(email: str):
    """Отправляем письмо асинхронно"""
    async with aiohttp.ClientSession() as session:
        await session.post(
            "https://mail-service.api/send",
            json={"to": email, "subject": "Order created"}
        )

3. Обработка очереди задач

import asyncio
from typing import Coroutine

class TaskQueue:
    def __init__(self, max_workers: int = 10):
        self.semaphore = asyncio.Semaphore(max_workers)
        self.tasks = []
    
    async def add_task(self, coro: Coroutine):
        """Добавляет задачу с ограничением одновременных выполнений"""
        async with self.semaphore:
            return await coro
    
    async def process_all(self):
        """Выполняет все задачи"""
        return await asyncio.gather(*self.tasks)

# Использование
queue = TaskQueue(max_workers=5)

for item_id in range(1000):
    coro = fetch_and_process_item(item_id)
    queue.tasks.append(queue.add_task(coro))

results = await queue.process_all()

Частые ошибки и решения

Ошибка 1: Забывают await

# НЕПРАВИЛЬНО
async def broken():
    result = fetch_data()  # Забыли await!
    print(result)  # <coroutine object> вместо данных

# ПРАВИЛЬНО
async def fixed():
    result = await fetch_data()  # Добавили await
    print(result)  # Реальные данные

Ошибка 2: Блокирующие операции в asyncio

# НЕПРАВИЛЬНО
async def blocking():
    time.sleep(5)  # БЛОКИРУЕТ весь event loop!
    return "done"

# ПРАВИЛЬНО
async def non_blocking():
    await asyncio.sleep(5)  # Не блокирует
    return "done"

Ошибка 3: Синхронный IO в asyncio

# НЕПРАВИЛЬНО
async def bad_io():
    data = requests.get("https://api.com")  # Синхронный запрос = deadlock
    return data

# ПРАВИЛЬНО
async def good_io():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.com") as resp:
            data = await resp.json()
    return data

Лучшие практики асинхронной разработки

1. Используй asyncio.gather для параллельных задач

# Все выполняются одновременно
results = await asyncio.gather(
    fetch_user(1),
    fetch_orders(1),
    fetch_notifications(1),
)

2. Используй asyncio.create_task для долгоживущих задач

async def background_worker():
    while True:
        await process_queue_item()
        await asyncio.sleep(1)

# Запускаем в фоне
asyncio.create_task(background_worker())

3. Правильное управление ресурсами

async with aiohttp.ClientSession() as session:
    async with session.get(url) as response:
        data = await response.json()
# Session автоматически закроется

4. Таймауты для операций

try:
    result = await asyncio.wait_for(
        fetch_data(),
        timeout=5.0  # 5 секунд максимум
    )
except asyncio.TimeoutError:
    print("Запрос слишком долго выполняется")

Производительность

Работал над проектом с 10000+ одновременных WebSocket соединений:

Подход                      Результат
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Синхронный (Flask)          ~100 соединений (OOM)
Потоки (ThreadPoolExecutor) ~1000 соединений (CPU 95%)
Асинхронный (FastAPI)       10000+ соединений (CPU 15%)

Асинхронность дала 100x улучшение производительности.

Когда НЕ использовать асинхронность

  1. CPU-bound операции — асинхронность не помогает

    • Используй multiprocessing вместо asyncio
    • Или numpy, numba для вычислений
  2. Простые синхронные приложения — оверкомплекс

    • Блог, простой CRUD API
    • Усложняет код без выгоды
  3. Несовместимые библиотеки — если 90% кода синхронный

    • requests (используй httpx)
    • Обычный SQLAlchemy (используй asyncio версию)

Вывод

Асинхронный код — это не волшебство, это правильное использование ресурсов. Вместо ожидания блокирующих операций мы отдаём контроль другим задачам. Это даёт:

  • 100x лучше масштабируемость (10k одновременных соединений вместо 100)
  • Лучше использование CPU (1 поток вместо тысяч)
  • Проще отладка (нет race conditions, как в потоках)

Асинхронность — это не просто фишка, это необходимость для высоконагруженных систем.