← Назад к вопросам
Приведи пример использования асинхронности
2.0 Middle🔥 231 комментариев
#Python Core#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Приведи пример использования асинхронности
Асинхронность в Python позволяет выполнять множество операций одновременно в одном потоке. Вот практические примеры.
Основной концепт
Асинхронность решает проблему блокирующих операций (I/O):
import time
# ❌ Синхронный код — блокирует весь поток
def fetch_user(user_id):
# Имитация API запроса (3 секунды)
time.sleep(3)
return {"id": user_id, "name": "John"}
start = time.time()
for i in range(3):
user = fetch_user(i)
print(user)
print(f"Total: {time.time() - start}s") # 9 секунд!
import asyncio
# ✅ Асинхронный код — не блокирует поток
async def fetch_user(user_id):
# Имитация API запроса
await asyncio.sleep(3)
return {"id": user_id, "name": "John"}
async def main():
start = time.time()
# Запускаем 3 операции параллельно
users = await asyncio.gather(
fetch_user(1),
fetch_user(2),
fetch_user(3)
)
print(users)
print(f"Total: {time.time() - start}s") # 3 секунды вместо 9!
asyncio.run(main())
Пример 1: Загрузка нескольких URL одновременно
import aiohttp
import asyncio
async def fetch_page(session, url):
async with session.get(url) as response:
return await response.text()
async def fetch_multiple_pages():
urls = [
"https://example.com",
"https://google.com",
"https://github.com"
]
async with aiohttp.ClientSession() as session:
# Загружаем все страницы одновременно
tasks = [fetch_page(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for url, html in zip(urls, results):
print(f"{url}: {len(html)} bytes")
asyncio.run(fetch_multiple_pages())
# Результат:
# https://example.com: 1234 bytes
# https://google.com: 5678 bytes
# https://github.com: 9012 bytes
# Время: ~2 сек вместо 6+ сек синхронно
Пример 2: Web API с FastAPI
from fastapi import FastAPI
import asyncio
import aiohttp
app = FastAPI()
# ❌ Синхронный эндпоинт (блокирует другие запросы)
@app.get("/user-slow/{user_id}")
def get_user_slow(user_id: int):
# Ожидание ответа от другого API
response = requests.get(f"https://api.example.com/user/{user_id}")
time.sleep(2) # Блокирует весь сервер!
return response.json()
# ✅ Асинхронный эндпоинт (не блокирует)
@app.get("/user/{user_id}")
async def get_user(user_id: int):
async with aiohttp.ClientSession() as session:
async with session.get(f"https://api.example.com/user/{user_id}") as response:
await asyncio.sleep(2)
return await response.json()
# С асинхронностью один сервер на 4GB RAM может обрабатывать:
# - Синхронно: 50-100 concurrent requests
# - Асинхронно: 10,000+ concurrent requests
Пример 3: Фоновые задачи в Celery
from celery import Celery
from celery import group, chain
import asyncio
app = Celery('myapp', broker='amqp://localhost')
# ❌ Синхронное выполнение
@app.task
def send_email(user_id):
# Отправляет email (может быть медленно)
time.sleep(5)
return f"Email sent to user {user_id}"
def process_users_slow(user_ids):
for user_id in user_ids:
send_email(user_id)
# 100 пользователей × 5 сек = 500 сек!
# ✅ Асинхронное выполнение
def process_users_fast(user_ids):
# Запускаем все emails параллельно
job = group(send_email.s(user_id) for user_id in user_ids)
job.apply_async()
# 100 пользователей обрабатываются за 5 сек
Пример 4: Database операции с async ORM
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy import select
from sqlalchemy.orm import sessionmaker
import asyncio
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
AsyncSessionLocal = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
async def get_users():
async with AsyncSessionLocal() as session:
result = await session.execute(select(User))
users = result.scalars().all()
return users
async def get_orders():
async with AsyncSessionLocal() as session:
result = await session.execute(select(Order))
orders = result.scalars().all()
return orders
async def main():
# Запускаем обе операции параллельно
users, orders = await asyncio.gather(
get_users(),
get_orders()
)
print(f"Got {len(users)} users and {len(orders)} orders")
asyncio.run(main())
# Синхронно: время_пользователей + время_заказов
# Асинхронно: max(время_пользователей, время_заказов)
Пример 5: Real-time уведомления с WebSockets
from fastapi import FastAPI, WebSocket
import asyncio
import json
app = FastAPI()
active_connections = []
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
active_connections.append(websocket)
try:
while True:
# Слушаем сообщение от клиента
data = await websocket.receive_text()
# Отправляем всем клиентам (асинхронно!)
for connection in active_connections:
await connection.send_text(f"New message: {data}")
finally:
active_connections.remove(websocket)
# Без асинхронности каждый клиент блокировал бы сервер
# С асинхронностью 10,000 клиентов работают одновременно
Пример 6: Обработка очереди задач
import asyncio
import aioredis
from typing import Any
class TaskQueue:
def __init__(self):
self.redis = None
async def connect(self):
self.redis = await aioredis.create_redis_pool('redis://localhost')
async def enqueue(self, task_name: str, data: Any):
# Добавляем задачу в очередь
await self.redis.rpush(f"queue:{task_name}", json.dumps(data))
async def process_tasks(self, task_name: str, worker):
while True:
# Берём задачу из очереди
task = await self.redis.lpop(f"queue:{task_name}")
if not task:
await asyncio.sleep(1)
continue
data = json.loads(task)
# Обрабатываем асинхронно
await worker(data)
# Использование
async def process_email(task):
# Отправка email
await asyncio.sleep(2)
print(f"Email sent: {task['email']}")
async def main():
queue = TaskQueue()
await queue.connect()
# Запускаем 10 workers параллельно
workers = [
queue.process_tasks('email', process_email)
for _ in range(10)
]
await asyncio.gather(*workers)
asyncio.run(main())
Пример 7: Timeout и Retry логика
import asyncio
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
AsyncRetrying
)
async def unstable_api_call():
# API может упасть
if random.random() < 0.5:
raise ConnectionError("API failed")
return {"status": "ok"}
# ✅ С retry и timeout
async def fetch_with_retry():
async for attempt in AsyncRetrying(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10)
):
with attempt:
result = await asyncio.wait_for(
unstable_api_call(),
timeout=5.0 # Максимум 5 секунд на попытку
)
return result
asyncio.run(fetch_with_retry())
Где использовать асинхронность
✅ Используй async:
- HTTP запросы к внешним API
- Database операции
- File I/O
- WebSockets и real-time communication
- Обработка очередей
- Long-polling
❌ Не используй async (используй threads/multiprocessing):
- CPU-intensive операции (обработка изображений, криптография)
- Работа с CPU cores
- Блокирующие C-библиотеки
Вывод
Асинхронность в Python — это способ:
- Обрабатывать много I/O операций одновременно
- Не блокировать основной поток
- Писать масштабируемые веб-приложения
- Эффективно использовать ресурсы сервера
Основной паттерн: await + asyncio.gather() для параллельного выполнения операций.