В каких ситуациях нужен асинхронный генератор
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
# В каких ситуациях нужен асинхронный генератор
Асинхронные генераторы (async generators) - это мощный инструмент для работы с потоками асинхронных данных. Давайте разберем, когда и почему их нужно использовать.
Что такое асинхронный генератор?
Это функция, которая использует async def и содержит yield. Она позволяет генерировать значения асинхронно:
# Обычный генератор
def sync_generator():
yield 1
yield 2
yield 3
# Асинхронный генератор
async def async_generator():
yield 1
yield 2
yield 3
# Использование асинхронного генератора
async for value in async_generator():
print(value)
Основные случаи использования
1. Обработка потока данных с асинхронными операциями
Когда нужно получить данные из нескольких асинхронных источников без загрузки всего в память:
import aiohttp
import asyncio
async def fetch_user_data(user_ids):
async with aiohttp.ClientSession() as session:
for user_id in user_ids:
url = f"https://api.example.com/users/{user_id}"
async with session.get(url) as response:
data = await response.json()
yield data # Возвращаем данные по одному
# Использование
async def process_users():
user_ids = [1, 2, 3, 4, 5]
async for user in fetch_user_data(user_ids):
print(f"Processing user: {user["name"]}")
# Обрабатываем каждого пользователя по мере получения
asyncio.run(process_users())
Преимущество: мы не ждем, пока загрузятся ВСЕ пользователи, обрабатываем их по мере получения.
2. Потоковая обработка больших файлов
Читаем большой файл асинхронно, возвращаем части постепенно:
async def read_large_file(file_path, chunk_size=1024):
import aiofiles
async with aiofiles.open(file_path, "r") as file:
while True:
chunk = await file.read(chunk_size)
if not chunk:
break
yield chunk # Возвращаем по частям
# Использование
async def process_file():
async for chunk in read_large_file("large_file.txt"):
print(f"Processing chunk of {len(chunk)} bytes")
# Обрабатываем файл по частям, без загрузки в память
asyncio.run(process_file())
Преимущество: большой файл не загружается полностью в память, обрабатывается по частям.
3. Мониторинг и подписки в реальном времени
Получение обновлений с асинхронного источника (WebSocket, очередь, реал-тайм БД):
import asyncio
async def monitor_queue(queue):
"""Генератор для мониторинга асинхронной очереди"""
while True:
try:
message = await asyncio.wait_for(queue.get(), timeout=5.0)
yield message
except asyncio.TimeoutError:
yield None # Timeout - нет новых сообщений
# Использование с WebSocket
import websockets
async def websocket_stream(uri):
async with websockets.connect(uri) as websocket:
while True:
message = await websocket.recv()
yield message
async def listen_websocket():
async for message in websocket_stream("wss://example.com/stream"):
if message:
print(f"Received: {message}")
asyncio.run(listen_websocket())
Преимущество: слушаем события по мере их поступления, не блокируя программу.
4. Пагинация с асинхронными запросами
Получение данных со страниц API асинхронно:
async def paginated_api_call(base_url, total_items=None):
"""Асинхронный генератор для пагинированного API"""
page = 1
items_fetched = 0
while True:
url = f"{base_url}?page={page}"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.json()
if not data["items"]:
break # Нет больше данных
for item in data["items"]:
yield item
items_fetched += 1
if total_items and items_fetched >= total_items:
return # Достаточно
page += 1
# Использование
async def fetch_all_products():
async for product in paginated_api_call(
"https://api.example.com/products",
total_items=100
):
print(f"Product: {product["name"]}")
asyncio.run(fetch_all_products())
Преимущество: не нужно знать общее количество страниц, работаем по мере получения данных.
5. Параллельная обработка с лимитом одновременных операций
import asyncio
from typing import AsyncGenerator
async def limited_concurrent_tasks(
tasks,
max_concurrent=5
) -> AsyncGenerator:
"""Выполняет задачи с лимитом одновременных операций"""
semaphore = asyncio.Semaphore(max_concurrent)
async def bounded_task(task):
async with semaphore:
result = await task()
yield result
pending = [bounded_task(task) for task in tasks]
for future in asyncio.as_completed(pending):
yield await future
# Использование
async def fetch_urls(urls):
async def fetch_one(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
async for result in limited_concurrent_tasks(
[lambda url=url: fetch_one(url) for url in urls],
max_concurrent=5
):
print(f"Got result: {result}")
asyncio.run(fetch_urls(["url1", "url2", "url3"]))
Преимущество: контролируем количество одновременных подключений.
6. Трансформация потока данных
Преобразование данных с асинхронными операциями:
async def transform_data_stream(data_source):
"""Трансформирует поток данных"""
async for item in data_source:
# Асинхронная обработка каждого item
processed = await process_item(item)
if processed:
yield processed
async def process_item(item):
# Например, запрос к БД
await asyncio.sleep(0.1) # Имитация БД запроса
return item.upper()
async def example():
async def simple_source():
for item in ["hello", "world", "async"]:
yield item
async for transformed in transform_data_stream(simple_source()):
print(f"Transformed: {transformed}")
asyncio.run(example())
Асинхронные генераторы vs обычные функции, возвращающие список
Плохо - загружает все в память
async def fetch_all_users():
"""Загружает всех пользователей в памяти"""
users = []
async with aiohttp.ClientSession() as session:
for i in range(10000):
response = await session.get(f"https://api.example.com/user/{i}")
users.append(await response.json())
return users # Все в памяти!
# Использование - ждем, пока загрузятся ВСЕ
all_users = await fetch_all_users()
for user in all_users:
print(user)
Хорошо - обрабатывает по мере получения
async def fetch_users_stream():
"""Генерирует пользователей по мере получения"""
async with aiohttp.ClientSession() as session:
for i in range(10000):
response = await session.get(f"https://api.example.com/user/{i}")
yield await response.json() # Один за другим
# Использование - обрабатываем по мере получения
async for user in fetch_users_stream():
print(user) # Не ждем загрузки всех
Вывод
Асинхронные генераторы нужны когда:
- Потоковые данные - источник данных производит значения асинхронно
- Экономия памяти - большой объем данных нужно обрабатывать по частям
- Реал-тайм обновления - слушаем события, WebSocket, очереди
- Пагинация - не знаем заранее все данные
- Параллельная обработка - контролируем количество одновременных операций
- Трансформация потоков - нужна асинхронная обработка каждого элемента
Основная идея: используйте асинхронные генераторы вместо загрузки всех данных в память и последующей обработки.