← Назад к вопросам
Почему начал использовать Sync I/O?
2.0 Middle🔥 151 комментариев
#Python Core#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Почему начал использовать Sync I/O?
Вопрос предполагает, что я (разработчик) ранее использовал Async I/O, а потом переключился на Sync I/O. Это вполне логичный выбор в определённых сценариях, который я объясню.
Основные причины отказа от Async I/O
1. Сложность кода
Async IO требует:
- Все функции должны быть async
- Нужно использовать await везде
- Сложный debugging
- Нужно учиться новой парадигме
# ASYNC — намного сложнее
import asyncio
import aiohttp
async def fetch_data(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
async def process_users():
tasks = [
fetch_data(f"https://api.example.com/users/{i}")
for i in range(1, 11)
]
results = await asyncio.gather(*tasks)
return results
asyncio.run(process_users())
# SYNC — намного проще
import requests
def fetch_data(url):
response = requests.get(url)
return response.json()
def process_users():
results = []
for i in range(1, 11):
data = fetch_data(f"https://api.example.com/users/{i}")
results.append(data)
return results
process_users()
В 90% случаев Sync код проще для понимания и поддержки.
2. Недостаточно высокие требования к пропускной способности
# Async имеет смысл когда:
# - 1000+ одновременных соединений
# - High-throughput API gateway
# - WebSocket server с множеством клиентов
# Sync достаточно когда:
# - Backend API с 10-100 одновременными запросами
# - Простой CRUD сервис
# - Батч-обработка данных
# - Scheduled tasks
# Пример: обработка 100 заказов в день
def process_orders():
orders = get_orders_from_db()
for order in orders:
process_order(order) # Sync I/O — окей!
# Требует Async: обработка 10 000 запросов в секунду
3. Библиотеки не имеют async версии
# Хотим использовать библиотеку, но она только sync
import requests # Нет async версии
import pandas # Нет async версии
import lxml # Нет async версии
# Нельзя использовать в async контексте легко:
# - Нужно использовать asyncio.to_thread() (костыль)
# - Теряется смысл async
# Лучше просто использовать Sync I/O
def process_csv():
df = pandas.read_csv("data.csv") # Sync
return df.sum()
4. Нет I/O bottleneck
# Скорость зависит от БД/CPU, а не от I/O
# ASYNC не поможет если:
def heavy_computation():
# CPU-bound операция
result = sum(fibonacci(n) for n in range(100))
return result
# ASYNC поможет если:
async def fetch_from_api():
# I/O-bound операция
response = await client.get("https://api.example.com/data")
return response.json()
5. Проблемы с Async в production
# 1. Deadlocks в async коде тяжело отловить
async def deadlock_example():
lock = asyncio.Lock()
async with lock:
async with lock: # Deadlock!
pass
# 2. Долгие операции блокируют event loop
async def bad_async():
time.sleep(5) # ❌ Блокирует весь event loop!
await fetch_data()
# 3. Нужно тестировать всё async
@pytest.mark.asyncio
async def test_something():
# Все тесты должны быть async
pass
# Sync код тестировать проще
def test_something():
assert process_data() == expected
Когда использовать Sync I/O
✅ Backend API (FastAPI, Flask)
from fastapi import FastAPI
import requests
app = FastAPI()
# Sync endpoint — FastAPI автоматически запустит в thread pool
@app.get("/users/{user_id}")
def get_user(user_id: int):
# Даже Sync код в FastAPI будет неблокирующим
# FastAPI запустит это в отдельном потоке
response = requests.get(f"https://api.example.com/users/{user_id}")
return response.json()
# Если нужно — можно добавить async, но не обязательно
FastAPI автоматически запустит Sync функции в thread pool, так что они не блокируют главный event loop!
✅ Batch/ETL процессы
# Обработка файлов, ETL, задачи
import pandas as pd
from sqlalchemy import create_engine
def etl_pipeline():
# Sync I/O — окей
df = pd.read_csv("data.csv")
df = df.dropna()
df = df[df["age"] > 18]
engine = create_engine("postgresql://...")
df.to_sql("users", engine)
# Запускаем один раз в день
import schedule
schedule.every().day.at("02:00").do(etl_pipeline)
✅ CLI инструменты
import click
import requests
@click.command()
@click.option("--user-id", type=int, required=True)
def get_user_info(user_id):
"""Получить информацию о пользователе."""
response = requests.get(f"https://api.example.com/users/{user_id}")
user = response.json()
click.echo(f"Name: {user['name']}")
click.echo(f"Email: {user['email']}")
if __name__ == "__main__":
get_user_info()
✅ Скрипты и утилиты
# Для скриптов, которые запускаются в background
# Sync I/O вполне нормален
def backup_database():
# Sync запрос к БД
db.execute("BACKUP DATABASE ...")
# Sync запрос на загрузку
upload_to_s3(backup_file)
# Запускается один раз в сутки — async не нужен
Когда всё-таки нужен Async
# 1. WebSocket сервер с множеством клиентов
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(data)
# 2. High-throughput API (1000+ rps)
@app.get("/fast-endpoint")
async def fast_endpoint():
# Много запросов одновременно
results = await asyncio.gather(*[
fetch_from_db(i) for i in range(100)
])
return results
# 3. Задачи обработки очереди
async def process_message_queue():
while True:
message = await queue.get() # Неблокирующий wait
await process_message(message)
Рекомендация
Начните с Sync, переходите на Async только если:
- ✅ Измерили производительность и нашли bottleneck в I/O
- ✅ У вас действительно много одновременных операций (100+)
- ✅ Все используемые библиотеки поддерживают async
- ✅ Команда готова к сложности async кода
В 80% случаев Sync I/O достаточно, потому что:
- FastAPI / Starlette автоматически распределяют sync функции в thread pool
- Код проще
- Проще дебаг
- Все библиотеки работают
- Меньше ошибок в production
# Правильный подход для большинства REST API
from fastapi import FastAPI
import requests
app = FastAPI()
# Sync функция, но FastAPI запустит её неблокирующим образом
@app.get("/api/users/{user_id}")
def get_user(user_id: int):
return requests.get(f"https://api.example.com/users/{user_id}").json()
# Это работает как асинхронно благодаря FastAPI!