← Назад к вопросам
В каких задачах стоит применять асинхронный код
2.2 Middle🔥 161 комментариев
#FastAPI и Flask#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
# В каких задачах стоит применять асинхронный код
Асинхронный код (async/await) позволяет выполнять долгие операции без блокирования основного потока. Это особенно полезно для операций, которые ждут внешних ресурсов.
Когда стоит использовать Async
1. I/O операции (ввод-вывод) — самый частый случай
HTTP запросы к внешним API
import asyncio
import aiohttp
# Синхронный код (блокирует поток на 1-2 секунды)
import requests
def get_user(user_id):
response = requests.get(f'https://api.example.com/users/{user_id}')
return response.json()
# Асинхронный код (не блокирует)
async def get_user_async(user_id):
async with aiohttp.ClientSession() as session:
async with session.get(f'https://api.example.com/users/{user_id}') as response:
return await response.json()
# Использование
async def main():
# Одновременно запрашиваем 100 пользователей
tasks = [get_user_async(i) for i in range(1, 101)]
users = await asyncio.gather(*tasks)
print(f'Получено {len(users)} пользователей')
asyncio.run(main())
Выигрыш: Вместо 100-200 секунд ждём 1-2 секунды!
Запросы к БД
import asyncpg
# Синхронный код
import psycopg2
def get_users():
conn = psycopg2.connect('...')
cursor = conn.cursor()
cursor.execute('SELECT * FROM users')
return cursor.fetchall() # Блокирует!
# Асинхронный код
async def get_users_async():
conn = await asyncpg.connect('postgresql://...')
rows = await conn.fetch('SELECT * FROM users') # Не блокирует!
await conn.close()
return rows
2. Параллельные HTTP запросы
import asyncio
import aiohttp
# Синхронный код: 30 секунд
for i in range(10):
requests.get(f'https://api.example.com/data/{i}') # 3 сек * 10
# Асинхронный код: 3 секунды
async def fetch_all():
async with aiohttp.ClientSession() as session:
tasks = [
session.get(f'https://api.example.com/data/{i}')
for i in range(10)
]
responses = await asyncio.gather(*tasks)
return responses
asyncio.run(fetch_all())
3. Фоновые задачи (background jobs)
from fastapi import FastAPI
from fastapi.background import BackgroundTasks
import asyncio
app = FastAPI()
async def send_email(email: str, message: str):
"""Долгая операция, не блокирует ответ"""
await asyncio.sleep(2) # Имитация отправки письма
print(f'Email отправлен на {email}')
@app.post('/send-message")
async def send_message(email: str, message: str, background_tasks: BackgroundTasks):
# Добавляем задачу в фон — ответ вернётся сразу
background_tasks.add_task(send_email, email, message)
return {'status': 'Email в очереди'}
4. WebSocket и долгоживущие соединения
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text() # Ждёт, не блокируя
await websocket.send_text(f"Echo: {data}")
5. Обработка многих одновременных соединений
# Синхронный код (threading)
from threading import Thread
def handle_client(client_socket):
# Каждый клиент → новый поток → потребление памяти
pass
# Асинхронный код
async def handle_client_async(websocket):
# Тысячи клиентов в одном потоке
pass
Когда НЕ стоит использовать Async
1. CPU-bound операции (обработка данных)
# Плохо: async не поможет
async def calculate_prime(n):
# Просто ждёшь вычислений, не I/O
result = 0
for i in range(n):
result += i
return result
# Хорошо: используй multiprocessing
from multiprocessing import Pool
with Pool(4) as pool:
results = pool.map(calculate_prime, [1000000, 2000000, ...])
2. Когда всего один запрос
# Не имеет смысла
async def simple_request():
response = await session.get('https://example.com') # Один запрос — зачем async?
return response
# Просто используй синхронный код
import requests
response = requests.get('https://example.com')
3. Простые скрипты
# Переусложнение
async def main():
with open('file.txt') as f:
content = f.read() # Локальный файл — никакого I/O ждать не нужно
print(content)
asyncio.run(main())
# Просто
with open('file.txt') as f:
content = f.read()
print(content)
Реальные примеры использования
Пример 1: API агрегатор (параллельные запросы)
from fastapi import FastAPI
import aiohttp
import asyncio
app = FastAPI()
@app.get("/market-price/{product_id}")
async def get_market_price(product_id: int):
"""Получить цены с 5 маркетплейсов одновременно"""
async with aiohttp.ClientSession() as session:
tasks = [
fetch_price(session, 'amazon', product_id),
fetch_price(session, 'ebay', product_id),
fetch_price(session, 'alibaba', product_id),
fetch_price(session, 'walmart', product_id),
fetch_price(session, 'target', product_id),
]
prices = await asyncio.gather(*tasks, return_exceptions=True)
return {'prices': prices}
async def fetch_price(session, marketplace, product_id):
try:
async with session.get(f'https://{marketplace}.com/api/price/{product_id}') as resp:
return await resp.json()
except Exception as e:
return {'error': str(e)}
Пример 2: Обработка большого количества очереди
import asyncio
from typing import List
class TaskQueue:
def __init__(self, max_concurrent=5):
self.queue = asyncio.Queue()
self.max_concurrent = max_concurrent
async def process(self, task_func):
"""Обработка задач с ограничением одновременных"""
while True:
try:
task = self.queue.get_nowait()
except asyncio.QueueEmpty:
break
try:
await task_func(task)
finally:
self.queue.task_done()
async def run(self, tasks: List, task_func):
# Добавляем задачи
for task in tasks:
await self.queue.put(task)
# Запускаем несколько workers
workers = [
self.process(task_func)
for _ in range(self.max_concurrent)
]
await asyncio.gather(*workers)
# Использование
async def process_order(order_id):
print(f'Обработка заказа {order_id}')
await asyncio.sleep(1) # Имитация обработки
queue = TaskQueue(max_concurrent=10)
orders = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
await queue.run(orders, process_order)
Пример 3: Микросервис с multiple data sources
from fastapi import FastAPI
import asyncpg
import aiohttp
app = FastAPI()
db_pool = None
@app.on_event("startup")
async def startup():
global db_pool
db_pool = await asyncpg.create_pool('postgresql://...')
@app.get("/user/{user_id}")
async def get_user_profile(user_id: int):
"""Получить профиль из БД и рекомендации из ML сервиса одновременно"""
# Параллельные запросы
user_task = db_pool.fetchrow('SELECT * FROM users WHERE id = $1', user_id)
recs_task = fetch_recommendations(user_id)
user, recommendations = await asyncio.gather(user_task, recs_task)
return {
'user': user,
'recommendations': recommendations
}
async def fetch_recommendations(user_id):
async with aiohttp.ClientSession() as session:
async with session.get(f'https://ml-service.example.com/recommendations/{user_id}') as resp:
return await resp.json()
Сравнение: Sync vs Async
| Операция | Sync (1 сек каждая) | Async |
|---|---|---|
| 1 HTTP запрос | 1 сек | 1 сек |
| 10 HTTP запросов | 10 сек | 1 сек |
| 100 HTTP запросов | 100 сек | 1 сек |
| 1000 WebSocket соединений | Требует 1000 потоков (ОЙ!) | 1 поток (норм!) |
Лучшие практики
1. Используй async для I/O
# API, БД, файлы, сеть → async
2. Не смешивай sync и async без причины
# Плохо
async def bad():
result = sync_function() # Блокирует! Зачем async?
# Хорошо
async def good():
result = await async_function()
3. Используй asyncio.gather для параллелизма
# Запускаем одновременно
results = await asyncio.gather(task1(), task2(), task3())
4. Обрабатывай ошибки
tasks = [fetch(i) for i in range(100)]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Ошибки будут в results как Exception объекты
Итоги
✅ Используй async для:
- HTTP запросы к API
- Запросы к БД
- Файловые операции
- WebSocket и долгие соединения
- Параллельная обработка I/O
❌ Не используй async для:
- CPU-intensive вычисления (используй multiprocessing)
- Простые операции без ввода-вывода
- Если усложняет код без причины
Правило: Async это про масштабируемость — один поток обрабатывает 1000 соединений, а не требует 1000 потоков!