← Назад к вопросам

Как затрачивается память в асинхронности?

1.7 Middle🔥 91 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Затраты памяти в асинхронности: от корутин к утечкам

Асинхронность даёт производительность, но требует понимания как и когда память выделяется. Наивное использование async может привести к утечкам памяти, исчерпанию файловых дескрипторов и зависанию приложения.

1. Базовые затраты памяти на корутину

Сколько памяти занимает одна корутина?

import sys
import asyncio

async def simple_coro():
    await asyncio.sleep(1)

# Размер объекта корутины
coro = simple_coro()
print(sys.getsizeof(coro))  # ~80-120 байт в зависимости от версии Python

# Но это просто оболочка. Память тратится на:
# 1. Frame объект (стек вызовов)
# 2. Локальные переменные
# 3. Состояние (waiting for, паузы)
# 4. Регистрация в event loop

frame = coro.cr_frame
print(sys.getsizeof(frame))  # ~120-200 байт

На практике:

  • Одна пустая корутина: 100-300 байт
  • Корутина с локальными переменными: 300-1000 байт
  • 1000 корутин: ~1 МБ памяти
  • 100,000 корутин: ~100 МБ памяти

Пример: 100k параллельных соединений

import asyncio
import tracemalloc

async def client_handler(client_id):
    # Локальные переменные
    buffer = bytearray(4096)  # +4 КБ
    cache = {}  # +100+ байт
    state = "connected"
    
    # Ждёт события
    await asyncio.sleep(3600)  # Час

async def simulate_100k_connections():
    tracemalloc.start()
    
    # Создаём 100k задач
    tasks = [client_handler(i) for i in range(100000)]
    await asyncio.gather(*tasks)
    
    current, peak = tracemalloc.get_traced_memory()
    print(f"Current: {current / 1024 / 1024:.1f} МБ")
    print(f"Peak: {peak / 1024 / 1024:.1f} МБ")

# asyncio.run(simulate_100k_connections())
# Результат: ~200-300 МБ для 100k корутин
# В синхронном коде с потоками было бы ~800 МБ - 1 ГБ

2. Event Loop и его структуры данных

Как Event Loop хранит задачи?

import asyncio
import sys

class MockEventLoop:
    def __init__(self):
        self._ready = []      # Очередь готовых к выполнению
        self._scheduled = {}  # Запланированные колбэки
        self._fd_to_handle = {}  # File descriptor -> handle
        self._callbacks = []  # Другие колбэки
        self._current_task = None

# Event loop структуры:
# 1. _ready deque: каждая задача = запись в деке (~50 байт)
# 2. _scheduled heap: запланированные = запись в heap (~200 байт)
# 3. Полисоки (sockets): каждый = файловый дескриптор (~100+ байт)

loop = asyncio.new_event_loop()

async def task():
    await asyncio.sleep(1)

# Создаём 1000 задач
for i in range(1000):
    loop.create_task(task())

# Каждая задача = Task object (~200 байт) + запись в деке
# 1000 * (200 + 50) = ~250 КБ только на структуры event loop

3. Утечки памяти в асинхронности

Утечка 1: Зависшие корутины (Hanging Coroutines)

# ❌ УТЕЧКА: Корутина создана но никогда не awaited
import asyncio
import weakref
import gc

def create_unawaited_coros(count):
    # Создаём корутины но не запускаем
    for i in range(count):
        async def coro():
            await asyncio.sleep(100)
        
        # Корутина создана но забыта
        c = coro()  # ЗАБЫЛИ await!
        # При сборке мусора: RuntimeWarning: never awaited

create_unawaited_coros(100)
gc.collect()

# На production:
# Если часто создаёшь корутины без await —
# они останутся в памяти пока не будут собраны GC
# Множество coroutine objects = утечка

# ✅ ПРАВИЛЬНО
async def main():
    tasks = []
    for i in range(1000):
        # Явно создаём Task
        task = asyncio.create_task(some_coro(i))
        tasks.append(task)
    
    # Явно ждём все
    await asyncio.gather(*tasks)

Утечка 2: Очередь задач переполнена (Queue OOM)

import asyncio

async def producer():
    """Продюсер добавляет данные в очередь"""
    queue = asyncio.Queue()
    
    # ❌ УТЕЧКА: Если консьюмер медленнее продюсера
    while True:
        data = await fetch_data()  # Быстро
        await queue.put(data)  # Быстро добавляет
        # Очередь растёт: 1000 -> 10000 -> 100000 -> OOM

async def consumer():
    """Консьюмер обрабатывает данные медленно"""
    while True:
        data = await queue.get()  # Медленно
        await process_data(data)  # Еще медленнее
        queue.task_done()

# ✅ ПРАВИЛЬНО: Ограничить размер очереди
queue = asyncio.Queue(maxsize=100)  # Max 100 items

# producer будет заблокирован на queue.put() 
# если очередь полна

Утечка 3: Кольцевые ссылки (Circular references)

import asyncio
import gc

class Handler:
    def __init__(self):
        self.data = bytearray(1024 * 1024)  # 1 МБ
        self.coro = self.process()  # Кольцевая ссылка!
    
    async def process(self):
        while True:
            # self references self.coro references self
            # Сборщик мусора может не удалить в срок
            await asyncio.sleep(1)
            # Утечка: Handler и его корутина висят в памяти

# ❌ Утечка
handler = Handler()
handler = None  # Но объект может остаться в памяти
# GC потребуется дополнительный цикл

# ✅ ПРАВИЛЬНО: Явно очищать
handler = Handler()
await handler.coro  # Явно завершить
handler = None

# ✅ ИЛИ: Использовать контекстный менеджер
class Handler:
    async def __aenter__(self):
        self.coro = asyncio.create_task(self.process())
        return self
    
    async def __aexit__(self, *args):
        self.coro.cancel()
        try:
            await self.coro
        except asyncio.CancelledError:
            pass

4. Оптимизация памяти в async

Техника 1: Батчинг вместо одиночных запросов

import asyncio

# ❌ НЕОПТИМАЛЬНО: 10k корутин в памяти одновременно
async def fetch_items_bad(item_ids):
    tasks = [
        fetch_item(item_id)
        for item_id in item_ids
    ]
    # Все 10k задач в памяти
    return await asyncio.gather(*tasks)

# ✅ ОПТИМАЛЬНО: Батчи по N элементов
async def fetch_items_good(item_ids, batch_size=100):
    results = []
    for i in range(0, len(item_ids), batch_size):
        batch = item_ids[i:i + batch_size]
        batch_results = await asyncio.gather(*[
            fetch_item(item_id) for item_id in batch
        ])
        results.extend(batch_results)
        # После каждого батча задачи удаляются из памяти
    return results

# Сравнение памяти:
# fetch_items_bad: 10k * 200 bytes = 2 МБ одновременно
# fetch_items_good: 100 * 200 bytes = 20 КБ одновременно (100x экономия)

Техника 2: Использование семафора (Semaphore)

import asyncio

async def bounded_fetch(item_ids):
    # Максимум 10 одновременных запросов
    semaphore = asyncio.Semaphore(10)
    
    async def fetch_with_limit(item_id):
        async with semaphore:
            return await fetch_item(item_id)
    
    # Создаём все задачи, но максимум 10 выполняются одновременно
    tasks = [
        fetch_with_limit(item_id)
        for item_id in item_ids
    ]
    return await asyncio.gather(*tasks)

# Результат: в памяти максимум 10 + scheduling overhead

Техника 3: Async generator вместо списка

# ❌ НЕОПТИМАЛЬНО: Весь результат в памяти
async def get_all_items():
    items = []
    async for page in paginated_api():
        items.extend(page)  # 10k items в памяти
    return items

# ✅ ОПТИМАЛЬНО: Yield по одному
async def get_all_items_generator():
    async for page in paginated_api():
        for item in page:
            yield item
    # На любой момент в памяти максимум 1 page

# Использование
async for item in get_all_items_generator():
    await process_item(item)
    # item удаляется из памяти сразу после обработки

5. Профилирование памяти в async

import asyncio
import tracemalloc
import linecache
import os

def display_top(snapshot, key_type=lineno, limit=5):
    snapshot = snapshot.filter_traces((
        tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
    ))
    top_stats = snapshot.statistics(key_type)

    print(f"[ Top {limit} ]")
    for index, stat in enumerate(top_stats[:limit], 1):
        frame = stat.traceback[0]
        print(f"#{index}: {frame.filename}:{frame.lineno}: {stat.size / 1024:.1f} КБ")
        line = linecache.getline(frame.filename, frame.lineno).strip()
        if line:
            print(f"    {line}")

async def memory_profiled_app():
    tracemalloc.start()
    
    # Твой асинхронный код
    await my_async_app()
    
    snapshot = tracemalloc.take_snapshot()
    display_top(snapshot)

asyncio.run(memory_profiled_app())

6. Best Practices

import asyncio

# 1. Явно отменяй задачи когда они больше не нужны
task = asyncio.create_task(long_running_task())
# ...
task.cancel()
try:
    await task
except asyncio.CancelledError:
    pass

# 2. Используй asyncio.TaskGroup для автоматического управления
async def with_task_group():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(task1())
        tg.create_task(task2())
    # Все задачи автоматически очищаются

# 3. Батчируй большие операции
async def batch_operations(items, batch_size=100):
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        await process_batch(batch)
        # Очистка между батчами

# 4. Ограничивай concurrency семафором
semaphore = asyncio.Semaphore(10)
async def limited_task(item):
    async with semaphore:
        return await process(item)

# 5. Используй async generator для больших датасетов
async def stream_data():
    while True:
        data = await get_data()
        yield data

Резюме

Память в асинхронности:

  • На корутину: 100-300 байт + локальные переменные
  • На 100k корутин: ~100-300 МБ (vs 1+ ГБ с потоками)
  • Утечки: Зависшие корутины, переполненные очереди, циклические ссылки
  • Оптимизация: Батчинг, семафоры, async generators
  • Мониторинг: tracemalloc, профилирование

Правило: Асинхронность экономит память, но только если правильно управлять concurrency и не создавать утечек.

Как затрачивается память в асинхронности? | PrepBro