← Назад к вопросам
Как синхронный код переписать на асинхронный?
3.0 Senior🔥 251 комментариев
#Python Core#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Переход с синхронного кода на асинхронный
Асинхронное программирование позволяет обрабатывать множество операций одновременно без блокирования. Вот как правильно переписать синхронный код на асинхронный.
Основные концепции
Синхронный код:
# Блокирует выполнение на каждой операции
result1 = get_user(1) # Ждём 1 сек
result2 = get_user(2) # Ждём 1 сек
result3 = get_user(3) # Ждём 1 сек
# Итого: 3 секунды
Асинхронный код:
# Запускает все операции одновременно
result1, result2, result3 = await asyncio.gather(
get_user(1),
get_user(2),
get_user(3)
)
# Итого: ~1 секунда (время самой долгой операции)
Шаг 1: Преобразование базовой функции
# ❌ Синхронный код
def get_user(user_id: int):
import time
time.sleep(1) # Блокирует
return {"id": user_id, "name": f"User {user_id}"}
user = get_user(1)
print(user)
# ✅ Асинхронный код
import asyncio
async def get_user(user_id: int):
await asyncio.sleep(1) # Не блокирует
return {"id": user_id, "name": f"User {user_id}"}
user = await get_user(1)
print(user)
Шаг 2: Запуск асинхронной функции
import asyncio
async def main():
# Вызов async функции из async функции
user = await get_user(1)
print(user)
# Запуск из синхронного кода
asyncio.run(main())
Шаг 3: Параллельное выполнение
import asyncio
async def get_user(user_id: int):
print(f"Fetching user {user_id}...")
await asyncio.sleep(1)
return {"id": user_id, "name": f"User {user_id}"}
async def main():
# Способ 1: asyncio.gather() — рекомендуется
results = await asyncio.gather(
get_user(1),
get_user(2),
get_user(3)
)
print(results)
asyncio.run(main())
# Output (всё за 1 секунду):
# Fetching user 1...
# Fetching user 2...
# Fetching user 3...
# [{'id': 1, ...}, {'id': 2, ...}, {'id': 3, ...}]
Шаг 4: Обработка ошибок
import asyncio
async def get_user(user_id: int):
await asyncio.sleep(1)
if user_id == 2:
raise ValueError(f"User {user_id} not found")
return {"id": user_id, "name": f"User {user_id}"}
async def main():
try:
results = await asyncio.gather(
get_user(1),
get_user(2),
get_user(3),
return_exceptions=True # Вернуть исключения вместо отмены
)
print(results)
# [{'id': 1, ...}, ValueError(...), {'id': 3, ...}]
except Exception as e:
print(f"Error: {e}")
asyncio.run(main())
Пример: FastAPI endpoint
from fastapi import FastAPI
import asyncio
app = FastAPI()
# ❌ Синхронный (блокирует другие запросы)
@app.get("/users/{user_id}")
def get_user(user_id: int):
import time
time.sleep(2) # Блокирует весь сервер
return {"id": user_id, "name": f"User {user_id}"}
# ✅ Асинхронный (обрабатывает параллельно)
@app.get("/users/{user_id}")
async def get_user(user_id: int):
await asyncio.sleep(2) # Не блокирует
return {"id": user_id, "name": f"User {user_id}"}
# Теперь несколько запросов обработаются параллельно
Работа с HTTP запросами
# ❌ Синхронный код
import requests
def get_github_user(username: str):
response = requests.get(f"https://api.github.com/users/{username}")
return response.json()
users = [
get_github_user("torvalds"),
get_github_user("gvanrossum"),
get_github_user("guido")
]
# Это займёт ~3 секунды (если каждый запрос ~1 сек)
# ✅ Асинхронный код
import aiohttp
import asyncio
async def get_github_user(username: str):
async with aiohttp.ClientSession() as session:
async with session.get(f"https://api.github.com/users/{username}") as response:
return await response.json()
async def main():
users = await asyncio.gather(
get_github_user("torvalds"),
get_github_user("gvanrossum"),
get_github_user("guido")
)
return users
results = asyncio.run(main())
# Это займёт ~1 секунду (все запросы параллельны)
Работа с базой данных
# ❌ Синхронный код
from sqlalchemy.orm import Session
def get_users_sync(db: Session, ids: list):
users = []
for user_id in ids:
user = db.query(User).filter(User.id == user_id).first()
users.append(user)
return users
db = get_session()
users = get_users_sync(db, [1, 2, 3])
# ✅ Асинхронный код
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
async def get_users_async(db: AsyncSession, ids: list):
stmt = select(User).where(User.id.in_(ids))
result = await db.execute(stmt)
return result.scalars().all()
db = get_async_session()
users = await get_users_async(db, [1, 2, 3])
Частые паттерны
1. Преобразование loop в async:
# ❌ Синхронный
results = []
for item_id in item_ids:
result = get_item(item_id) # Блокирует
results.append(result)
# ✅ Асинхронный
results = await asyncio.gather(
*[get_item(item_id) for item_id in item_ids]
)
2. Timeout:
import asyncio
async def main():
try:
result = await asyncio.wait_for(
slow_operation(),
timeout=5.0 # Максимум 5 секунд
)
except asyncio.TimeoutError:
print("Operation timed out!")
asyncio.run(main())
3. Rate limiting:
import asyncio
from asyncio import Semaphore
async def main():
# Максимум 3 одновременных операции
semaphore = Semaphore(3)
async def limited_operation(item_id):
async with semaphore:
return await get_item(item_id)
tasks = [limited_operation(i) for i in range(100)]
results = await asyncio.gather(*tasks)
asyncio.run(main())
Контрольный список миграции
# 1. Добавить async/await
# ❌ def my_function():
# ✅ async def my_function():
# 2. Заменить time.sleep() на asyncio.sleep()
# ❌ time.sleep(1)
# ✅ await asyncio.sleep(1)
# 3. Заменить requests на aiohttp
# ❌ response = requests.get(url)
# ✅ async with aiohttp.ClientSession() as session:
# async with session.get(url) as response:
# data = await response.json()
# 4. Заменить синхронную БД на асинхронную
# ❌ from sqlalchemy.orm import Session
# ✅ from sqlalchemy.ext.asyncio import AsyncSession
# 5. Использовать asyncio.gather() для параллелизма
# ❌ result1 = await func1()
# result2 = await func2()
# ✅ result1, result2 = await asyncio.gather(func1(), func2())
# 6. Добавить error handling
# ✅ return_exceptions=True в gather()
Полный пример: API сервис
from fastapi import FastAPI, Depends
import asyncio
from sqlalchemy.ext.asyncio import AsyncSession
app = FastAPI()
async def get_db():
async with AsyncSession(engine) as session:
yield session
# ❌ ПЛОХО: синхронный код
@app.get("/api/v1/users/{user_id}/full")
def get_user_full(user_id: int, db: Session = Depends(get_db)):
user = db.query(User).filter(User.id == user_id).first()
posts = db.query(Post).filter(Post.user_id == user_id).all()
comments = db.query(Comment).filter(Comment.user_id == user_id).all()
return {"user": user, "posts": posts, "comments": comments}
# ✅ ХОРОШО: асинхронный код
@app.get("/api/v1/users/{user_id}/full")
async def get_user_full(user_id: int, db: AsyncSession = Depends(get_db)):
# Получаем все данные параллельно
user, posts, comments = await asyncio.gather(
db.execute(select(User).where(User.id == user_id)),
db.execute(select(Post).where(Post.user_id == user_id)),
db.execute(select(Comment).where(Comment.user_id == user_id))
)
return {
"user": user.scalar_one_or_none(),
"posts": posts.scalars().all(),
"comments": comments.scalars().all()
}
Производительность
import asyncio
import time
# Синхронный
def sync_version():
start = time.time()
for i in range(3):
time.sleep(1) # 1 сек на операцию
return time.time() - start
# Асинхронный
async def async_version():
start = time.time()
await asyncio.gather(*[
asyncio.sleep(1) # 1 сек на операцию
for i in range(3)
])
return time.time() - start
print(f"Sync: {sync_version():.2f}s") # 3 секунды
print(f"Async: {asyncio.run(async_version()):.2f}s") # 1 секунда
Когда использовать async
✅ Используй async для:
- I/O операций (HTTP, БД, файлы)
- API серверов (FastAPI, aiohttp)
- Когда нужна обработка многих операций одновременно
- Когда операции часто ждут ответа
❌ Не используй async для:
- CPU-intensive операций (они не дают выигрыша)
- Простых синхронных скриптов
- Когда нет параллельности
Основное правило: async хорошо для I/O, остаётся синхронным для CPU.