← Назад к вопросам

В каких ситуациях нужен асинхронный генератор

3.0 Senior🔥 61 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# В каких ситуациях нужен асинхронный генератор

Асинхронные генераторы (async generators) - это мощный инструмент для работы с потоками асинхронных данных. Давайте разберем, когда и почему их нужно использовать.

Что такое асинхронный генератор?

Это функция, которая использует async def и содержит yield. Она позволяет генерировать значения асинхронно:

# Обычный генератор
def sync_generator():
    yield 1
    yield 2
    yield 3

# Асинхронный генератор
async def async_generator():
    yield 1
    yield 2
    yield 3

# Использование асинхронного генератора
async for value in async_generator():
    print(value)

Основные случаи использования

1. Обработка потока данных с асинхронными операциями

Когда нужно получить данные из нескольких асинхронных источников без загрузки всего в память:

import aiohttp
import asyncio

async def fetch_user_data(user_ids):
    async with aiohttp.ClientSession() as session:
        for user_id in user_ids:
            url = f"https://api.example.com/users/{user_id}"
            async with session.get(url) as response:
                data = await response.json()
                yield data  # Возвращаем данные по одному

# Использование
async def process_users():
    user_ids = [1, 2, 3, 4, 5]
    async for user in fetch_user_data(user_ids):
        print(f"Processing user: {user["name"]}")
        # Обрабатываем каждого пользователя по мере получения

asyncio.run(process_users())

Преимущество: мы не ждем, пока загрузятся ВСЕ пользователи, обрабатываем их по мере получения.

2. Потоковая обработка больших файлов

Читаем большой файл асинхронно, возвращаем части постепенно:

async def read_large_file(file_path, chunk_size=1024):
    import aiofiles
    
    async with aiofiles.open(file_path, "r") as file:
        while True:
            chunk = await file.read(chunk_size)
            if not chunk:
                break
            yield chunk  # Возвращаем по частям

# Использование
async def process_file():
    async for chunk in read_large_file("large_file.txt"):
        print(f"Processing chunk of {len(chunk)} bytes")
        # Обрабатываем файл по частям, без загрузки в память

asyncio.run(process_file())

Преимущество: большой файл не загружается полностью в память, обрабатывается по частям.

3. Мониторинг и подписки в реальном времени

Получение обновлений с асинхронного источника (WebSocket, очередь, реал-тайм БД):

import asyncio

async def monitor_queue(queue):
    """Генератор для мониторинга асинхронной очереди"""
    while True:
        try:
            message = await asyncio.wait_for(queue.get(), timeout=5.0)
            yield message
        except asyncio.TimeoutError:
            yield None  # Timeout - нет новых сообщений

# Использование с WebSocket
import websockets

async def websocket_stream(uri):
    async with websockets.connect(uri) as websocket:
        while True:
            message = await websocket.recv()
            yield message

async def listen_websocket():
    async for message in websocket_stream("wss://example.com/stream"):
        if message:
            print(f"Received: {message}")

asyncio.run(listen_websocket())

Преимущество: слушаем события по мере их поступления, не блокируя программу.

4. Пагинация с асинхронными запросами

Получение данных со страниц API асинхронно:

async def paginated_api_call(base_url, total_items=None):
    """Асинхронный генератор для пагинированного API"""
    page = 1
    items_fetched = 0
    
    while True:
        url = f"{base_url}?page={page}"
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                data = await response.json()
                
                if not data["items"]:
                    break  # Нет больше данных
                
                for item in data["items"]:
                    yield item
                    items_fetched += 1
                    
                    if total_items and items_fetched >= total_items:
                        return  # Достаточно
                
                page += 1

# Использование
async def fetch_all_products():
    async for product in paginated_api_call(
        "https://api.example.com/products",
        total_items=100
    ):
        print(f"Product: {product["name"]}")

asyncio.run(fetch_all_products())

Преимущество: не нужно знать общее количество страниц, работаем по мере получения данных.

5. Параллельная обработка с лимитом одновременных операций

import asyncio
from typing import AsyncGenerator

async def limited_concurrent_tasks(
    tasks,
    max_concurrent=5
) -> AsyncGenerator:
    """Выполняет задачи с лимитом одновременных операций"""
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def bounded_task(task):
        async with semaphore:
            result = await task()
            yield result
    
    pending = [bounded_task(task) for task in tasks]
    
    for future in asyncio.as_completed(pending):
        yield await future

# Использование
async def fetch_urls(urls):
    async def fetch_one(url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.json()
    
    async for result in limited_concurrent_tasks(
        [lambda url=url: fetch_one(url) for url in urls],
        max_concurrent=5
    ):
        print(f"Got result: {result}")

asyncio.run(fetch_urls(["url1", "url2", "url3"]))

Преимущество: контролируем количество одновременных подключений.

6. Трансформация потока данных

Преобразование данных с асинхронными операциями:

async def transform_data_stream(data_source):
    """Трансформирует поток данных"""
    async for item in data_source:
        # Асинхронная обработка каждого item
        processed = await process_item(item)
        if processed:
            yield processed

async def process_item(item):
    # Например, запрос к БД
    await asyncio.sleep(0.1)  # Имитация БД запроса
    return item.upper()

async def example():
    async def simple_source():
        for item in ["hello", "world", "async"]:
            yield item
    
    async for transformed in transform_data_stream(simple_source()):
        print(f"Transformed: {transformed}")

asyncio.run(example())

Асинхронные генераторы vs обычные функции, возвращающие список

Плохо - загружает все в память

async def fetch_all_users():
    """Загружает всех пользователей в памяти"""
    users = []
    async with aiohttp.ClientSession() as session:
        for i in range(10000):
            response = await session.get(f"https://api.example.com/user/{i}")
            users.append(await response.json())
    return users  # Все в памяти!

# Использование - ждем, пока загрузятся ВСЕ
all_users = await fetch_all_users()
for user in all_users:
    print(user)

Хорошо - обрабатывает по мере получения

async def fetch_users_stream():
    """Генерирует пользователей по мере получения"""
    async with aiohttp.ClientSession() as session:
        for i in range(10000):
            response = await session.get(f"https://api.example.com/user/{i}")
            yield await response.json()  # Один за другим

# Использование - обрабатываем по мере получения
async for user in fetch_users_stream():
    print(user)  # Не ждем загрузки всех

Вывод

Асинхронные генераторы нужны когда:

  1. Потоковые данные - источник данных производит значения асинхронно
  2. Экономия памяти - большой объем данных нужно обрабатывать по частям
  3. Реал-тайм обновления - слушаем события, WebSocket, очереди
  4. Пагинация - не знаем заранее все данные
  5. Параллельная обработка - контролируем количество одновременных операций
  6. Трансформация потоков - нужна асинхронная обработка каждого элемента

Основная идея: используйте асинхронные генераторы вместо загрузки всех данных в память и последующей обработки.

В каких ситуациях нужен асинхронный генератор | PrepBro