← Назад к вопросам

Какие знаешь методы из asyncio?

2.0 Middle🔥 171 комментариев
#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Основные методы и функции asyncio

asyncio — модуль для асинхронного программирования. Расскажу про самые важные методы и когда их использовать.

1. asyncio.run()

Запуск асинхронной программы.

import asyncio

async def main():
    print("Hello, async world!")
    await asyncio.sleep(1)
    print("Done")

# Точка входа
if __name__ == "__main__":
    asyncio.run(main())

# asyncio.run() создает event loop, запускает корутину и закрывает loop

Важно: используй asyncio.run() только один раз в main. Не вызывай несколько раз.

2. asyncio.sleep()

Асинхронная задержка (не блокирует другие задачи).

import asyncio

async def task1():
    print("Task 1 started")
    await asyncio.sleep(2)
    print("Task 1 done")

async def task2():
    print("Task 2 started")
    await asyncio.sleep(1)
    print("Task 2 done")

async def main():
    # Запустить обе задачи параллельно
    await asyncio.gather(task1(), task2())
    # Займет 2 секунды (не 3!), потому что execute параллельно

asyncio.run(main())

Против: time.sleep() — блокирует event loop!

# ❌ НЕПРАВИЛЬНО
async def bad():
    time.sleep(2)  # Блокирует всё!

# ✅ ПРАВИЛЬНО
async def good():
    await asyncio.sleep(2)  # Не блокирует

3. asyncio.gather()

Запустить несколько корутин одновременно и дождаться всех.

import asyncio

async def fetch_data(url, delay):
    await asyncio.sleep(delay)
    return f"Data from {url}"

async def main():
    # Запустить все параллельно
    results = await asyncio.gather(
        fetch_data("api1.com", 1),
        fetch_data("api2.com", 2),
        fetch_data("api3.com", 1.5)
    )
    # Займет 2 секунды (максимум из 1, 2, 1.5)
    print(results)  # ['Data from api1.com', 'Data from api2.com', ...]

asyncio.run(main())

# С обработкой ошибок
results = await asyncio.gather(
    fetch_data("api1.com", 1),
    fetch_data("api2.com", 0),
    return_exceptions=True  # Ошибки не прерывают
)
print(results)  # [result, Exception(...), result]

4. asyncio.create_task()

Запланировать задачу на выполнение (не ждать сразу).

import asyncio

async def background_task():
    print("Background task started")
    await asyncio.sleep(2)
    print("Background task done")

async def main():
    # Запланировать задачу, но не ждать
    task = asyncio.create_task(background_task())
    
    print("Main continues...")
    await asyncio.sleep(1)
    print("Main still running")
    
    # Теперь дождаться
    await task
    print("Main done")

asyncio.run(main())

# Вывод:
# Background task started
# Main continues...
# Main still running
# Background task done
# Main done

Различие gather vs create_task:

# gather: создает и ждет
results = await asyncio.gather(task1(), task2(), task3())

# create_task: создает, продолжает работу, потом ждешь
task1_obj = asyncio.create_task(task1())
task2_obj = asyncio.create_task(task2())
# Можно сделать что-то другое
await task1_obj
await task2_obj

5. asyncio.wait()

Ждать выполнения нескольких задач с условиями (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED).

import asyncio

async def task(n, delay):
    await asyncio.sleep(delay)
    return f"Task {n} done"

async def main():
    tasks = [
        asyncio.create_task(task(1, 1)),
        asyncio.create_task(task(2, 2)),
        asyncio.create_task(task(3, 0.5))
    ]
    
    # ✅ Дождаться первого завершения
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )
    print(f"First done: {done}")
    print(f"Still pending: {pending}")
    
    # ✅ Дождаться всех (по умолчанию)
    done, pending = await asyncio.wait(tasks)  # return_when=ALL_COMPLETED
    print(f"All done: {done}")

asyncio.run(main())

Варианты return_when:

  • FIRST_COMPLETED — вернуться как только одна готова
  • FIRST_EXCEPTION — вернуться при первой ошибке
  • ALL_COMPLETED — дождаться всех (по умолчанию)

6. asyncio.as_completed()

Обработать результаты по мере их готовности.

import asyncio

async def fetch(url, delay):
    await asyncio.sleep(delay)
    return f"Data from {url}"

async def main():
    tasks = [
        fetch("api1.com", 3),
        fetch("api2.com", 1),
        fetch("api3.com", 2)
    ]
    
    # Обработать по мере готовности
    for future in asyncio.as_completed(tasks):
        result = await future
        print(f"Got: {result}")  # api2, api3, api1 (в порядке готовности)

asyncio.run(main())

7. asyncio.Queue()

Асинхронная очередь для передачи данных между задачами.

import asyncio

async def producer(queue):
    for i in range(5):
        await queue.put(f"Item {i}")
        print(f"Produced item {i}")
        await asyncio.sleep(0.5)
    await queue.put(None)  # Сигнал конца

async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"Consumed: {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=2)  # Максимум 2 элемента
    
    await asyncio.gather(
        producer(queue),
        consumer(queue)
    )

asyncio.run(main())

8. asyncio.Lock() и asyncio.Semaphore()

Синхронизация доступа к ресурсам.

import asyncio

# Lock: только одна задача за раз
lock = asyncio.Lock()

async def critical_section():
    async with lock:
        print("Entering critical section")
        await asyncio.sleep(1)
        print("Leaving critical section")

# Semaphore: несколько задач за раз (до N)
semaphore = asyncio.Semaphore(3)  # Максимум 3 одновременно

async def limited_resource():
    async with semaphore:
        print("Using resource")
        await asyncio.sleep(1)

async def main():
    tasks = [critical_section() for _ in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())

9. asyncio.Event()

Сигнал для синхронизации между задачами.

import asyncio

event = asyncio.Event()

async def waiter():
    print("Waiting for event...")
    await event.wait()  # Ждем
    print("Event received!")

async def setter():
    await asyncio.sleep(2)
    print("Setting event...")
    event.set()  # Сигнализируем

async def main():
    await asyncio.gather(
        waiter(),
        waiter(),
        setter()
    )

asyncio.run(main())

10. asyncio.Condition()

Условная переменная (Lock + Event).

import asyncio

condition = asyncio.Condition()
data = None

async def producer():
    global data
    for i in range(5):
        await asyncio.sleep(1)
        async with condition:
            data = f"Item {i}"
            print(f"Produced: {data}")
            condition.notify_all()  # Сигнализировать потребителям

async def consumer(name):
    global data
    while True:
        async with condition:
            await condition.wait()  # Ждать сигнала
            print(f"Consumer {name} got: {data}")

async def main():
    await asyncio.gather(
        producer(),
        consumer(1),
        consumer(2)
    )

asyncio.run(main())

11. asyncio.timeout()

Установить timeout для задачи (Python 3.11+).

import asyncio

async def slow_task():
    await asyncio.sleep(5)
    return "Done"

async def main():
    try:
        async with asyncio.timeout(2):  # 2 секунды
            result = await slow_task()
    except asyncio.TimeoutError:
        print("Task timeout!")

asyncio.run(main())

# Для Python < 3.11
try:
    result = await asyncio.wait_for(slow_task(), timeout=2)
except asyncio.TimeoutError:
    print("Task timeout!")

12. asyncio.get_event_loop() и asyncio.get_running_loop()

Получить текущий event loop.

import asyncio

async def task():
    # Внутри async функции
    loop = asyncio.get_running_loop()  # Текущий loop
    print(f"Running loop: {loop}")
    
    # Запланировать функцию
    loop.call_soon(print, "Soon")

asyncio.run(task())

Сравнение методов

МетодНазначениеРезультат
gather()Запустить несколько и дождатьсяСписок результатов
create_task()Запланировать задачуTask объект
wait()Ждать с условиямиdone/pending sets
as_completed()Обработать по готовностиIterator futures
Queue()Передача данныхFIFO очередь
Lock()Критическая секцияContext manager
Semaphore()Ограничение параллелизмаContext manager
Event()Сигнал между задачамиwait()/set()
Condition()Условная переменнаяwait()/notify()

Практический пример: веб-краулер

import asyncio
import aiohttp
from asyncio import Semaphore, Queue

class WebCrawler:
    def __init__(self, max_workers=10):
        self.semaphore = Semaphore(max_workers)
        self.queue = Queue()
        self.visited = set()
    
    async def fetch(self, session, url):
        async with self.semaphore:
            try:
                async with session.get(url, timeout=5) as resp:
                    return await resp.text()
            except Exception as e:
                print(f"Error fetching {url}: {e}")
                return None
    
    async def process_urls(self, urls):
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch(session, url) for url in urls]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            return results

async def main():
    crawler = WebCrawler(max_workers=5)
    urls = [f"https://httpbin.org/delay/{i}" for i in range(1, 6)]
    results = await crawler.process_urls(urls)
    print(f"Fetched {len(results)} pages")

asyncio.run(main())

Золотые правила asyncio

  1. Используй asyncio.run() в main — один раз
  2. Никогда не блокируй event loop (time.sleep, синхронные I/O)
  3. Используй await asyncio.sleep() для задержек
  4. gather() для параллельных задач, wait() для условного выполнения
  5. Queue для коммуникации между задачами
  6. Lock/Semaphore для защиты общего состояния
  7. Тестируй с pytest-asyncio (asyncio тесты требуют специального подхода)

asyncio — мощный инструмент, но требует изменения мышления. Учись на примерах!

Какие знаешь методы из asyncio? | PrepBro