← Назад к вопросам

Как синхронный код переписать на асинхронный?

3.0 Senior🔥 251 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Переход с синхронного кода на асинхронный

Асинхронное программирование позволяет обрабатывать множество операций одновременно без блокирования. Вот как правильно переписать синхронный код на асинхронный.

Основные концепции

Синхронный код:

# Блокирует выполнение на каждой операции
result1 = get_user(1)  # Ждём 1 сек
result2 = get_user(2)  # Ждём 1 сек
result3 = get_user(3)  # Ждём 1 сек
# Итого: 3 секунды

Асинхронный код:

# Запускает все операции одновременно
result1, result2, result3 = await asyncio.gather(
    get_user(1),
    get_user(2),
    get_user(3)
)
# Итого: ~1 секунда (время самой долгой операции)

Шаг 1: Преобразование базовой функции

# ❌ Синхронный код
def get_user(user_id: int):
    import time
    time.sleep(1)  # Блокирует
    return {"id": user_id, "name": f"User {user_id}"}

user = get_user(1)
print(user)

# ✅ Асинхронный код
import asyncio

async def get_user(user_id: int):
    await asyncio.sleep(1)  # Не блокирует
    return {"id": user_id, "name": f"User {user_id}"}

user = await get_user(1)
print(user)

Шаг 2: Запуск асинхронной функции

import asyncio

async def main():
    # Вызов async функции из async функции
    user = await get_user(1)
    print(user)

# Запуск из синхронного кода
asyncio.run(main())

Шаг 3: Параллельное выполнение

import asyncio

async def get_user(user_id: int):
    print(f"Fetching user {user_id}...")
    await asyncio.sleep(1)
    return {"id": user_id, "name": f"User {user_id}"}

async def main():
    # Способ 1: asyncio.gather() — рекомендуется
    results = await asyncio.gather(
        get_user(1),
        get_user(2),
        get_user(3)
    )
    print(results)

asyncio.run(main())
# Output (всё за 1 секунду):
# Fetching user 1...
# Fetching user 2...
# Fetching user 3...
# [{'id': 1, ...}, {'id': 2, ...}, {'id': 3, ...}]

Шаг 4: Обработка ошибок

import asyncio

async def get_user(user_id: int):
    await asyncio.sleep(1)
    if user_id == 2:
        raise ValueError(f"User {user_id} not found")
    return {"id": user_id, "name": f"User {user_id}"}

async def main():
    try:
        results = await asyncio.gather(
            get_user(1),
            get_user(2),
            get_user(3),
            return_exceptions=True  # Вернуть исключения вместо отмены
        )
        print(results)
        # [{'id': 1, ...}, ValueError(...), {'id': 3, ...}]
    except Exception as e:
        print(f"Error: {e}")

asyncio.run(main())

Пример: FastAPI endpoint

from fastapi import FastAPI
import asyncio

app = FastAPI()

# ❌ Синхронный (блокирует другие запросы)
@app.get("/users/{user_id}")
def get_user(user_id: int):
    import time
    time.sleep(2)  # Блокирует весь сервер
    return {"id": user_id, "name": f"User {user_id}"}

# ✅ Асинхронный (обрабатывает параллельно)
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    await asyncio.sleep(2)  # Не блокирует
    return {"id": user_id, "name": f"User {user_id}"}

# Теперь несколько запросов обработаются параллельно

Работа с HTTP запросами

# ❌ Синхронный код
import requests

def get_github_user(username: str):
    response = requests.get(f"https://api.github.com/users/{username}")
    return response.json()

users = [
    get_github_user("torvalds"),
    get_github_user("gvanrossum"),
    get_github_user("guido")
]
# Это займёт ~3 секунды (если каждый запрос ~1 сек)

# ✅ Асинхронный код
import aiohttp
import asyncio

async def get_github_user(username: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.github.com/users/{username}") as response:
            return await response.json()

async def main():
    users = await asyncio.gather(
        get_github_user("torvalds"),
        get_github_user("gvanrossum"),
        get_github_user("guido")
    )
    return users

results = asyncio.run(main())
# Это займёт ~1 секунду (все запросы параллельны)

Работа с базой данных

# ❌ Синхронный код
from sqlalchemy.orm import Session

def get_users_sync(db: Session, ids: list):
    users = []
    for user_id in ids:
        user = db.query(User).filter(User.id == user_id).first()
        users.append(user)
    return users

db = get_session()
users = get_users_sync(db, [1, 2, 3])

# ✅ Асинхронный код
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

async def get_users_async(db: AsyncSession, ids: list):
    stmt = select(User).where(User.id.in_(ids))
    result = await db.execute(stmt)
    return result.scalars().all()

db = get_async_session()
users = await get_users_async(db, [1, 2, 3])

Частые паттерны

1. Преобразование loop в async:

# ❌ Синхронный
results = []
for item_id in item_ids:
    result = get_item(item_id)  # Блокирует
    results.append(result)

# ✅ Асинхронный
results = await asyncio.gather(
    *[get_item(item_id) for item_id in item_ids]
)

2. Timeout:

import asyncio

async def main():
    try:
        result = await asyncio.wait_for(
            slow_operation(),
            timeout=5.0  # Максимум 5 секунд
        )
    except asyncio.TimeoutError:
        print("Operation timed out!")

asyncio.run(main())

3. Rate limiting:

import asyncio
from asyncio import Semaphore

async def main():
    # Максимум 3 одновременных операции
    semaphore = Semaphore(3)
    
    async def limited_operation(item_id):
        async with semaphore:
            return await get_item(item_id)
    
    tasks = [limited_operation(i) for i in range(100)]
    results = await asyncio.gather(*tasks)

asyncio.run(main())

Контрольный список миграции

# 1. Добавить async/await
# ❌ def my_function():
# ✅ async def my_function():

# 2. Заменить time.sleep() на asyncio.sleep()
# ❌ time.sleep(1)
# ✅ await asyncio.sleep(1)

# 3. Заменить requests на aiohttp
# ❌ response = requests.get(url)
# ✅ async with aiohttp.ClientSession() as session:
#        async with session.get(url) as response:
#            data = await response.json()

# 4. Заменить синхронную БД на асинхронную
# ❌ from sqlalchemy.orm import Session
# ✅ from sqlalchemy.ext.asyncio import AsyncSession

# 5. Использовать asyncio.gather() для параллелизма
# ❌ result1 = await func1()
#    result2 = await func2()
# ✅ result1, result2 = await asyncio.gather(func1(), func2())

# 6. Добавить error handling
# ✅ return_exceptions=True в gather()

Полный пример: API сервис

from fastapi import FastAPI, Depends
import asyncio
from sqlalchemy.ext.asyncio import AsyncSession

app = FastAPI()

async def get_db():
    async with AsyncSession(engine) as session:
        yield session

# ❌ ПЛОХО: синхронный код
@app.get("/api/v1/users/{user_id}/full")
def get_user_full(user_id: int, db: Session = Depends(get_db)):
    user = db.query(User).filter(User.id == user_id).first()
    posts = db.query(Post).filter(Post.user_id == user_id).all()
    comments = db.query(Comment).filter(Comment.user_id == user_id).all()
    return {"user": user, "posts": posts, "comments": comments}

# ✅ ХОРОШО: асинхронный код
@app.get("/api/v1/users/{user_id}/full")
async def get_user_full(user_id: int, db: AsyncSession = Depends(get_db)):
    # Получаем все данные параллельно
    user, posts, comments = await asyncio.gather(
        db.execute(select(User).where(User.id == user_id)),
        db.execute(select(Post).where(Post.user_id == user_id)),
        db.execute(select(Comment).where(Comment.user_id == user_id))
    )
    return {
        "user": user.scalar_one_or_none(),
        "posts": posts.scalars().all(),
        "comments": comments.scalars().all()
    }

Производительность

import asyncio
import time

# Синхронный
def sync_version():
    start = time.time()
    for i in range(3):
        time.sleep(1)  # 1 сек на операцию
    return time.time() - start

# Асинхронный
async def async_version():
    start = time.time()
    await asyncio.gather(*[
        asyncio.sleep(1)  # 1 сек на операцию
        for i in range(3)
    ])
    return time.time() - start

print(f"Sync: {sync_version():.2f}s")      # 3 секунды
print(f"Async: {asyncio.run(async_version()):.2f}s")  # 1 секунда

Когда использовать async

Используй async для:

  • I/O операций (HTTP, БД, файлы)
  • API серверов (FastAPI, aiohttp)
  • Когда нужна обработка многих операций одновременно
  • Когда операции часто ждут ответа

Не используй async для:

  • CPU-intensive операций (они не дают выигрыша)
  • Простых синхронных скриптов
  • Когда нет параллельности

Основное правило: async хорошо для I/O, остаётся синхронным для CPU.