← Назад к вопросам
Зачем нужен асинхронный генератор в Python?
2.0 Middle🔥 81 комментариев
#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Зачем нужен асинхронный генератор в Python
Асинхронный генератор — это функция, которая может одновременно быть асинхронной (async) и генератором (yield). Это позволяет обрабатывать потоки данных асинхронно, без блокировки.
Определение
# Обычный генератор (синхронный)
def regular_generator():
yield 1
yield 2
yield 3
# Асинхронный генератор
async def async_generator():
yield 1
yield 2
yield 3
# Разница: async_generator может использовать await внутри
Главная причина: потоковая обработка I/O операций
1. Получение данных из API с ограничениями
# Плохо: получить все данные сразу (может быть миллион записей)
async def fetch_all_users():
users = await api.get("/users?limit=1000000")
# Это может быть 100MB в памяти!
return users
# Хорошо: получать страницы по мере необходимости
async def fetch_users_paginated():
page = 1
while True:
response = await api.get(f"/users?page={page}&limit=100")
if not response["data"]:
break
for user in response["data"]:
yield user # Отправляем пользователя обработчику
page += 1
# Использование
async for user in fetch_users_paginated():
await process_user(user) # Обработка по одному
# В памяти только 100 пользователей одновременно!
Сравнение памяти
Регулярный способ (все в памяти):
┌─────────────────────────────────────┐
│ Все 1,000,000 пользователей │ ← 100MB RAM
└─────────────────────────────────────┘
Асинхронный генератор (потоковая обработка):
┌───────────────────────┐
│ Текущая страница (100)│ ← 10KB RAM
└───────────────────────┘
Остальное на сервере
2. Чтение большого файла
# Синхронный способ (блокирует)
def read_file_sync(filepath):
with open(filepath) as f:
lines = f.readlines() # Весь файл в память!
for line in lines:
yield line.strip()
# Асинхронный генератор (неблокирующий)
async def read_file_async(filepath):
async with aiofiles.open(filepath) as f:
async for line in f:
yield line.strip() # По одной строке
# Использование
async for line in read_file_async("/var/log/huge_file.log"):
await process_line(line) # Не блокирует
Ключевые преимущества
1. Обработка больших данных без блокировки
import asyncio
# Асинхронный источник данных
async def data_source():
for i in range(1000000):
await asyncio.sleep(0.1) # Имитация I/O
yield {"id": i, "value": i ** 2}
# Обработчик
async def process_data():
count = 0
async for item in data_source():
# Обработка одного элемента
print(f"Item {item[id]}: {item[value]}")
count += 1
if count >= 10:
break
asyncio.run(process_data())
2. Композиция асинхронных источников
# Множество источников данных
async def api_stream_1():
"""Поток событий из API 1"""
for i in range(100):
await asyncio.sleep(0.5)
yield {"source": "api1", "value": i}
async def api_stream_2():
"""Поток событий из API 2"""
for i in range(100):
await asyncio.sleep(0.7)
yield {"source": "api2", "value": i}
# Объединение потоков
async def combined_stream():
"""Объединить оба потока в один"""
async for item in api_stream_1():
yield item
async for item in api_stream_2():
yield item
# Или параллельная обработка
async def parallel_streams():
"""Обрабатывать оба потока одновременно"""
# Создаём итераторы
stream1 = api_stream_1().__aiter__()
stream2 = api_stream_2().__aiter__()
while True:
try:
# Получаем из обоих потоков параллельно
item1, item2 = await asyncio.gather(
stream1.__anext__(),
stream2.__anext__()
)
yield item1
yield item2
except StopAsyncIteration:
break
3. Фильтрация и трансформация потоков
async def data_source():
for i in range(100):
await asyncio.sleep(0.1)
yield {"id": i, "value": i * 2}
# Фильтр
async def filter_even():
async for item in data_source():
if item["value"] % 4 == 0:
yield item # Только чётные значения
# Трансформация
async def transform_data():
async for item in filter_even():
yield {
"id": item["id"],
"squared": item["value"] ** 2
}
# Использование
async def main():
count = 0
async for item in transform_data():
print(item)
count += 1
if count >= 5:
break
asyncio.run(main())
Реальные примеры использования
1. WebSocket потоки сообщений
from fastapi import FastAPI, WebSocket
import asyncio
app = FastAPI()
# Асинхронный генератор для обработки сообщений
async def receive_messages(websocket: WebSocket):
"""Получать сообщения из WebSocket"""
async for message in websocket.iter_text():
yield message # Отправляем сообщение дальше
async def process_messages(websocket: WebSocket):
"""Обработать сообщения"""
async for message in receive_messages(websocket):
# Процесс сообщения асинхронно
result = await process_message(message)
await websocket.send_json(result)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
await process_messages(websocket)
2. Потоки данных в реальном времени
from kafka import KafkaConsumer
import json
import asyncio
# Асинхронный генератор из Kafka
async def kafka_stream(topic):
"""Читать события из Kafka асинхронно"""
consumer = KafkaConsumer(
topic,
bootstrap_servers=[kafka:9092],
value_deserializer=lambda m: json.loads(m.decode(utf-8))
)
for message in consumer:
yield message.value
await asyncio.sleep(0) # Дать другим корутинам шанс
# Обработка потока
async def process_kafka_events():
async for event in kafka_stream(orders):
# Асинхронная обработка
await save_to_db(event)
await send_notification(event)
await update_cache(event)
asyncio.run(process_kafka_events())
3. Потоки данных из БД
import asyncpg
async def fetch_user_stream(pool, batch_size=100):
"""Асинхронный генератор для больших таблиц"""
conn = await pool.acquire()
try:
offset = 0
while True:
rows = await conn.fetch(
f"SELECT * FROM users OFFSET {offset} LIMIT {batch_size}"
)
if not rows:
break
for row in rows:
yield dict(row)
offset += batch_size
finally:
await pool.release(conn)
# Обработка огромной таблицы без загрузки всего в памяти
async def process_all_users():
pool = await asyncpg.create_pool(postgresql://...)
async for user in fetch_user_stream(pool):
await process_user(user)
# В памяти только 100 пользователей одновременно
await pool.close()
4. Batch обработка
async def batch_processor(stream, batch_size=10):
"""Группировать элементы потока в батчи"""
batch = []
async for item in stream:
batch.append(item)
if len(batch) >= batch_size:
yield batch # Выдать батч
batch = []
if batch:
yield batch # Остаток
async def api_stream():
for i in range(100):
yield {"id": i, "value": f"item_{i}"}
await asyncio.sleep(0.1)
async def main():
async for batch in batch_processor(api_stream(), batch_size=10):
# Отправить батч на обработку
await save_batch_to_db(batch)
print(f"Processed batch of {len(batch)} items")
asyncio.run(main())
Синтаксис асинхронного генератора
# Определение
async def my_async_gen():
yield value1
yield value2
# Использование
async for item in my_async_gen():
print(item)
# Явное получение значений
gen = my_async_gen()
item1 = await gen.__anext__()
item2 = await gen.__anext__()
try:
item3 = await gen.__anext__()
except StopAsyncIteration:
print("Generator is exhausted")
Сравнение с альтернативами
Асинхронный генератор vs Список
# Список (все в памяти)
async def get_users_list():
users = []
for i in range(1000000):
users.append({"id": i, "name": f"user_{i}"})
return users # Занимает 100MB!
# Асинхронный генератор (ленивый)
async def get_users_generator():
for i in range(1000000):
yield {"id": i, "name": f"user_{i}"} # 1KB за раз
Асинхронный генератор vs Callback
# Callback (низкоуровневый, сложно)
async def process_stream_callback(stream, callback):
async for item in stream:
await callback(item)
# Асинхронный генератор (высокоуровневый, понятно)
async def filter_stream(stream):
async for item in stream:
if item["type"] == "important":
yield item
Выводы
Асинхронный генератор нужен для:
- Потоковой обработки больших данных (без загрузки всего в памяти)
- I/O операций (API, файлы, БД, WebSocket)
- Реального времени систем (Kafka, WebSocket, SSE)
- Композиции потоков (объединение, фильтрация, трансформация)
- Обработки "бесконечных" потоков (живые события)
Основной выигрыш: эффективность памяти + неблокирующая обработка данных.
Правило: Когда обрабатываешь потоки данных асинхронно — используй асинхронный генератор. Это естественный и эффективный способ работать с потоками в Python.