← Назад к вопросам

Приведи пример использования асинхронности

2.0 Middle🔥 231 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Приведи пример использования асинхронности

Асинхронность в Python позволяет выполнять множество операций одновременно в одном потоке. Вот практические примеры.

Основной концепт

Асинхронность решает проблему блокирующих операций (I/O):

import time

# ❌ Синхронный код — блокирует весь поток
def fetch_user(user_id):
    # Имитация API запроса (3 секунды)
    time.sleep(3)
    return {"id": user_id, "name": "John"}

start = time.time()
for i in range(3):
    user = fetch_user(i)
    print(user)
print(f"Total: {time.time() - start}s")  # 9 секунд!
import asyncio

# ✅ Асинхронный код — не блокирует поток
async def fetch_user(user_id):
    # Имитация API запроса
    await asyncio.sleep(3)
    return {"id": user_id, "name": "John"}

async def main():
    start = time.time()
    # Запускаем 3 операции параллельно
    users = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3)
    )
    print(users)
    print(f"Total: {time.time() - start}s")  # 3 секунды вместо 9!

asyncio.run(main())

Пример 1: Загрузка нескольких URL одновременно

import aiohttp
import asyncio

async def fetch_page(session, url):
    async with session.get(url) as response:
        return await response.text()

async def fetch_multiple_pages():
    urls = [
        "https://example.com",
        "https://google.com",
        "https://github.com"
    ]
    
    async with aiohttp.ClientSession() as session:
        # Загружаем все страницы одновременно
        tasks = [fetch_page(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        
        for url, html in zip(urls, results):
            print(f"{url}: {len(html)} bytes")

asyncio.run(fetch_multiple_pages())

# Результат:
# https://example.com: 1234 bytes
# https://google.com: 5678 bytes
# https://github.com: 9012 bytes
# Время: ~2 сек вместо 6+ сек синхронно

Пример 2: Web API с FastAPI

from fastapi import FastAPI
import asyncio
import aiohttp

app = FastAPI()

# ❌ Синхронный эндпоинт (блокирует другие запросы)
@app.get("/user-slow/{user_id}")
def get_user_slow(user_id: int):
    # Ожидание ответа от другого API
    response = requests.get(f"https://api.example.com/user/{user_id}")
    time.sleep(2)  # Блокирует весь сервер!
    return response.json()

# ✅ Асинхронный эндпоинт (не блокирует)
@app.get("/user/{user_id}")
async def get_user(user_id: int):
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.example.com/user/{user_id}") as response:
            await asyncio.sleep(2)
            return await response.json()

# С асинхронностью один сервер на 4GB RAM может обрабатывать:
# - Синхронно: 50-100 concurrent requests
# - Асинхронно: 10,000+ concurrent requests

Пример 3: Фоновые задачи в Celery

from celery import Celery
from celery import group, chain
import asyncio

app = Celery('myapp', broker='amqp://localhost')

# ❌ Синхронное выполнение
@app.task
def send_email(user_id):
    # Отправляет email (может быть медленно)
    time.sleep(5)
    return f"Email sent to user {user_id}"

def process_users_slow(user_ids):
    for user_id in user_ids:
        send_email(user_id)
    # 100 пользователей × 5 сек = 500 сек!

# ✅ Асинхронное выполнение
def process_users_fast(user_ids):
    # Запускаем все emails параллельно
    job = group(send_email.s(user_id) for user_id in user_ids)
    job.apply_async()
    # 100 пользователей обрабатываются за 5 сек

Пример 4: Database операции с async ORM

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy import select
from sqlalchemy.orm import sessionmaker
import asyncio

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")

AsyncSessionLocal = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

async def get_users():
    async with AsyncSessionLocal() as session:
        result = await session.execute(select(User))
        users = result.scalars().all()
        return users

async def get_orders():
    async with AsyncSessionLocal() as session:
        result = await session.execute(select(Order))
        orders = result.scalars().all()
        return orders

async def main():
    # Запускаем обе операции параллельно
    users, orders = await asyncio.gather(
        get_users(),
        get_orders()
    )
    print(f"Got {len(users)} users and {len(orders)} orders")

asyncio.run(main())

# Синхронно: время_пользователей + время_заказов
# Асинхронно: max(время_пользователей, время_заказов)

Пример 5: Real-time уведомления с WebSockets

from fastapi import FastAPI, WebSocket
import asyncio
import json

app = FastAPI()

active_connections = []

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    active_connections.append(websocket)
    
    try:
        while True:
            # Слушаем сообщение от клиента
            data = await websocket.receive_text()
            
            # Отправляем всем клиентам (асинхронно!)
            for connection in active_connections:
                await connection.send_text(f"New message: {data}")
    finally:
        active_connections.remove(websocket)

# Без асинхронности каждый клиент блокировал бы сервер
# С асинхронностью 10,000 клиентов работают одновременно

Пример 6: Обработка очереди задач

import asyncio
import aioredis
from typing import Any

class TaskQueue:
    def __init__(self):
        self.redis = None
    
    async def connect(self):
        self.redis = await aioredis.create_redis_pool('redis://localhost')
    
    async def enqueue(self, task_name: str, data: Any):
        # Добавляем задачу в очередь
        await self.redis.rpush(f"queue:{task_name}", json.dumps(data))
    
    async def process_tasks(self, task_name: str, worker):
        while True:
            # Берём задачу из очереди
            task = await self.redis.lpop(f"queue:{task_name}")
            if not task:
                await asyncio.sleep(1)
                continue
            
            data = json.loads(task)
            # Обрабатываем асинхронно
            await worker(data)

# Использование
async def process_email(task):
    # Отправка email
    await asyncio.sleep(2)
    print(f"Email sent: {task['email']}")

async def main():
    queue = TaskQueue()
    await queue.connect()
    
    # Запускаем 10 workers параллельно
    workers = [
        queue.process_tasks('email', process_email)
        for _ in range(10)
    ]
    
    await asyncio.gather(*workers)

asyncio.run(main())

Пример 7: Timeout и Retry логика

import asyncio
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    AsyncRetrying
)

async def unstable_api_call():
    # API может упасть
    if random.random() < 0.5:
        raise ConnectionError("API failed")
    return {"status": "ok"}

# ✅ С retry и timeout
async def fetch_with_retry():
    async for attempt in AsyncRetrying(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10)
    ):
        with attempt:
            result = await asyncio.wait_for(
                unstable_api_call(),
                timeout=5.0  # Максимум 5 секунд на попытку
            )
            return result

asyncio.run(fetch_with_retry())

Где использовать асинхронность

Используй async:

  • HTTP запросы к внешним API
  • Database операции
  • File I/O
  • WebSockets и real-time communication
  • Обработка очередей
  • Long-polling

Не используй async (используй threads/multiprocessing):

  • CPU-intensive операции (обработка изображений, криптография)
  • Работа с CPU cores
  • Блокирующие C-библиотеки

Вывод

Асинхронность в Python — это способ:

  • Обрабатывать много I/O операций одновременно
  • Не блокировать основной поток
  • Писать масштабируемые веб-приложения
  • Эффективно использовать ресурсы сервера

Основной паттерн: await + asyncio.gather() для параллельного выполнения операций.

Приведи пример использования асинхронности | PrepBro