Какие знаешь методы из asyncio?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Основные методы и функции asyncio
asyncio — модуль для асинхронного программирования. Расскажу про самые важные методы и когда их использовать.
1. asyncio.run()
Запуск асинхронной программы.
import asyncio
async def main():
print("Hello, async world!")
await asyncio.sleep(1)
print("Done")
# Точка входа
if __name__ == "__main__":
asyncio.run(main())
# asyncio.run() создает event loop, запускает корутину и закрывает loop
Важно: используй asyncio.run() только один раз в main. Не вызывай несколько раз.
2. asyncio.sleep()
Асинхронная задержка (не блокирует другие задачи).
import asyncio
async def task1():
print("Task 1 started")
await asyncio.sleep(2)
print("Task 1 done")
async def task2():
print("Task 2 started")
await asyncio.sleep(1)
print("Task 2 done")
async def main():
# Запустить обе задачи параллельно
await asyncio.gather(task1(), task2())
# Займет 2 секунды (не 3!), потому что execute параллельно
asyncio.run(main())
Против: time.sleep() — блокирует event loop!
# ❌ НЕПРАВИЛЬНО
async def bad():
time.sleep(2) # Блокирует всё!
# ✅ ПРАВИЛЬНО
async def good():
await asyncio.sleep(2) # Не блокирует
3. asyncio.gather()
Запустить несколько корутин одновременно и дождаться всех.
import asyncio
async def fetch_data(url, delay):
await asyncio.sleep(delay)
return f"Data from {url}"
async def main():
# Запустить все параллельно
results = await asyncio.gather(
fetch_data("api1.com", 1),
fetch_data("api2.com", 2),
fetch_data("api3.com", 1.5)
)
# Займет 2 секунды (максимум из 1, 2, 1.5)
print(results) # ['Data from api1.com', 'Data from api2.com', ...]
asyncio.run(main())
# С обработкой ошибок
results = await asyncio.gather(
fetch_data("api1.com", 1),
fetch_data("api2.com", 0),
return_exceptions=True # Ошибки не прерывают
)
print(results) # [result, Exception(...), result]
4. asyncio.create_task()
Запланировать задачу на выполнение (не ждать сразу).
import asyncio
async def background_task():
print("Background task started")
await asyncio.sleep(2)
print("Background task done")
async def main():
# Запланировать задачу, но не ждать
task = asyncio.create_task(background_task())
print("Main continues...")
await asyncio.sleep(1)
print("Main still running")
# Теперь дождаться
await task
print("Main done")
asyncio.run(main())
# Вывод:
# Background task started
# Main continues...
# Main still running
# Background task done
# Main done
Различие gather vs create_task:
# gather: создает и ждет
results = await asyncio.gather(task1(), task2(), task3())
# create_task: создает, продолжает работу, потом ждешь
task1_obj = asyncio.create_task(task1())
task2_obj = asyncio.create_task(task2())
# Можно сделать что-то другое
await task1_obj
await task2_obj
5. asyncio.wait()
Ждать выполнения нескольких задач с условиями (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED).
import asyncio
async def task(n, delay):
await asyncio.sleep(delay)
return f"Task {n} done"
async def main():
tasks = [
asyncio.create_task(task(1, 1)),
asyncio.create_task(task(2, 2)),
asyncio.create_task(task(3, 0.5))
]
# ✅ Дождаться первого завершения
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
print(f"First done: {done}")
print(f"Still pending: {pending}")
# ✅ Дождаться всех (по умолчанию)
done, pending = await asyncio.wait(tasks) # return_when=ALL_COMPLETED
print(f"All done: {done}")
asyncio.run(main())
Варианты return_when:
FIRST_COMPLETED— вернуться как только одна готоваFIRST_EXCEPTION— вернуться при первой ошибкеALL_COMPLETED— дождаться всех (по умолчанию)
6. asyncio.as_completed()
Обработать результаты по мере их готовности.
import asyncio
async def fetch(url, delay):
await asyncio.sleep(delay)
return f"Data from {url}"
async def main():
tasks = [
fetch("api1.com", 3),
fetch("api2.com", 1),
fetch("api3.com", 2)
]
# Обработать по мере готовности
for future in asyncio.as_completed(tasks):
result = await future
print(f"Got: {result}") # api2, api3, api1 (в порядке готовности)
asyncio.run(main())
7. asyncio.Queue()
Асинхронная очередь для передачи данных между задачами.
import asyncio
async def producer(queue):
for i in range(5):
await queue.put(f"Item {i}")
print(f"Produced item {i}")
await asyncio.sleep(0.5)
await queue.put(None) # Сигнал конца
async def consumer(queue):
while True:
item = await queue.get()
if item is None:
break
print(f"Consumed: {item}")
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=2) # Максимум 2 элемента
await asyncio.gather(
producer(queue),
consumer(queue)
)
asyncio.run(main())
8. asyncio.Lock() и asyncio.Semaphore()
Синхронизация доступа к ресурсам.
import asyncio
# Lock: только одна задача за раз
lock = asyncio.Lock()
async def critical_section():
async with lock:
print("Entering critical section")
await asyncio.sleep(1)
print("Leaving critical section")
# Semaphore: несколько задач за раз (до N)
semaphore = asyncio.Semaphore(3) # Максимум 3 одновременно
async def limited_resource():
async with semaphore:
print("Using resource")
await asyncio.sleep(1)
async def main():
tasks = [critical_section() for _ in range(5)]
await asyncio.gather(*tasks)
asyncio.run(main())
9. asyncio.Event()
Сигнал для синхронизации между задачами.
import asyncio
event = asyncio.Event()
async def waiter():
print("Waiting for event...")
await event.wait() # Ждем
print("Event received!")
async def setter():
await asyncio.sleep(2)
print("Setting event...")
event.set() # Сигнализируем
async def main():
await asyncio.gather(
waiter(),
waiter(),
setter()
)
asyncio.run(main())
10. asyncio.Condition()
Условная переменная (Lock + Event).
import asyncio
condition = asyncio.Condition()
data = None
async def producer():
global data
for i in range(5):
await asyncio.sleep(1)
async with condition:
data = f"Item {i}"
print(f"Produced: {data}")
condition.notify_all() # Сигнализировать потребителям
async def consumer(name):
global data
while True:
async with condition:
await condition.wait() # Ждать сигнала
print(f"Consumer {name} got: {data}")
async def main():
await asyncio.gather(
producer(),
consumer(1),
consumer(2)
)
asyncio.run(main())
11. asyncio.timeout()
Установить timeout для задачи (Python 3.11+).
import asyncio
async def slow_task():
await asyncio.sleep(5)
return "Done"
async def main():
try:
async with asyncio.timeout(2): # 2 секунды
result = await slow_task()
except asyncio.TimeoutError:
print("Task timeout!")
asyncio.run(main())
# Для Python < 3.11
try:
result = await asyncio.wait_for(slow_task(), timeout=2)
except asyncio.TimeoutError:
print("Task timeout!")
12. asyncio.get_event_loop() и asyncio.get_running_loop()
Получить текущий event loop.
import asyncio
async def task():
# Внутри async функции
loop = asyncio.get_running_loop() # Текущий loop
print(f"Running loop: {loop}")
# Запланировать функцию
loop.call_soon(print, "Soon")
asyncio.run(task())
Сравнение методов
| Метод | Назначение | Результат |
|---|---|---|
| gather() | Запустить несколько и дождаться | Список результатов |
| create_task() | Запланировать задачу | Task объект |
| wait() | Ждать с условиями | done/pending sets |
| as_completed() | Обработать по готовности | Iterator futures |
| Queue() | Передача данных | FIFO очередь |
| Lock() | Критическая секция | Context manager |
| Semaphore() | Ограничение параллелизма | Context manager |
| Event() | Сигнал между задачами | wait()/set() |
| Condition() | Условная переменная | wait()/notify() |
Практический пример: веб-краулер
import asyncio
import aiohttp
from asyncio import Semaphore, Queue
class WebCrawler:
def __init__(self, max_workers=10):
self.semaphore = Semaphore(max_workers)
self.queue = Queue()
self.visited = set()
async def fetch(self, session, url):
async with self.semaphore:
try:
async with session.get(url, timeout=5) as resp:
return await resp.text()
except Exception as e:
print(f"Error fetching {url}: {e}")
return None
async def process_urls(self, urls):
async with aiohttp.ClientSession() as session:
tasks = [self.fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
async def main():
crawler = WebCrawler(max_workers=5)
urls = [f"https://httpbin.org/delay/{i}" for i in range(1, 6)]
results = await crawler.process_urls(urls)
print(f"Fetched {len(results)} pages")
asyncio.run(main())
Золотые правила asyncio
- Используй asyncio.run() в main — один раз
- Никогда не блокируй event loop (time.sleep, синхронные I/O)
- Используй await asyncio.sleep() для задержек
- gather() для параллельных задач, wait() для условного выполнения
- Queue для коммуникации между задачами
- Lock/Semaphore для защиты общего состояния
- Тестируй с pytest-asyncio (asyncio тесты требуют специального подхода)
asyncio — мощный инструмент, но требует изменения мышления. Учись на примерах!