← Назад к вопросам

Для каких задач лучше использовать асинхронные операции

2.0 Middle🔥 191 комментариев
#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Для каких задач лучше использовать асинхронные операции

Асинхронные операции (async/await) в Python позволяют выполнять множество операций ввода-вывода одновременно на одном потоке. Это кардинально отличается от традиционного синхронного подхода, где программа ждёт завершения каждой операции.

Сравнение: синхронный vs асинхронный код

Синхронный (последовательный) подход:

import time
import requests

start = time.time()

# Загружаем 5 URL последовательно
for url in urls:
    response = requests.get(url)  # Ждём ответ
    process_data(response.text)
    
print(f"Время: {time.time() - start}s")  # ~15 секунд (если каждый запрос 3 сек)

Асинхронный подход:

import asyncio
import aiohttp
import time

async def fetch_all():
    async with aiohttp.ClientSession() as session:
        start = time.time()
        
        # Запускаем все запросы одновременно
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        
        for data in results:
            process_data(data)
            
        print(f"Время: {time.time() - start}s")  # ~3 секунды

async def fetch_url(session, url):
    async with session.get(url) as resp:
        return await resp.text()

asyncio.run(fetch_all())

Основные сценарии использования async/await

1. Веб-скрейпинг и парсинг множества сайтов

Если нужно загрузить 1000 веб-страниц, асинхронность даст 10-50x ускорение:

import asyncio
import aiohttp
from bs4 import BeautifulSoup

async def scrape_websites(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [scrape_single(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

async def scrape_single(session, url):
    async with session.get(url) as resp:
        html = await resp.text()
        soup = BeautifulSoup(html, 'html.parser')
        return soup.find_all('a')

2. Микросервисная архитектура (вызовы других API)

В микросервисах часто нужно вызвать 5-10 других сервисов параллельно:

import asyncio
import aiohttp

async def get_user_data(user_id):
    async with aiohttp.ClientSession() as session:
        # Получить данные из 4 сервисов параллельно
        user = await fetch(session, f'/api/users/{user_id}')
        orders = await fetch(session, f'/api/orders/{user_id}')
        profile = await fetch(session, f'/api/profile/{user_id}')
        stats = await fetch(session, f'/api/stats/{user_id}')
        
        return {user, orders, profile, stats}

async def fetch(session, endpoint):
    async with session.get(endpoint) as resp:
        return await resp.json()

3. WebSocket соединения (real-time приложения)

Для чатов, уведомлений, live данных:

import asyncio
import websockets
import json

async def handle_client(websocket, path):
    async for message in websocket:
        data = json.loads(message)
        # Обработка сообщения
        response = await process_message(data)
        await websocket.send(json.dumps(response))

async def main():
    async with websockets.serve(handle_client, '0.0.0.0', 8000):
        await asyncio.Future()  # Бесконечный цикл

asyncio.run(main())

4. Обработка множества одновременных соединений (web server)

При 10000 одновременных пользователей асинхронный сервер работает намного эффективнее:

from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()

@app.get("/api/data")
async def get_data():
    # Асинхронный вызов БД
    data = await database.fetch("SELECT * FROM users")
    return JSONResponse(data)

@app.post("/api/process")
async def process(item: dict):
    # Несколько асинхронных операций параллельно
    result1 = await service1.process(item)
    result2 = await service2.process(item)
    return {result1, result2}

5. Работа с очередями задач (Task queue)

Обработка тысяч задач из RabbitMQ, Redis, Celery:

import asyncio
import aio_pika

async def process_tasks():
    connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
    channel = await connection.channel()
    queue = await channel.get_queue('tasks')
    
    # Обрабатываем 100 задач параллельно
    async with queue.iterator() as queue_iter:
        async for message in queue_iter:
            async with message.process():
                await process_task(message.body)

asyncio.run(process_tasks())

6. Таймауты и отмена операций

Нужно отменить операцию если она длится дольше лимита:

import asyncio

async def fetch_with_timeout():
    try:
        # Отменить если дольше 5 секунд
        result = await asyncio.wait_for(
            long_running_operation(),
            timeout=5.0
        )
        return result
    except asyncio.TimeoutError:
        print("Операция превышена по времени")

async def cancel_operation():
    task = asyncio.create_task(long_operation())
    
    # Дождаться 2 секунды
    await asyncio.sleep(2)
    
    # Отменить задачу
    task.cancel()

7. Работа с базами данных (async ORM)

Асинхронные драйверы баз данных (asyncpg, motor, aiosqlite):

import asyncpg

async def get_users():
    # Асинхронное подключение к PostgreSQL
    conn = await asyncpg.connect('postgresql://user:pass@localhost/db')
    
    # Параллельные запросы
    users = await conn.fetch('SELECT * FROM users')
    orders = await conn.fetch('SELECT * FROM orders')
    
    await conn.close()
    return {users, orders}

8. Периодические задачи (Scheduler)

Выполнение задач по расписанию:

import asyncio
from apscheduler.asyncio import AsyncIOScheduler

async def main():
    scheduler = AsyncIOScheduler()
    
    # Запускать каждые 5 минут
    scheduler.add_job(send_emails, 'interval', minutes=5)
    scheduler.add_job(cleanup_cache, 'interval', hours=1)
    
    scheduler.start()
    
    # Остаться в цикле
    while True:
        await asyncio.sleep(1)

async def send_emails():
    # Отправить 1000 писем параллельно
    tasks = [send_email(user) for user in users]
    await asyncio.gather(*tasks)

asyncio.run(main())

Когда НЕ использовать async

CPU-bound операции (вычисления):

# ❌ НЕПРАВИЛЬНО: async не поможет
async def calculate_heavy():
    result = sum(i**2 for i in range(100000000))
    return result

# ✅ ПРАВИЛЬНО: используй multiprocessing
from multiprocessing import Pool

def calculate_heavy():
    return sum(i**2 for i in range(100000000))

with Pool(4) as pool:
    results = pool.map(calculate_heavy, range(10))

Когда всё синхронное и нельзя использовать async driver:

# ❌ НЕПРАВИЛЬНО: requests не асинхронный
import asyncio
import requests

async def bad_example():
    for url in urls:
        response = requests.get(url)  # Это блокирует!
        process(response.text)

# ✅ ПРАВИЛЬНО: используй aiohttp
import aiohttp

async def good_example():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

Сравнительная таблица

СценарийСинхронныйАсинхронныйМногопоточныйМногопроцессный
I/O операции (HTTP)❌ 15s✅ 3s✅ 3s❌ медленно
CPU интенсивные✅ OK❌ No❌ GIL✅ OK
10000 соединений❌ fail✅ OK❌ OOM❌ fail
WebSocket❌ No✅ OK⚠️ сложно❌ No
Простой скрипт✅ OK❌ усложнение⚠️ overhead❌ overhead

Основной вывод

Используй async/await когда:

  • ✅ Много I/O операций (HTTP, БД, файлы)
  • ✅ Нужно обрабатывать множество соединений
  • ✅ Требуется low latency (real-time)
  • ✅ Нужны WebSocket соединения
  • ✅ Работа с микросервисной архитектурой

Не используй async если:

  • ❌ Задача CPU-bound (вычисления)
  • ❌ Нет асинхронных драйверов для библиотек
  • ❌ Команда не знакома с async паттернами
  • ❌ Простой скрипт без I/O операций
Для каких задач лучше использовать асинхронные операции | PrepBro