Как работает asyncio в Python?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Как работает asyncio в Python
asyncio — это встроенная библиотека Python для написания асинхронного кода. Она позволяет выполнять множество операций ввода-вывода (I/O) конкурентно без блокировки потока. Давайте разберёмся, как это работает под капотом.
Базовые концепции
1. Event Loop (цикл событий)
Это сердце asyncio. Event Loop запускает корутины и переключается между ними при возникновении I/O операций.
import asyncio
async def hello():
print("Hello")
await asyncio.sleep(1) # I/O операция
print("World")
# Запуск event loop
asyncio.run(hello())
Внутри asyncio.run() происходит:
- Создание event loop
- Запуск корутины
- Запуск цикла событий
- Закрытие event loop при завершении
2. Корутины (async def)
Это функции, которые могут быть приостановлены и возобновлены.
async def fetch_data(url):
print(f"Fetching {url}...")
await asyncio.sleep(2) # Имитация сетевого запроса
return f"Data from {url}"
# Это не корутина, а объект корутины
coro = fetch_data("http://example.com")
print(type(coro)) # <class 'coroutine'>
# Запустить можно только через event loop
result = asyncio.run(fetch_data("http://example.com"))
3. Await (ожидание)
await указывает, где корутина может быть приостановлена, позволяя event loop переключиться на другую корутину.
async def main():
# Это будет выполняться последовательно (медленно)
result1 = await fetch_data("url1") # Ждём 2 сек
result2 = await fetch_data("url2") # Ждём ещё 2 сек
# Итого: 4 сек
print(asyncio.run(main())) # Output: 4 сек
Как работает event loop
Это упрощённый пример того, как работает event loop:
class SimpleEventLoop:
def __init__(self):
self.ready = [] # Корутины, готовые к выполнению
self.sleeping = {} # Корутины, ждущие timeout
def run_until_complete(self, coro):
self.ready.append(coro)
while self.ready or self.sleeping:
# 1. Выполнить все готовые корутины
while self.ready:
coro = self.ready.pop(0)
try:
coro.send(None) # Запустить корутину
except StopIteration as e:
return e.value # Корутина завершена
# 2. Ждать I/O или timeout
timeout = min(self.sleeping.keys()) if self.sleeping else None
time.sleep(timeout)
Асинхронное выполнение — это не многопоточность! Это кооперативная многозадачность, где каждая корутина добровольно отдаёт контроль через await.
Практический пример: конкурентные запросы
Неправильный подход (последовательно, медленно)
import asyncio
import time
async def fetch(url):
await asyncio.sleep(1) # Имитация сетевого запроса
return f"Data from {url}"
async def main_slow():
start = time.time()
# Выполняется одна за другой (2 секунды)
result1 = await fetch("url1")
result2 = await fetch("url2")
result3 = await fetch("url3")
elapsed = time.time() - start
print(f"Time: {elapsed:.1f}s") # Output: Time: 3.0s
return [result1, result2, result3]
asyncio.run(main_slow())
Правильный подход (конкурентно, быстро)
async def main_fast():
start = time.time()
# Создаём все задачи одновременно
tasks = [
asyncio.create_task(fetch("url1")),
asyncio.create_task(fetch("url2")),
asyncio.create_task(fetch("url3")),
]
# Ждём, пока все завершатся (только 1 секунда!)
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
print(f"Time: {elapsed:.1f}s") # Output: Time: 1.0s
return results
asyncio.run(main_fast())
Разница: 3 секунды против 1 секунды!
Основные функции asyncio
asyncio.gather() — ждём все задачи
async def main():
results = await asyncio.gather(
fetch("url1"),
fetch("url2"),
fetch("url3"),
return_exceptions=True # Не падаем, если одна ошибка
)
return results
asyncio.wait() — ждём с условиями
async def main():
tasks = [
asyncio.create_task(fetch("url1")),
asyncio.create_task(fetch("url2")),
]
# Ждём, пока хоть одна завершится
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
# Отменяем остальные
for task in pending:
task.cancel()
asyncio.create_task() — запланировать выполнение
async def main():
# Задача запущена, но не ждём результат сразу
task = asyncio.create_task(fetch("url1"))
# Можем сделать что-то ещё
print("Doing something else...")
# Потом получить результат
result = await task
return result
asyncio.sleep() — асинхронная задержка
async def main():
print("Start")
await asyncio.sleep(1) # Не блокирует другие корутины!
print("After 1 second")
Реальный пример: HTTP запросы
import aiohttp
import asyncio
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.json()
async def main():
urls = [
"https://api.github.com/users/github",
"https://api.github.com/users/google",
"https://api.github.com/users/facebook",
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for result in results:
print(f"User: {result['login']}")
asyncio.run(main())
Обработка ошибок
async def fetch_with_retry(url, max_retries=3):
for attempt in range(max_retries):
try:
# Попытка выполнить операцию
result = await fetch(url)
return result
except Exception as e:
if attempt == max_retries - 1:
raise
await asyncio.sleep(2 ** attempt) # Exponential backoff
async def main():
try:
result = await fetch_with_retry("url")
except Exception as e:
print(f"Failed after retries: {e}")
asyncio.run(main())
Важные особенности
1. asyncio не многопоточность
# НЕПРАВИЛЬНО — CPU-bound операция заблокирует event loop
async def bad():
time.sleep(1) # Блокирует весь event loop!
# ПРАВИЛЬНО — используй await с I/O операциями
async def good():
await asyncio.sleep(1) # Не блокирует
2. Исключение из run_forever()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
async def task():
await asyncio.sleep(1)
print("Task done")
loop.create_task(task())
loop.run_forever() # Запуск цикла
loop.close()
3. Отмена задач
async def main():
task = asyncio.create_task(fetch("url"))
await asyncio.sleep(0.5)
task.cancel() # Отменить задачу
try:
await task
except asyncio.CancelledError:
print("Task was cancelled")
Производительность
аsyncio идеален для I/O-bound задач:
# I/O-bound (asyncio идеален)
async def io_bound():
async with aiohttp.ClientSession() as session:
for _ in range(100):
await fetch(session, "url") # Сетевые запросы
# CPU-bound (используй multiprocessing или thread pool)
def cpu_bound():
result = sum(range(10**8)) # Тяжёлые вычисления
return result
# Комбинированный подход
async def mixed():
# I/O в asyncio
data = await fetch_data()
# CPU в thread pool
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, cpu_intensive_func, data)
return result
Вывод
asyncio — это мощный инструмент для конкурентного программирования в Python, идеально подходит для I/O-bound приложений (веб-серверы, API клиенты, боты). Благодаря event loop и кооперативной многозадачности, можно обрабатывать тысячи операций без затрат на переключение потоков.