Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Как управлять утечками памяти в Python
Введение
Утечка памяти — это ситуация, когда объекты остаются в памяти даже после того, как они больше не нужны. Python имеет сборщик мусора (Garbage Collector), но это не гарантирует отсутствие утечек. Data Engineer'ы часто сталкиваются с утечками памяти при обработке больших объёмов данных.
1. Причины утечек памяти в Python
Циклические ссылки
# ПЛОХО: циклические ссылки
class Node:
def __init__(self, value):
self.value = value
self.next = None
self.parent = None
# Создаём циклическую ссылку
node1 = Node(1)
node2 = Node(2)
node1.next = node2
node2.parent = node1 # Циклическая ссылка! Даже если удалим node1 и node2, они не будут удалены
del node1, node2 # Переменные удалены, но объекты остаются в памяти
Почему это проблема: Python GC может справиться с циклическими ссылками, но есть задержка.
Глобальные переменные и кэши
# ПЛОХО: растущий кэш без ограничений
cache = {} # Глобальный кэш
def process_data(key, data):
cache[key] = data # Добавляем в кэш
# ... обработка ...
# Ключи никогда не удаляются!
# После обработки 1М документов кэш использует 100GB RAM
Слушатели событий (Event Listeners)
# ПЛОХО: незарегистрированные слушатели
class DataProcessor:
def __init__(self):
self.results = []
def on_data(self, data):
self.results.append(data) # Растущий список
emitter = DataEmitter()
processor = DataProcessor()
# Регистрируем слушателя
emitter.on('data', processor.on_data)
# Удаляем processor, но слушатель всё ещё активен!
del processor
# processor.results всё ещё в памяти
Циклические импорты
# module_a.py
from module_b import b_function
class A:
pass
# module_b.py
from module_a import A
class B:
pass
# Циклические импорты могут привести к утечкам при перезагрузке модулей
2. Инструменты для отслеживания утечек
memory_profiler
from memory_profiler import profile
@profile
def process_large_file(filename):
data = []
with open(filename) as f:
for line in f:
data.append(line) # Растущий список
return len(data)
# Запуск: python -m memory_profiler script.py
# Вывод:
# Filename: script.py
# Line # Mem usage Increment Occurrences Line Contents
# 1 38.3 MiB process_large_file
# 2 38.4 MiB 0.1 MiB data = []
# 3 38.4 MiB 0.0 MiB with open(filename) as f:
# 4 500.2 MiB 461.8 MiB for line in f: <- Много памяти!
# 5 500.2 MiB 0.0 MiB data.append(line)
tracemalloc
import tracemalloc
tracemalloc.start()
# Ваш код
data = [[i for i in range(1000)] for _ in range(10000)]
# Снимок памяти
current, peak = tracemalloc.get_traced_memory()
print(f"Текущая: {current / 1024 / 1024:.1f} MB; Пик: {peak / 1024 / 1024:.1f} MB")
# Топ потребителей памяти
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
print("[ Top 10 ]")
for stat in top_stats[:10]:
print(stat)
objgraph
import objgraph
# Показать объекты, которые растут
objgraph.show_growth()
# Ваш код
for _ in range(1000):
x = [1, 2, 3, 4, 5]
objgraph.show_growth() # Показывает новые объекты
# Показать рост определённого типа
objgraph.show_most_common_types(limit=10)
3. Решения и best practices
Использовать слабые ссылки (Weak References)
import weakref
class Node:
def __init__(self, value):
self.value = value
self.next = None
self.parent_ref = None # Слабая ссылка
def set_parent(self, parent):
# Используем weakref, чтобы избежать циклических ссылок
self.parent_ref = weakref.ref(parent)
def get_parent(self):
if self.parent_ref is None:
return None
return self.parent_ref() # () вызывает слабую ссылку
# Теперь циклических ссылок нет
node1 = Node(1)
node2 = Node(2)
node1.next = node2
node2.set_parent(node1)
del node1, node2 # Объекты нормально удаляются
Ограничить размер кэша (LRU Cache)
from functools import lru_cache
from collections import OrderedDict
# ХОРОШО: встроенный LRU кэш
@lru_cache(maxsize=1000) # Максимум 1000 элементов
def expensive_function(x):
return x ** 2
# Или собственный LRU кэш
class LRUCache:
def __init__(self, max_size):
self.cache = OrderedDict()
self.max_size = max_size
def get(self, key):
if key in self.cache:
self.cache.move_to_end(key) # Переместить в конец (самый свежий)
return self.cache[key]
return None
def put(self, key, value):
if key in self.cache:
self.cache.move_to_end(key)
self.cache[key] = value
# Удалить самый старый элемент, если превышен лимит
if len(self.cache) > self.max_size:
oldest_key = next(iter(self.cache))
del self.cache[oldest_key]
# Использование
cache = LRUCache(max_size=1000)
cache.put('key1', 'value1')
print(cache.get('key1')) # 'value1'
Явно удалять большие объекты
# ПЛОХО: большие объекты остаются в памяти
def process_data():
large_array = [0] * 1000000 # 1M элементов
result = sum(large_array)
return result # large_array всё ещё в памяти
# ХОРОШО: явно удалить
def process_data():
large_array = [0] * 1000000
result = sum(large_array)
del large_array # Явно удаляем
return result
# ХОРОШО: использовать контекстный менеджер
class DataProcessor:
def __init__(self):
self.data = None
def __enter__(self):
self.data = [0] * 1000000
return self
def __exit__(self, *args):
self.data = None # Удаляем при выходе
def process(self):
return sum(self.data)
with DataProcessor() as processor:
result = processor.process()
# processor.data автоматически удалена
Генераторы вместо списков
# ПЛОХО: все данные в памяти
def read_large_file(filename):
lines = [] # Большой список
with open(filename) as f:
for line in f:
lines.append(line) # Вся файл в памяти
return lines
# ХОРОШО: генератор (ленивые вычисления)
def read_large_file(filename):
with open(filename) as f:
for line in f:
yield line.strip() # Одна строка в памяти за раз
# Использование
for line in read_large_file('huge.txt'):
process_line(line) # Обрабатываем построчно
Батчевая обработка вместо загрузки всего сразу
# ПЛОХО: загружаем всё в DataFrame
import pandas as pd
df = pd.read_csv('billion_rows.csv') # Может потребовать 100GB RAM
result = df.groupby('category')['value'].sum()
# ХОРОШО: обрабатываем батчами
def process_csv_in_batches(filename, batch_size=10000):
result = {}
for chunk in pd.read_csv(filename, chunksize=batch_size):
# Обрабатываем батч
batch_result = chunk.groupby('category')['value'].sum()
# Накапливаем результаты
for category, value in batch_result.items():
result[category] = result.get(category, 0) + value
return result
result = process_csv_in_batches('billion_rows.csv')
4. Профилирование памяти в production
Мониторинг процесса
import psutil
import os
def get_memory_usage():
"""Получить использование памяти текущим процессом"""
process = psutil.Process(os.getpid())
mem_info = process.memory_info()
return {
'rss': mem_info.rss / 1024 / 1024, # MB (всё, что выделено)
'vms': mem_info.vms / 1024 / 1024, # MB (виртуальная память)
'percent': process.memory_percent() # % от всей системной памяти
}
# Мониторинг
import time
for i in range(10):
mem = get_memory_usage()
print(f"Итерация {i}: RSS={mem['rss']:.1f}MB, {mem['percent']:.1f}%")
time.sleep(1)
Логирование утечек
import logging
import tracemalloc
logger = logging.getLogger(__name__)
class MemoryTracker:
def __init__(self):
tracemalloc.start()
self.last_snapshot = tracemalloc.take_snapshot()
def check_growth(self, threshold_mb=10):
"""Проверить прирост памяти"""
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.compare_to(self.last_snapshot, 'lineno')
total_diff = sum(stat.size_diff for stat in top_stats) / 1024 / 1024
if total_diff > threshold_mb:
logger.warning(f"Прирост памяти: {total_diff:.1f}MB")
for stat in top_stats[:5]:
logger.warning(f" {stat}")
self.last_snapshot = snapshot
tracker = MemoryTracker()
# В цикле обработки
for batch in data_generator:
process_batch(batch)
tracker.check_growth(threshold_mb=50) # Предупреждение если выросло на 50MB
5. Конкретные примеры утечек в Data Engineering
Утечка при работе с Spark
# ПЛОХО: собирать весь DataFrame на драйвер
df = spark.read.csv('large_file.csv')
data = df.collect() # Весь датасет в памяти драйвера!
# ХОРОШО: обрабатывать распределённо
result = df.groupBy('category').count().collect() # Только результаты
Утечка при работе с pandas
# ПЛОХО: дублирование при конкатенации
import pandas as pd
df_list = []
for file in files:
df_list.append(pd.read_csv(file))
result = pd.concat(df_list) # O(n) копирований!
# ХОРОШО: использовать список файлов
result = pd.concat([pd.read_csv(f) for f in files])
# Или ещё лучше:
result = pd.read_csv(files[0])
for file in files[1:]:
result = pd.concat([result, pd.read_csv(file)], ignore_index=True)
Утечка в boto3 (AWS SDK)
# ПЛОХО: переиспользование клиента без очистки
import boto3
def process_s3_files():
s3 = boto3.client('s3')
for key in keys:
obj = s3.get_object(Bucket='bucket', Key=key)
data = obj['Body'].read() # Данные накапливаются
# Не очищаем!
# ХОРОШО: явно очищать ресурсы
def process_s3_files():
with boto3.client('s3') as s3:
for key in keys:
obj = s3.get_object(Bucket='bucket', Key=key)
with obj['Body'] as stream:
data = stream.read()
# stream закрывается автоматически
del data # Явно удаляем
Чеклист для профилирования памяти
- ✅ Запустить
memory_profilerна критичных функциях - ✅ Использовать
tracemallocдля поиска горячих мест - ✅ Проверить циклические ссылки (
objgraph) - ✅ Ограничить размеры кэшей (
lru_cache,maxsize) - ✅ Использовать генераторы вместо списков
- ✅ Обрабатывать данные батчами, не целиком
- ✅ Явно удалять большие объекты
- ✅ Мониторить память в production
Вывод
Утечки памяти в Python — это не какой-то вол. Они появляются из-за:
- Архитектурных ошибок (циклические ссылки, глобальные кэши)
- Алгоритмических ошибок (загрузка всего вместо потокового обхода)
- Неправильного использования библиотек (забыли закрыть файл, соединение)
Использование правильных инструментов и паттернов может снизить потребление памяти в 10-100 раз для типичных Data Engineering задач.