← Назад к вопросам

Зачем нужен асинхронный генератор в Python?

2.0 Middle🔥 81 комментариев
#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Зачем нужен асинхронный генератор в Python

Асинхронный генератор — это функция, которая может одновременно быть асинхронной (async) и генератором (yield). Это позволяет обрабатывать потоки данных асинхронно, без блокировки.

Определение

# Обычный генератор (синхронный)
def regular_generator():
    yield 1
    yield 2
    yield 3

# Асинхронный генератор
async def async_generator():
    yield 1
    yield 2
    yield 3

# Разница: async_generator может использовать await внутри

Главная причина: потоковая обработка I/O операций

1. Получение данных из API с ограничениями

# Плохо: получить все данные сразу (может быть миллион записей)
async def fetch_all_users():
    users = await api.get("/users?limit=1000000")
    # Это может быть 100MB в памяти!
    return users

# Хорошо: получать страницы по мере необходимости
async def fetch_users_paginated():
    page = 1
    while True:
        response = await api.get(f"/users?page={page}&limit=100")
        
        if not response["data"]:
            break
        
        for user in response["data"]:
            yield user  # Отправляем пользователя обработчику
        
        page += 1

# Использование
async for user in fetch_users_paginated():
    await process_user(user)  # Обработка по одному
    # В памяти только 100 пользователей одновременно!

Сравнение памяти

Регулярный способ (все в памяти):
┌─────────────────────────────────────┐
│ Все 1,000,000 пользователей        │ ← 100MB RAM
└─────────────────────────────────────┘

Асинхронный генератор (потоковая обработка):
┌───────────────────────┐
│ Текущая страница (100)│ ← 10KB RAM
└───────────────────────┘
Остальное на сервере

2. Чтение большого файла

# Синхронный способ (блокирует)
def read_file_sync(filepath):
    with open(filepath) as f:
        lines = f.readlines()  # Весь файл в память!
        for line in lines:
            yield line.strip()

# Асинхронный генератор (неблокирующий)
async def read_file_async(filepath):
    async with aiofiles.open(filepath) as f:
        async for line in f:
            yield line.strip()  # По одной строке

# Использование
async for line in read_file_async("/var/log/huge_file.log"):
    await process_line(line)  # Не блокирует

Ключевые преимущества

1. Обработка больших данных без блокировки

import asyncio

# Асинхронный источник данных
async def data_source():
    for i in range(1000000):
        await asyncio.sleep(0.1)  # Имитация I/O
        yield {"id": i, "value": i ** 2}

# Обработчик
async def process_data():
    count = 0
    async for item in data_source():
        # Обработка одного элемента
        print(f"Item {item[id]}: {item[value]}")
        count += 1
        
        if count >= 10:
            break

asyncio.run(process_data())

2. Композиция асинхронных источников

# Множество источников данных
async def api_stream_1():
    """Поток событий из API 1"""
    for i in range(100):
        await asyncio.sleep(0.5)
        yield {"source": "api1", "value": i}

async def api_stream_2():
    """Поток событий из API 2"""
    for i in range(100):
        await asyncio.sleep(0.7)
        yield {"source": "api2", "value": i}

# Объединение потоков
async def combined_stream():
    """Объединить оба потока в один"""
    async for item in api_stream_1():
        yield item
    async for item in api_stream_2():
        yield item

# Или параллельная обработка
async def parallel_streams():
    """Обрабатывать оба потока одновременно"""
    # Создаём итераторы
    stream1 = api_stream_1().__aiter__()
    stream2 = api_stream_2().__aiter__()
    
    while True:
        try:
            # Получаем из обоих потоков параллельно
            item1, item2 = await asyncio.gather(
                stream1.__anext__(),
                stream2.__anext__()
            )
            yield item1
            yield item2
        except StopAsyncIteration:
            break

3. Фильтрация и трансформация потоков

async def data_source():
    for i in range(100):
        await asyncio.sleep(0.1)
        yield {"id": i, "value": i * 2}

# Фильтр
async def filter_even():
    async for item in data_source():
        if item["value"] % 4 == 0:
            yield item  # Только чётные значения

# Трансформация
async def transform_data():
    async for item in filter_even():
        yield {
            "id": item["id"],
            "squared": item["value"] ** 2
        }

# Использование
async def main():
    count = 0
    async for item in transform_data():
        print(item)
        count += 1
        if count >= 5:
            break

asyncio.run(main())

Реальные примеры использования

1. WebSocket потоки сообщений

from fastapi import FastAPI, WebSocket
import asyncio

app = FastAPI()

# Асинхронный генератор для обработки сообщений
async def receive_messages(websocket: WebSocket):
    """Получать сообщения из WebSocket"""
    async for message in websocket.iter_text():
        yield message  # Отправляем сообщение дальше

async def process_messages(websocket: WebSocket):
    """Обработать сообщения"""
    async for message in receive_messages(websocket):
        # Процесс сообщения асинхронно
        result = await process_message(message)
        await websocket.send_json(result)

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    await process_messages(websocket)

2. Потоки данных в реальном времени

from kafka import KafkaConsumer
import json
import asyncio

# Асинхронный генератор из Kafka
async def kafka_stream(topic):
    """Читать события из Kafka асинхронно"""
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=[kafka:9092],
        value_deserializer=lambda m: json.loads(m.decode(utf-8))
    )
    
    for message in consumer:
        yield message.value
        await asyncio.sleep(0)  # Дать другим корутинам шанс

# Обработка потока
async def process_kafka_events():
    async for event in kafka_stream(orders):
        # Асинхронная обработка
        await save_to_db(event)
        await send_notification(event)
        await update_cache(event)

asyncio.run(process_kafka_events())

3. Потоки данных из БД

import asyncpg

async def fetch_user_stream(pool, batch_size=100):
    """Асинхронный генератор для больших таблиц"""
    conn = await pool.acquire()
    try:
        offset = 0
        while True:
            rows = await conn.fetch(
                f"SELECT * FROM users OFFSET {offset} LIMIT {batch_size}"
            )
            
            if not rows:
                break
            
            for row in rows:
                yield dict(row)
            
            offset += batch_size
    finally:
        await pool.release(conn)

# Обработка огромной таблицы без загрузки всего в памяти
async def process_all_users():
    pool = await asyncpg.create_pool(postgresql://...)
    
    async for user in fetch_user_stream(pool):
        await process_user(user)
        # В памяти только 100 пользователей одновременно
    
    await pool.close()

4. Batch обработка

async def batch_processor(stream, batch_size=10):
    """Группировать элементы потока в батчи"""
    batch = []
    async for item in stream:
        batch.append(item)
        if len(batch) >= batch_size:
            yield batch  # Выдать батч
            batch = []
    if batch:
        yield batch  # Остаток

async def api_stream():
    for i in range(100):
        yield {"id": i, "value": f"item_{i}"}
        await asyncio.sleep(0.1)

async def main():
    async for batch in batch_processor(api_stream(), batch_size=10):
        # Отправить батч на обработку
        await save_batch_to_db(batch)
        print(f"Processed batch of {len(batch)} items")

asyncio.run(main())

Синтаксис асинхронного генератора

# Определение
async def my_async_gen():
    yield value1
    yield value2

# Использование
async for item in my_async_gen():
    print(item)

# Явное получение значений
gen = my_async_gen()
item1 = await gen.__anext__()
item2 = await gen.__anext__()

try:
    item3 = await gen.__anext__()
except StopAsyncIteration:
    print("Generator is exhausted")

Сравнение с альтернативами

Асинхронный генератор vs Список

# Список (все в памяти)
async def get_users_list():
    users = []
    for i in range(1000000):
        users.append({"id": i, "name": f"user_{i}"})
    return users  # Занимает 100MB!

# Асинхронный генератор (ленивый)
async def get_users_generator():
    for i in range(1000000):
        yield {"id": i, "name": f"user_{i}"}  # 1KB за раз

Асинхронный генератор vs Callback

# Callback (низкоуровневый, сложно)
async def process_stream_callback(stream, callback):
    async for item in stream:
        await callback(item)

# Асинхронный генератор (высокоуровневый, понятно)
async def filter_stream(stream):
    async for item in stream:
        if item["type"] == "important":
            yield item

Выводы

Асинхронный генератор нужен для:

  1. Потоковой обработки больших данных (без загрузки всего в памяти)
  2. I/O операций (API, файлы, БД, WebSocket)
  3. Реального времени систем (Kafka, WebSocket, SSE)
  4. Композиции потоков (объединение, фильтрация, трансформация)
  5. Обработки "бесконечных" потоков (живые события)

Основной выигрыш: эффективность памяти + неблокирующая обработка данных.

Правило: Когда обрабатываешь потоки данных асинхронно — используй асинхронный генератор. Это естественный и эффективный способ работать с потоками в Python.