← Назад к вопросам
Когда лучше использовать асинхронность?
3.0 Senior🔥 151 комментариев
#Архитектура и паттерны#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Когда использовать асинхронность в Python
Асинхронность — это мощный инструмент, но нужно понимать, когда она действительно полезна.
1. I/O-bound операции — ИДЕАЛЬНЫЙ случай
Асинхронность превосходна для операций, которые ждут ввода-вывода:
import asyncio
import aiohttp
# ✓ Идеально для асинхронности
async def fetch_multiple_apis():
"""Загрузить данные с нескольких API одновременно"""
async with aiohttp.ClientSession() as session:
tasks = [
session.get('https://api.example.com/users'),
session.get('https://api.example.com/posts'),
session.get('https://api.example.com/comments'),
]
results = await asyncio.gather(*tasks)
return results
# Синхронная версия: 3 сек (запрос 1 + запрос 2 + запрос 3)
# Асинхронная версия: ~1 сек (все параллельно)
Время сэкономлено потому, что пока ждём ответа от API, можем обрабатывать другие запросы.
2. Сетевые операции (HTTP, БД запросы)
import asyncio
import asyncpg
# ✓ Идеально
async def fetch_user_data():
"""Загрузить данные пользователя из БД"""
conn = await asyncpg.connect('postgresql://user:pass@localhost/db')
# Эти запросы выполняются параллельно
user = await conn.fetchrow('SELECT * FROM users WHERE id = $1', user_id)
posts = await conn.fetch('SELECT * FROM posts WHERE user_id = $1', user_id)
comments = await conn.fetch('SELECT * FROM comments WHERE user_id = $1', user_id)
await conn.close()
return {"user": user, "posts": posts, "comments": comments}
3. WebSocket и реал-тайм приложения
import asyncio
from fastapi import FastAPI, WebSocket
app = FastAPI()
# ✓ Необходимо для реал-тайма
@app.websocket("/ws/chat/{room_id}")
async def websocket_endpoint(websocket: WebSocket, room_id: str):
await websocket.accept()
try:
while True:
# Получить сообщение (блокирует, но асинхронно)
data = await websocket.receive_text()
# Отправить всем в комнате
await broadcast_to_room(room_id, data)
except:
await websocket.close()
# Без асинхронности — одно соединение блокирует сервер
4. CPU-bound операции — НЕ рекомендуется
Для тяжёлых вычислений асинхронность НЕ помогает:
# ✗ Плохо для асинхронности
async def cpu_intensive():
"""Сложные вычисления"""
result = 0
for i in range(10**9):
result += i ** 2
return result
# Проблема: await не помогает, пока вычисляется
# Используй multiprocessing или Celery вместо этого
# ✓ Правильно для CPU-bound
from multiprocessing import Process
def cpu_intensive():
return sum(i ** 2 for i in range(10**9))
# Запустить в отдельном процессе
if __name__ == '__main__':
p = Process(target=cpu_intensive)
p.start()
p.join()
5. Задачи с задержками (backoff, retry)
import asyncio
import aiohttp
# ✓ Идеально
async def fetch_with_retry(url, max_retries=3):
"""Загрузить с повторными попытками"""
for attempt in range(max_retries):
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=5) as response:
return await response.json()
except Exception as e:
if attempt < max_retries - 1:
wait_time = 2 ** attempt # Экспоненциальный backoff
print(f"Попытка {attempt + 1} не удалась, ждём {wait_time}с")
await asyncio.sleep(wait_time) # Асинхронно ждём
else:
raise
# asyncio.sleep() даёт другим задачам выполняться
# threading.sleep() блокирует весь поток
6. Параллельные длительные операции
import asyncio
# ✓ Идеально
async def process_queue():
"""Обработать очередь сообщений"""
tasks = []
# Создать 100 задач, обрабатывающих сообщения параллельно
for msg in messages:
task = process_message(msg) # asyncio корутина
tasks.append(task)
# Ждём все задачи
results = await asyncio.gather(*tasks)
return results
async def process_message(msg):
# Может быть HTTP запрос, DB запрос и т.д.
await send_to_api(msg)
await log_to_database(msg)
7. Таймауты и отмена задач
import asyncio
# ✓ Полезно для асинхронности
async def operation_with_timeout():
try:
# Отменить задачу, если она не завершится за 5 секунд
result = await asyncio.wait_for(
long_running_task(),
timeout=5.0
)
return result
except asyncio.TimeoutError:
print("Операция превышила таймаут")
# Можно отменить задачу
return None
# Управление периодическими задачами
async def periodic_task():
while True:
await do_something()
await asyncio.sleep(60) # Выполнять каждые 60 секунд
async def main():
# Запустить периодическую задачу в фоне
task = asyncio.create_task(periodic_task())
# Отменить после некоторого времени
await asyncio.sleep(300)
task.cancel()
8. Обработка множества соединений
# ✓ Идеально
from fastapi import FastAPI
app = FastAPI()
@app.get("/process/{item_id}")
async def process_item(item_id: int):
# Асинхронная обработка позволяет одновременно обслуживать
# тысячи клиентов на одном сервере
result = await fetch_from_db(item_id)
enriched = await enrich_with_external_api(result)
return enriched
# Один синхронный endpoint может обслуживать только ~10-20 одновременных запросов
# Асинхронный endpoint может обслуживать 1000+ одновременных запросов
9. Когда НЕ использовать асинхронность
# ✗ Не нужна асинхронность
# 1. Простые синхронные операции
def simple_calculation(x, y):
return x + y # Не нужен await
# 2. Блокирующие операции в стандартной библиотеке
import requests
response = requests.get(url) # Блокирует! Используй aiohttp вместо этого
# 3. CPU-bound работа
def heavy_computation():
return sum(i**2 for i in range(10**8))
# 4. Если нет параллелизма
def sequential_task():
result1 = blocking_op1() # Ждём результат
result2 = blocking_op2() # Потом это
result3 = blocking_op3() # И это
# Асинхронность не поможет, если операции зависимы
10. Правила выбора
# Проверь:
# 1. Есть ли I/O операции? → Асинхронность может помочь
# 2. Параллельных I/O операций > 1? → Асинхронность улучшит производительность
# 3. Высокая конкурентность (много клиентов)? → Асинхронность необходима
# 4. Это CPU-bound? → Используй multiprocessing или Celery
# 5. Простой скрипт? → Не усложняй асинхронностью
# Примеры выбора
# Веб-сервер с API → asyncio + FastAPI
async def api_handler():
db_result = await fetch_from_db()
external_api = await fetch_from_external_api()
return {"db": db_result, "api": external_api}
# Фоновые задачи → Celery
from celery import shared_task
@shared_task
def process_video(video_id):
# Тяжёлая обработка в отдельном worker'е
pass
# Веб-скрепинг → asyncio + aiohttp
async def scrape_websites():
tasks = [fetch_website(url) for url in urls]
results = await asyncio.gather(*tasks)
# Реал-тайм приложение → WebSocket + asyncio
@app.websocket("/ws")
async def websocket(websocket: WebSocket):
await websocket.accept()
# Обрабатывать сообщения в реал-тайме
Ключевые моменты
- I/O операции — основной случай использования асинхронности
- Параллелизм I/O — множество одновременных запросов
- CPU-bound — используй multiprocessing, а не asyncio
- WebSocket/реал-тайм — asyncio необходим
- Высокая конкурентность — асинхронность масштабирует эффективнее потоков
- Простые скрипты — не всегда нужна асинхронность
- asyncio.sleep() vs threading.sleep() — первый асинхронен, второй нет
Асинхронность — это оружие для правильной задачи. Используй её, когда есть параллельные I/O операции, а не просто потому что это модно.