← Назад к вопросам

Как работает asyncio в Python?

2.0 Middle🔥 191 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как работает asyncio в Python

asyncio — это встроенная библиотека Python для написания асинхронного кода. Она позволяет выполнять множество операций ввода-вывода (I/O) конкурентно без блокировки потока. Давайте разберёмся, как это работает под капотом.

Базовые концепции

1. Event Loop (цикл событий)

Это сердце asyncio. Event Loop запускает корутины и переключается между ними при возникновении I/O операций.

import asyncio

async def hello():
    print("Hello")
    await asyncio.sleep(1)  # I/O операция
    print("World")

# Запуск event loop
asyncio.run(hello())

Внутри asyncio.run() происходит:

  1. Создание event loop
  2. Запуск корутины
  3. Запуск цикла событий
  4. Закрытие event loop при завершении

2. Корутины (async def)

Это функции, которые могут быть приостановлены и возобновлены.

async def fetch_data(url):
    print(f"Fetching {url}...")
    await asyncio.sleep(2)  # Имитация сетевого запроса
    return f"Data from {url}"

# Это не корутина, а объект корутины
coro = fetch_data("http://example.com")
print(type(coro))  # <class 'coroutine'>

# Запустить можно только через event loop
result = asyncio.run(fetch_data("http://example.com"))

3. Await (ожидание)

await указывает, где корутина может быть приостановлена, позволяя event loop переключиться на другую корутину.

async def main():
    # Это будет выполняться последовательно (медленно)
    result1 = await fetch_data("url1")  # Ждём 2 сек
    result2 = await fetch_data("url2")  # Ждём ещё 2 сек
    # Итого: 4 сек
    
print(asyncio.run(main()))  # Output: 4 сек

Как работает event loop

Это упрощённый пример того, как работает event loop:

class SimpleEventLoop:
    def __init__(self):
        self.ready = []  # Корутины, готовые к выполнению
        self.sleeping = {}  # Корутины, ждущие timeout
    
    def run_until_complete(self, coro):
        self.ready.append(coro)
        
        while self.ready or self.sleeping:
            # 1. Выполнить все готовые корутины
            while self.ready:
                coro = self.ready.pop(0)
                try:
                    coro.send(None)  # Запустить корутину
                except StopIteration as e:
                    return e.value  # Корутина завершена
            
            # 2. Ждать I/O или timeout
            timeout = min(self.sleeping.keys()) if self.sleeping else None
            time.sleep(timeout)

Асинхронное выполнение — это не многопоточность! Это кооперативная многозадачность, где каждая корутина добровольно отдаёт контроль через await.

Практический пример: конкурентные запросы

Неправильный подход (последовательно, медленно)

import asyncio
import time

async def fetch(url):
    await asyncio.sleep(1)  # Имитация сетевого запроса
    return f"Data from {url}"

async def main_slow():
    start = time.time()
    
    # Выполняется одна за другой (2 секунды)
    result1 = await fetch("url1")
    result2 = await fetch("url2")
    result3 = await fetch("url3")
    
    elapsed = time.time() - start
    print(f"Time: {elapsed:.1f}s")  # Output: Time: 3.0s
    return [result1, result2, result3]

asyncio.run(main_slow())

Правильный подход (конкурентно, быстро)

async def main_fast():
    start = time.time()
    
    # Создаём все задачи одновременно
    tasks = [
        asyncio.create_task(fetch("url1")),
        asyncio.create_task(fetch("url2")),
        asyncio.create_task(fetch("url3")),
    ]
    
    # Ждём, пока все завершатся (только 1 секунда!)
    results = await asyncio.gather(*tasks)
    
    elapsed = time.time() - start
    print(f"Time: {elapsed:.1f}s")  # Output: Time: 1.0s
    return results

asyncio.run(main_fast())

Разница: 3 секунды против 1 секунды!

Основные функции asyncio

asyncio.gather() — ждём все задачи

async def main():
    results = await asyncio.gather(
        fetch("url1"),
        fetch("url2"),
        fetch("url3"),
        return_exceptions=True  # Не падаем, если одна ошибка
    )
    return results

asyncio.wait() — ждём с условиями

async def main():
    tasks = [
        asyncio.create_task(fetch("url1")),
        asyncio.create_task(fetch("url2")),
    ]
    
    # Ждём, пока хоть одна завершится
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )
    
    # Отменяем остальные
    for task in pending:
        task.cancel()

asyncio.create_task() — запланировать выполнение

async def main():
    # Задача запущена, но не ждём результат сразу
    task = asyncio.create_task(fetch("url1"))
    
    # Можем сделать что-то ещё
    print("Doing something else...")
    
    # Потом получить результат
    result = await task
    return result

asyncio.sleep() — асинхронная задержка

async def main():
    print("Start")
    await asyncio.sleep(1)  # Не блокирует другие корутины!
    print("After 1 second")

Реальный пример: HTTP запросы

import aiohttp
import asyncio

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.json()

async def main():
    urls = [
        "https://api.github.com/users/github",
        "https://api.github.com/users/google",
        "https://api.github.com/users/facebook",
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        
    for result in results:
        print(f"User: {result['login']}")

asyncio.run(main())

Обработка ошибок

async def fetch_with_retry(url, max_retries=3):
    for attempt in range(max_retries):
        try:
            # Попытка выполнить операцию
            result = await fetch(url)
            return result
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff

async def main():
    try:
        result = await fetch_with_retry("url")
    except Exception as e:
        print(f"Failed after retries: {e}")

asyncio.run(main())

Важные особенности

1. asyncio не многопоточность

# НЕПРАВИЛЬНО — CPU-bound операция заблокирует event loop
async def bad():
    time.sleep(1)  # Блокирует весь event loop!
    
# ПРАВИЛЬНО — используй await с I/O операциями
async def good():
    await asyncio.sleep(1)  # Не блокирует

2. Исключение из run_forever()

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

async def task():
    await asyncio.sleep(1)
    print("Task done")

loop.create_task(task())
loop.run_forever()  # Запуск цикла
loop.close()

3. Отмена задач

async def main():
    task = asyncio.create_task(fetch("url"))
    
    await asyncio.sleep(0.5)
    task.cancel()  # Отменить задачу
    
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

Производительность

аsyncio идеален для I/O-bound задач:

# I/O-bound (asyncio идеален)
async def io_bound():
    async with aiohttp.ClientSession() as session:
        for _ in range(100):
            await fetch(session, "url")  # Сетевые запросы

# CPU-bound (используй multiprocessing или thread pool)
def cpu_bound():
    result = sum(range(10**8))  # Тяжёлые вычисления
    return result

# Комбинированный подход
async def mixed():
    # I/O в asyncio
    data = await fetch_data()
    
    # CPU в thread pool
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, cpu_intensive_func, data)
    return result

Вывод

asyncio — это мощный инструмент для конкурентного программирования в Python, идеально подходит для I/O-bound приложений (веб-серверы, API клиенты, боты). Благодаря event loop и кооперативной многозадачности, можно обрабатывать тысячи операций без затрат на переключение потоков.

Как работает asyncio в Python? | PrepBro