← Назад к вопросам

Где оптимальнее применять асинхронность в Python?

2.0 Middle🔥 171 комментариев
#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI28 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Где оптимально применять асинхронность в Python

Это практический вопрос, потому что неправильное использование асинхронности может замедлить приложение. Разберу стратегически, когда асинхронность ДЕЙСТВИТЕЛЬНО помогает.

Золотое правило: Async для I/O-bound операций

Асинхронность эффективна ТОЛЬКО когда:
- Программа часто ждёт (I/O)
- На время ожидания можно работать с другими задачами
- Одновременно много операций

1. Web серверы (ИДЕАЛЬНЫЙ случай)

Сценарий: Обслуживание тысяч HTTP запросов

# FastAPI (асинхронный фреймворк) - ПРАВИЛЬНО
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import AsyncSession

app = FastAPI()

@app.get("/api/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession):
    # Асинхронный запрос в БД (I/O)
    user = await db.get(User, user_id)
    return user

# Может обслуживать 10,000+ одновременных пользователей
# Потребление памяти: ~100 MB

Почему async работает:

Запрос 1: GET /users/1 → ждет БД (100ms)
Запрос 2: GET /users/2 → ждет БД (100ms)
Запрос 3: GET /users/3 → ждет БД (100ms)

БЕЗ async (последовательно):  1+2+3 = 300ms
С async (параллельно):        max(1,2,3) = 100ms
# Пример: 1000 запросов одновременно
import asyncio
import aiohttp
import time

# АСИНХРОННО (правильно)
async def fetch_all_async():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, f"https://api.example.com/user/{i}") for i in range(1000)]
        results = await asyncio.gather(*tasks)  # Все 1000 параллельно!
    return results

async def fetch_url(session, url):
    async with session.get(url) as resp:
        return await resp.json()

start = time.time()
await fetch_all_async()
print(f"Async: {time.time() - start:.2f}s")  # ~2s (все параллельно)

# СИНХРОННО (неправильно)
def fetch_all_sync():
    results = []
    for i in range(1000):
        resp = requests.get(f"https://api.example.com/user/{i}")  # Ждём каждый раз
        results.append(resp.json())
    return results

start = time.time()
fetch_all_sync()
print(f"Sync: {time.time() - start:.2f}s")  # ~200s (последовательно!)

Вывод: асинхронность в 100 раз быстрее для I/O-bound операций.

2. Микросервисная архитектура

Сценарий: Микросервис должен вызвать 3 других микросервиса и подождать результат

# НЕПРАВИЛЬНО (последовательно)
async def get_user_profile(user_id):
    # Вызов 1
    user = await userservice.get_user(user_id)  # 100ms
    # Вызов 2
    orders = await orderservice.get_orders(user_id)  # 100ms
    # Вызов 3
    reviews = await reviewservice.get_reviews(user_id)  # 100ms
    # Всего: 300ms
    return {user, orders, reviews}

# ПРАВИЛЬНО (параллельно)
async def get_user_profile(user_id):
    user, orders, reviews = await asyncio.gather(
        userservice.get_user(user_id),  # 100ms }
        orderservice.get_orders(user_id),  # 100ms } параллельно
        reviewservice.get_reviews(user_id)  # 100ms }
    )
    # Всего: ~100ms (вместо 300ms!)
    return {user, orders, reviews}

Результат: Сокращение времени ответа в 3 раза.

3. Обработка событий (Event-driven архитектура)

Сценарий: Telegram бот обслуживает 500K+ пользователей одновременно

# Без async: невозможно (нужно 500K потоков = 4 TB памяти)
# С async: легко (1 поток, 500K coroutines = 500 MB памяти)

from aiogram import Router, types
from aiogram.fsm.context import FSMContext

router = Router()

@router.message()
async def handle_message(message: types.Message, state: FSMContext):
    # Асинхронная обработка
    user = await db.get_user(message.from_user.id)  # I/O
    response = await llm.generate(message.text)      # I/O
    await message.answer(response)                    # I/O
    await db.save_message(message)                    # I/O

# Все 500K пользователей обслуживаются одновременно
# Благодаря async на одном сервере

4. Работа с внешними API (crawling, scraping)

Сценарий: Нужно скачать 1000 URL'ов со скоростью ~100 в секунду

# БЕЗ async (последовательно)
import requests

def download_all_sync():
    urls = [f"https://api.example.com/data/{i}" for i in range(1000)]
    results = []
    for url in urls:
        resp = requests.get(url)  # 100ms на запрос
        results.append(resp.json())
    return results

# Время: 1000 * 100ms = 100 seconds

# С ASYNC (параллельно)
import aiohttp
import asyncio

async def download_all_async():
    urls = [f"https://api.example.com/data/{i}" for i in range(1000)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

async def fetch_url(session, url):
    try:
        async with session.get(url) as resp:
            return await resp.json()
    except Exception as e:
        return None

# Время: ~10-15 seconds (параллельно, с rate limiting)

Результат: 10x faster! Тысячи запросов одновременно.

5. Database query батчинг

Сценарий: Обработка 10K строк данных с запросами в БД

# БЕЗ async (медленно)
for record in records:
    user = db.query(User).filter_by(id=record['user_id']).first()
    user.process()
    db.commit()
# Время: 10K * 10ms = 100 seconds

# С async (быстро)
async def process_records_async(records):
    tasks = [process_record(record) for record in records]
    await asyncio.gather(*tasks)

async def process_record(record):
    user = await db.get(User, record['user_id'])
    user.process()
    await db.commit()

# Время: ~2 seconds (все параллельно)

⚠️ НЕПРАВИЛЬНОЕ использование async (антипаттерны)

Ошибка 1: Async для CPU-bound операций

# НЕПРАВИЛЬНО (не даёт выигрыша)
async def calculate_heavy():
    # CPU-bound задача (вычисления)
    result = sum(i**2 for i in range(1000000))  # Чистое вычисление
    return result

async def main():
    # Запускаем 4 задачи
    tasks = [calculate_heavy() for _ in range(4)]
    results = await asyncio.gather(*tasks)
    # Всё равно ~X секунд (async НЕ помогает!)
    # Event loop не может переключиться, потому что нет I/O

# ПРАВИЛЬНО (использовать multiprocessing для CPU-bound)
from multiprocessing import Pool

def calculate_heavy():
    return sum(i**2 for i in range(1000000))

with Pool(4) as pool:
    results = pool.map(calculate_heavy, [None] * 4)
    # Быстро благодаря реальному параллелизму (4 CPU cores)

Ошибка 2: Синхронные функции в асинхронном коде

# НЕПРАВИЛЬНО (блокирует event loop)
async def bad_handler():
    time.sleep(5)  # БЛОКИРУЕТ весь event loop на 5 секунд!
    # Все остальные coroutines вынуждены ждать
    return "result"

# ПРАВИЛЬНО (асинхронные операции)
async def good_handler():
    await asyncio.sleep(5)  # Не блокирует, другие coroutines работают
    return "result"

# ПРАВИЛЬНО (если синхронная операция критична)
async def wrapper_handler():
    # Запустить синхронную функцию в отдельном потоке
    result = await asyncio.to_thread(blocking_function)
    return result

def blocking_function():
    time.sleep(5)  # Выполняется в отдельном потоке

Ошибка 3: Слишком много одновременных операций

# НЕПРАВИЛЬНО (перегрузка сервера)
async def bad_request_all():
    urls = [f"https://example.com/{i}" for i in range(100000)]
    # Пытаемся 100K запросов одновременно = crash!
    tasks = [fetch(url) for url in urls]
    await asyncio.gather(*tasks)  # Ошибка: слишком много соединений

# ПРАВИЛЬНО (semaphore для ограничения)
semaphore = asyncio.Semaphore(100)  # Максимум 100 одновременно

async def good_request_all():
    urls = [f"https://example.com/{i}" for i in range(100000)]
    tasks = [fetch_with_limit(url, semaphore) for url in urls]
    await asyncio.gather(*tasks)

async def fetch_with_limit(url, sem):
    async with sem:
        return await fetch(url)

# Безопасно обрабатываем 100K, но по 100 одновременно

Сравнительная таблица

┌──────────────────────────┬────────────┬─────────────────────────────┐
│ Сценарий                 │ Async?     │ Почему                      │
├──────────────────────────┼────────────┼─────────────────────────────┤
│ Web сервер (многие users)│ ДА!!!      │ Много одновременных I/O     │
│ Обработка API запросов   │ ДА!!!      │ HTTP запросы = I/O           │
│ БД запросы (many users)  │ ДА!!!      │ Ждём результата БД          │
│ Telegram бот (500K users)│ ДА!!!      │ События, I/O to Telegram API│
│ Data processing batch    │ НЕТ        │ CPU-bound, нет I/O          │
│ ML model inference       │ НЕТ        │ CPU-bound вычисления        │
│ Image processing         │ НЕТ        │ CPU-bound операции          │
│ Event loop cleanup       │ ДА         │ I/O к основной БД           │
│ Rate-limited crawler     │ ДА!!!      │ 1000+ одновременных запросов│
│ CSV парсинг файла        │ НЕТ        │ CPU-bound парсинг           │
└──────────────────────────┴────────────┴─────────────────────────────┘

Практический чеклист

Используй ASYNC если:

  • ✅ Много одновременных соединений (web, API, bots)
  • ✅ Часто ждёшь I/O (БД, HTTP, file read)
  • ✅ Нужна высокая пропускная способность
  • ✅ Критична задержка (каждый запрос отдельный user)
  • ✅ Внешние сервисы/API часто дают задержку

НЕ используй ASYNC если:

  • ❌ Чистые CPU-bound вычисления (используй multiprocessing)
  • ❌ Код уже синхронный и переписание сложно
  • ❌ Нужна максимальная простота (sync проще для простых случаев)
  • ❌ Одна операция в раз, нет параллелизма
  • ❌ Производительность не критична

Выводы

Асинхронность в Python — это "ждун" для I/O операций:

  1. Web серверы: FastAPI + async = обслуживание 10K+ пользователей
  2. API интеграция: asyncio.gather() = параллельные запросы
  3. Telegram боты: один поток, 500K coroutines
  4. Data crawling: 1000 URL за 10 секунд вместо 100
  5. Rate-limited работа: semaphore = контроль нагрузки

Не используй async для CPU-bound задач — это антипаттерн, который может даже замедлить приложение.

Правило большого пальца:

Если ваш код часто вызывает await, async вам поможет. Если нет — async только усложнит разработку.

Где оптимальнее применять асинхронность в Python? | PrepBro