← Назад к вопросам
Чем запускать параллельно I/O bound задачи в Python?
2.8 Senior🔥 251 комментариев
#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Параллельное выполнение I/O-bound задач в Python
Обзор решений
Для I/O-bound операций (сетевые запросы, чтение файлов, БД запросы) в Python есть три основных подхода:
- asyncio — асинхронное программирование (рекомендуется)
- threading — многопоточность
- multiprocessing — многопроцессность (для CPU-bound)
1. asyncio (лучший выбор для I/O)
Преимущества:
- Одна OS-нить, минимальные издержки
- Тысячи одновременных задач
- Современный подход
- Встроен в Python
import asyncio
import aiohttp
async def fetch_url(session, url):
"""Асинхронный запрос к URL"""
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/1",
]
async with aiohttp.ClientSession() as session:
# Запускаем все задачи параллельно
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# Запуск
results = asyncio.run(main())
print(f"Получено {len(results)} результатов")
Вывод: все три запроса выполнятся примерно за 2 секунды (максимум из них), а не за 4 последовательно.
asyncio с timeout и обработкой ошибок
import asyncio
import aiohttp
async def fetch_with_timeout(session, url, timeout=5):
"""Запрос с timeout и обработкой ошибок"""
try:
async with session.get(url, timeout=timeout) as response:
return {"url": url, "status": response.status}
except asyncio.TimeoutError:
return {"url": url, "error": "Timeout"}
except aiohttp.ClientError as e:
return {"url": url, "error": str(e)}
async def main():
urls = ["https://httpbin.org/delay/1"] * 10
async with aiohttp.ClientSession() as session:
# Выполнить с ограничением одновременных запросов (семафор)
semaphore = asyncio.Semaphore(5) # Максимум 5 одновременно
async def bounded_fetch(url):
async with semaphore:
return await fetch_with_timeout(session, url)
tasks = [bounded_fetch(url) for url in urls]
results = await asyncio.gather(*tasks)
return results
results = asyncio.run(main())
for r in results:
print(r)
2. threading (старый подход, но работает)
Преимущества:
- Проще чем asyncio
- Встроен в Python
Недостатки:
- GIL блокирует CPU-bound операции
- Тяжелее чем asyncio
- Сложнее с синхронизацией
import threading
import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
def fetch_url(url):
"""Обычный синхронный запрос"""
response = requests.get(url)
return {"url": url, "status": response.status_code}
def main_threading():
urls = ["https://httpbin.org/delay/1"] * 5
# Способ 1: ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=5) as executor:
results = list(executor.map(fetch_url, urls))
return results
# Способ 2: Ручное управление потоками (не рекомендуется)
def main_manual():
urls = ["https://httpbin.org/delay/1"] * 5
threads = []
results = []
def worker(url):
results.append(fetch_url(url))
for url in urls:
t = threading.Thread(target=worker, args=(url,))
threads.append(t)
t.start()
for t in threads:
t.join() # Ждём завершения
return results
start = time.time()
results = main_threading()
print(f"Выполнено за {time.time() - start:.2f}s")
Сравнение asyncio vs threading
import asyncio
import time
import aiohttp
import requests
from concurrent.futures import ThreadPoolExecutor
# asyncio версия
async def async_version():
async def fetch(session, url):
async with session.get(url) as r:
return r.status
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, "https://httpbin.org/delay/1") for _ in range(10)]
return await asyncio.gather(*tasks)
# threading версия
def threading_version():
def fetch(url):
return requests.get(url).status_code
with ThreadPoolExecutor(max_workers=10) as executor:
return list(executor.map(fetch, ["https://httpbin.org/delay/1"] * 10))
# Тестирование
start = time.time()
asyncio.run(async_version())
print(f"asyncio: {time.time() - start:.2f}s")
start = time.time()
threading_version()
print(f"threading: {time.time() - start:.2f}s")
Результаты: оба примерно одинаковые для I/O-bound, но asyncio использует меньше памяти.
3. asyncio с более сложным примером
import asyncio
import sqlite3
# Асинхронное чтение файла
async def read_file_async(filepath):
"""Имитация асинхронного чтения"""
await asyncio.sleep(1) # Имитируем I/O
return f"Содержимое {filepath}"
# Асинхронный запрос к БД
async def query_database_async(query):
"""Имитация асинхронного запроса БД"""
await asyncio.sleep(0.5) # Имитируем I/O
return f"Результат: {query}"
async def main():
# Запускаем все операции параллельно
file_task = read_file_async("data.txt")
db_task = query_database_async("SELECT * FROM users")
# Ждём обе операции
file_result, db_result = await asyncio.gather(file_task, db_task)
print(file_result)
print(db_result)
asyncio.run(main()) # Выполнится за ~1 секунду, а не за 1.5
Когда использовать что
| Задача | Инструмент | Примечание |
|---|---|---|
| Сетевые запросы | asyncio + aiohttp | Оптимально для множества запросов |
| Чтение файлов | asyncio + aiofiles | Современный подход |
| Запросы БД | asyncio + async драйвер (asyncpg, motor) | Специализированные драйверы |
| Legacy код | threading | Если уже используется requests |
| CPU-bound | multiprocessing | Не используйте threading! |
| Простые задачи | concurrent.futures | Когда asyncio избыточен |
Реальный пример: парсинг нескольких сайтов
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def parse_page(session, url):
"""Асинхронный парсинг страницы"""
try:
async with session.get(url, timeout=10) as response:
html = await response.text()
soup = BeautifulSoup(html, 'html.parser')
title = soup.find('title')
return {"url": url, "title": title.text if title else "N/A"}
except Exception as e:
return {"url": url, "error": str(e)}
async def main():
urls = [
"https://python.org",
"https://github.com",
"https://stackoverflow.com",
]
async with aiohttp.ClientSession() as session:
tasks = [parse_page(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
results = asyncio.run(main())
for r in results:
print(r)
Заключение
Для I/O-bound задач рекомендуется:
- asyncio — если пишете новый код (лучшая производительность)
- threading — если наследуете старый код (requests, sqlite3)
- multiprocessing — ТОЛЬКО если задача CPU-bound, а не I/O
Золотое правило: I/O → asyncio или threading, CPU → multiprocessing.