← Назад к вопросам
Как затрачивается память в асинхронности?
1.7 Middle🔥 91 комментариев
#Python Core#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Затраты памяти в асинхронности: от корутин к утечкам
Асинхронность даёт производительность, но требует понимания как и когда память выделяется. Наивное использование async может привести к утечкам памяти, исчерпанию файловых дескрипторов и зависанию приложения.
1. Базовые затраты памяти на корутину
Сколько памяти занимает одна корутина?
import sys
import asyncio
async def simple_coro():
await asyncio.sleep(1)
# Размер объекта корутины
coro = simple_coro()
print(sys.getsizeof(coro)) # ~80-120 байт в зависимости от версии Python
# Но это просто оболочка. Память тратится на:
# 1. Frame объект (стек вызовов)
# 2. Локальные переменные
# 3. Состояние (waiting for, паузы)
# 4. Регистрация в event loop
frame = coro.cr_frame
print(sys.getsizeof(frame)) # ~120-200 байт
На практике:
- Одна пустая корутина: 100-300 байт
- Корутина с локальными переменными: 300-1000 байт
- 1000 корутин: ~1 МБ памяти
- 100,000 корутин: ~100 МБ памяти
Пример: 100k параллельных соединений
import asyncio
import tracemalloc
async def client_handler(client_id):
# Локальные переменные
buffer = bytearray(4096) # +4 КБ
cache = {} # +100+ байт
state = "connected"
# Ждёт события
await asyncio.sleep(3600) # Час
async def simulate_100k_connections():
tracemalloc.start()
# Создаём 100k задач
tasks = [client_handler(i) for i in range(100000)]
await asyncio.gather(*tasks)
current, peak = tracemalloc.get_traced_memory()
print(f"Current: {current / 1024 / 1024:.1f} МБ")
print(f"Peak: {peak / 1024 / 1024:.1f} МБ")
# asyncio.run(simulate_100k_connections())
# Результат: ~200-300 МБ для 100k корутин
# В синхронном коде с потоками было бы ~800 МБ - 1 ГБ
2. Event Loop и его структуры данных
Как Event Loop хранит задачи?
import asyncio
import sys
class MockEventLoop:
def __init__(self):
self._ready = [] # Очередь готовых к выполнению
self._scheduled = {} # Запланированные колбэки
self._fd_to_handle = {} # File descriptor -> handle
self._callbacks = [] # Другие колбэки
self._current_task = None
# Event loop структуры:
# 1. _ready deque: каждая задача = запись в деке (~50 байт)
# 2. _scheduled heap: запланированные = запись в heap (~200 байт)
# 3. Полисоки (sockets): каждый = файловый дескриптор (~100+ байт)
loop = asyncio.new_event_loop()
async def task():
await asyncio.sleep(1)
# Создаём 1000 задач
for i in range(1000):
loop.create_task(task())
# Каждая задача = Task object (~200 байт) + запись в деке
# 1000 * (200 + 50) = ~250 КБ только на структуры event loop
3. Утечки памяти в асинхронности
Утечка 1: Зависшие корутины (Hanging Coroutines)
# ❌ УТЕЧКА: Корутина создана но никогда не awaited
import asyncio
import weakref
import gc
def create_unawaited_coros(count):
# Создаём корутины но не запускаем
for i in range(count):
async def coro():
await asyncio.sleep(100)
# Корутина создана но забыта
c = coro() # ЗАБЫЛИ await!
# При сборке мусора: RuntimeWarning: never awaited
create_unawaited_coros(100)
gc.collect()
# На production:
# Если часто создаёшь корутины без await —
# они останутся в памяти пока не будут собраны GC
# Множество coroutine objects = утечка
# ✅ ПРАВИЛЬНО
async def main():
tasks = []
for i in range(1000):
# Явно создаём Task
task = asyncio.create_task(some_coro(i))
tasks.append(task)
# Явно ждём все
await asyncio.gather(*tasks)
Утечка 2: Очередь задач переполнена (Queue OOM)
import asyncio
async def producer():
"""Продюсер добавляет данные в очередь"""
queue = asyncio.Queue()
# ❌ УТЕЧКА: Если консьюмер медленнее продюсера
while True:
data = await fetch_data() # Быстро
await queue.put(data) # Быстро добавляет
# Очередь растёт: 1000 -> 10000 -> 100000 -> OOM
async def consumer():
"""Консьюмер обрабатывает данные медленно"""
while True:
data = await queue.get() # Медленно
await process_data(data) # Еще медленнее
queue.task_done()
# ✅ ПРАВИЛЬНО: Ограничить размер очереди
queue = asyncio.Queue(maxsize=100) # Max 100 items
# producer будет заблокирован на queue.put()
# если очередь полна
Утечка 3: Кольцевые ссылки (Circular references)
import asyncio
import gc
class Handler:
def __init__(self):
self.data = bytearray(1024 * 1024) # 1 МБ
self.coro = self.process() # Кольцевая ссылка!
async def process(self):
while True:
# self references self.coro references self
# Сборщик мусора может не удалить в срок
await asyncio.sleep(1)
# Утечка: Handler и его корутина висят в памяти
# ❌ Утечка
handler = Handler()
handler = None # Но объект может остаться в памяти
# GC потребуется дополнительный цикл
# ✅ ПРАВИЛЬНО: Явно очищать
handler = Handler()
await handler.coro # Явно завершить
handler = None
# ✅ ИЛИ: Использовать контекстный менеджер
class Handler:
async def __aenter__(self):
self.coro = asyncio.create_task(self.process())
return self
async def __aexit__(self, *args):
self.coro.cancel()
try:
await self.coro
except asyncio.CancelledError:
pass
4. Оптимизация памяти в async
Техника 1: Батчинг вместо одиночных запросов
import asyncio
# ❌ НЕОПТИМАЛЬНО: 10k корутин в памяти одновременно
async def fetch_items_bad(item_ids):
tasks = [
fetch_item(item_id)
for item_id in item_ids
]
# Все 10k задач в памяти
return await asyncio.gather(*tasks)
# ✅ ОПТИМАЛЬНО: Батчи по N элементов
async def fetch_items_good(item_ids, batch_size=100):
results = []
for i in range(0, len(item_ids), batch_size):
batch = item_ids[i:i + batch_size]
batch_results = await asyncio.gather(*[
fetch_item(item_id) for item_id in batch
])
results.extend(batch_results)
# После каждого батча задачи удаляются из памяти
return results
# Сравнение памяти:
# fetch_items_bad: 10k * 200 bytes = 2 МБ одновременно
# fetch_items_good: 100 * 200 bytes = 20 КБ одновременно (100x экономия)
Техника 2: Использование семафора (Semaphore)
import asyncio
async def bounded_fetch(item_ids):
# Максимум 10 одновременных запросов
semaphore = asyncio.Semaphore(10)
async def fetch_with_limit(item_id):
async with semaphore:
return await fetch_item(item_id)
# Создаём все задачи, но максимум 10 выполняются одновременно
tasks = [
fetch_with_limit(item_id)
for item_id in item_ids
]
return await asyncio.gather(*tasks)
# Результат: в памяти максимум 10 + scheduling overhead
Техника 3: Async generator вместо списка
# ❌ НЕОПТИМАЛЬНО: Весь результат в памяти
async def get_all_items():
items = []
async for page in paginated_api():
items.extend(page) # 10k items в памяти
return items
# ✅ ОПТИМАЛЬНО: Yield по одному
async def get_all_items_generator():
async for page in paginated_api():
for item in page:
yield item
# На любой момент в памяти максимум 1 page
# Использование
async for item in get_all_items_generator():
await process_item(item)
# item удаляется из памяти сразу после обработки
5. Профилирование памяти в async
import asyncio
import tracemalloc
import linecache
import os
def display_top(snapshot, key_type=lineno, limit=5):
snapshot = snapshot.filter_traces((
tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
))
top_stats = snapshot.statistics(key_type)
print(f"[ Top {limit} ]")
for index, stat in enumerate(top_stats[:limit], 1):
frame = stat.traceback[0]
print(f"#{index}: {frame.filename}:{frame.lineno}: {stat.size / 1024:.1f} КБ")
line = linecache.getline(frame.filename, frame.lineno).strip()
if line:
print(f" {line}")
async def memory_profiled_app():
tracemalloc.start()
# Твой асинхронный код
await my_async_app()
snapshot = tracemalloc.take_snapshot()
display_top(snapshot)
asyncio.run(memory_profiled_app())
6. Best Practices
import asyncio
# 1. Явно отменяй задачи когда они больше не нужны
task = asyncio.create_task(long_running_task())
# ...
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# 2. Используй asyncio.TaskGroup для автоматического управления
async def with_task_group():
async with asyncio.TaskGroup() as tg:
tg.create_task(task1())
tg.create_task(task2())
# Все задачи автоматически очищаются
# 3. Батчируй большие операции
async def batch_operations(items, batch_size=100):
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
await process_batch(batch)
# Очистка между батчами
# 4. Ограничивай concurrency семафором
semaphore = asyncio.Semaphore(10)
async def limited_task(item):
async with semaphore:
return await process(item)
# 5. Используй async generator для больших датасетов
async def stream_data():
while True:
data = await get_data()
yield data
Резюме
Память в асинхронности:
- На корутину: 100-300 байт + локальные переменные
- На 100k корутин: ~100-300 МБ (vs 1+ ГБ с потоками)
- Утечки: Зависшие корутины, переполненные очереди, циклические ссылки
- Оптимизация: Батчинг, семафоры, async generators
- Мониторинг: tracemalloc, профилирование
Правило: Асинхронность экономит память, но только если правильно управлять concurrency и не создавать утечек.