← Назад к вопросам

Почему начал использовать Sync I/O?

2.0 Middle🔥 151 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Почему начал использовать Sync I/O?

Вопрос предполагает, что я (разработчик) ранее использовал Async I/O, а потом переключился на Sync I/O. Это вполне логичный выбор в определённых сценариях, который я объясню.

Основные причины отказа от Async I/O

1. Сложность кода

Async IO требует:

  • Все функции должны быть async
  • Нужно использовать await везде
  • Сложный debugging
  • Нужно учиться новой парадигме
# ASYNC — намного сложнее
import asyncio
import aiohttp

async def fetch_data(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

async def process_users():
    tasks = [
        fetch_data(f"https://api.example.com/users/{i}")
        for i in range(1, 11)
    ]
    results = await asyncio.gather(*tasks)
    return results

asyncio.run(process_users())

# SYNC — намного проще
import requests

def fetch_data(url):
    response = requests.get(url)
    return response.json()

def process_users():
    results = []
    for i in range(1, 11):
        data = fetch_data(f"https://api.example.com/users/{i}")
        results.append(data)
    return results

process_users()

В 90% случаев Sync код проще для понимания и поддержки.

2. Недостаточно высокие требования к пропускной способности

# Async имеет смысл когда:
# - 1000+ одновременных соединений
# - High-throughput API gateway
# - WebSocket server с множеством клиентов

# Sync достаточно когда:
# - Backend API с 10-100 одновременными запросами
# - Простой CRUD сервис
# - Батч-обработка данных
# - Scheduled tasks

# Пример: обработка 100 заказов в день
def process_orders():
    orders = get_orders_from_db()
    for order in orders:
        process_order(order)  # Sync I/O — окей!

# Требует Async: обработка 10 000 запросов в секунду

3. Библиотеки не имеют async версии

# Хотим использовать библиотеку, но она только sync

import requests  # Нет async версии
import pandas   # Нет async версии
import lxml     # Нет async версии

# Нельзя использовать в async контексте легко:
# - Нужно использовать asyncio.to_thread() (костыль)
# - Теряется смысл async

# Лучше просто использовать Sync I/O
def process_csv():
    df = pandas.read_csv("data.csv")  # Sync
    return df.sum()

4. Нет I/O bottleneck

# Скорость зависит от БД/CPU, а не от I/O

# ASYNC не поможет если:
def heavy_computation():
    # CPU-bound операция
    result = sum(fibonacci(n) for n in range(100))
    return result

# ASYNC поможет если:
async def fetch_from_api():
    # I/O-bound операция
    response = await client.get("https://api.example.com/data")
    return response.json()

5. Проблемы с Async в production

# 1. Deadlocks в async коде тяжело отловить
async def deadlock_example():
    lock = asyncio.Lock()
    async with lock:
        async with lock:  # Deadlock!
            pass

# 2. Долгие операции блокируют event loop
async def bad_async():
    time.sleep(5)  # ❌ Блокирует весь event loop!
    await fetch_data()

# 3. Нужно тестировать всё async
@pytest.mark.asyncio
async def test_something():
    # Все тесты должны быть async
    pass

# Sync код тестировать проще
def test_something():
    assert process_data() == expected

Когда использовать Sync I/O

✅ Backend API (FastAPI, Flask)

from fastapi import FastAPI
import requests

app = FastAPI()

# Sync endpoint — FastAPI автоматически запустит в thread pool
@app.get("/users/{user_id}")
def get_user(user_id: int):
    # Даже Sync код в FastAPI будет неблокирующим
    # FastAPI запустит это в отдельном потоке
    response = requests.get(f"https://api.example.com/users/{user_id}")
    return response.json()

# Если нужно — можно добавить async, но не обязательно

FastAPI автоматически запустит Sync функции в thread pool, так что они не блокируют главный event loop!

✅ Batch/ETL процессы

# Обработка файлов, ETL, задачи
import pandas as pd
from sqlalchemy import create_engine

def etl_pipeline():
    # Sync I/O — окей
    df = pd.read_csv("data.csv")
    df = df.dropna()
    df = df[df["age"] > 18]
    
    engine = create_engine("postgresql://...")
    df.to_sql("users", engine)

# Запускаем один раз в день
import schedule
schedule.every().day.at("02:00").do(etl_pipeline)

✅ CLI инструменты

import click
import requests

@click.command()
@click.option("--user-id", type=int, required=True)
def get_user_info(user_id):
    """Получить информацию о пользователе."""
    response = requests.get(f"https://api.example.com/users/{user_id}")
    user = response.json()
    click.echo(f"Name: {user['name']}")
    click.echo(f"Email: {user['email']}")

if __name__ == "__main__":
    get_user_info()

✅ Скрипты и утилиты

# Для скриптов, которые запускаются в background
# Sync I/O вполне нормален

def backup_database():
    # Sync запрос к БД
    db.execute("BACKUP DATABASE ...")
    # Sync запрос на загрузку
    upload_to_s3(backup_file)

# Запускается один раз в сутки — async не нужен

Когда всё-таки нужен Async

# 1. WebSocket сервер с множеством клиентов
from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        data = await websocket.receive_text()
        await websocket.send_text(data)

# 2. High-throughput API (1000+ rps)
@app.get("/fast-endpoint")
async def fast_endpoint():
    # Много запросов одновременно
    results = await asyncio.gather(*[
        fetch_from_db(i) for i in range(100)
    ])
    return results

# 3. Задачи обработки очереди
async def process_message_queue():
    while True:
        message = await queue.get()  # Неблокирующий wait
        await process_message(message)

Рекомендация

Начните с Sync, переходите на Async только если:

  1. ✅ Измерили производительность и нашли bottleneck в I/O
  2. ✅ У вас действительно много одновременных операций (100+)
  3. ✅ Все используемые библиотеки поддерживают async
  4. ✅ Команда готова к сложности async кода

В 80% случаев Sync I/O достаточно, потому что:

  • FastAPI / Starlette автоматически распределяют sync функции в thread pool
  • Код проще
  • Проще дебаг
  • Все библиотеки работают
  • Меньше ошибок в production
# Правильный подход для большинства REST API
from fastapi import FastAPI
import requests

app = FastAPI()

# Sync функция, но FastAPI запустит её неблокирующим образом
@app.get("/api/users/{user_id}")
def get_user(user_id: int):
    return requests.get(f"https://api.example.com/users/{user_id}").json()

# Это работает как асинхронно благодаря FastAPI!
Почему начал использовать Sync I/O? | PrepBro