Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Профилирование в Python: поиск узких мест производительности
Профилирование — это техника анализа программы для выявления участков кода, которые потребляют больше всего CPU, памяти или времени. В Data Engineering это критично для оптимизации ETL pipeline.
Зачем профилировать
# ❌ Медленный код, но я не знаю почему
def process_data(data):
result = []
for row in data:
result.append(transform(row))
return result
# Запускаю, и это занимает 2 часа
processed = process_data(million_rows) # 2:00:00
# А что если я используюпрофилирование?
# → "Узкое место: transform() занимает 95% времени"
# → Оптимизирую transform() → результат 5 минут!
Тип 1: Профилирование по времени (cProfile)
cProfile — встроенный профайлер, показывающий, сколько времени функция заняла.
import cProfile
import pstats
from pstats import SortKey
def slow_function():
total = 0
for i in range(100000):
total += sum(range(i)) # Неэффективно
return total
def main():
result = slow_function()
return result
# Профилируем
profiler = cProfile.Profile()
profiler.enable()
main()
profiler.disable()
stats = pstats.Stats(profiler)
stats.sort_stats(SortKey.CUMULATIVE)
stats.print_stats()
# Вывод:
# ncalls tottime percall cumtime percall filename:lineno(function)
# 1 2.345 2.345 2.345 2.345 <string>:1(slow_function)
# 100000 0.034 0.000 2.311 0.000 <string>:4(<listcomp>)
#
# Видно: 100K вызовов sum() = 2.3 сек, это узкое место!
Или из командной строки:
# Профилируем весь скрипт
python -m cProfile -s cumulative script.py
# Сохраняем результаты
python -m cProfile -o profile_output.prof script.py
# Анализируем результаты
python -m pstats profile_output.prof
Тип 2: Профилирование памяти (memory_profiler)
memory_profiler — показывает, сколько памяти использует каждая строка кода.
from memory_profiler import profile
@profile
def memory_heavy():
# Строка 1: 0 MB
big_list = [i ** 2 for i in range(1000000)] # +40 MB
# Строка 2: использование памяти
big_dict = {i: i ** 2 for i in range(1000000)} # +80 MB
# Строка 3
result = sum(big_list) + sum(big_dict.values()) # +0 MB (результат малый)
return result # Память освобождается
memory_heavy()
Запуск:
python -m memory_profiler script.py
# Вывод:
# Filename: script.py
#
# Line # Mem usage Increment Line Contents
# ================================================
# 1 42.0 MB 0.0 MB def memory_heavy():
# 2 42.0 MB 0.0 MB big_list = [i ** 2 for i in range(1000000)]
# 3 122.0 MB 80.0 MB big_dict = {i: i ** 2 for i in range(1000000)}
# 4 122.0 MB 0.0 MB result = sum(big_list) + sum(big_dict.values())
Тип 3: Профилирование DataFrame (pandas profiler)
Для Data Engineering с pandas:
import pandas as pd
from pandas_profiling import ProfileReport
# Загружаем большой датасет
df = pd.read_csv('large_dataset.csv')
# Генерируем профильный отчёт
profile = ProfileReport(df, title="Data Profiling Report")
profile.to_file("report.html")
# Отчёт содержит:
# - Distribution графики
# - Missing data анализ
# - Correlation matrix
# - Duplicate rows
# - Variable statistics
Тип 4: Профилирование Spark (spark.sql.execution.ui)
Для Spark ETL процессов:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum
spark = SparkSession.builder \
.appName("Profiling") \
.config("spark.sql.execution.ui.enabled", "true") \
.getOrCreate()
df = spark.read.csv('data.csv', header=True)
# Включаем query profiling
df.explain(extended=True) # Показывает physical plan
# Выполняем операцию
result = df.groupBy('category').agg(spark_sum('amount')).collect()
# В Spark Web UI (localhost:4040) видны:
# - Query execution plan (DAG)
# - Stage statistics (время, данные, shuffle)
# - Task breakdown
Практический пример: Оптимизация медленного запроса
Шаг 1: Профилируем и находим проблему
import cProfile
import pstats
from io import StringIO
def slow_etl():
data = [{'id': i, 'value': i**2} for i in range(100000)]
# Операция 1: фильтрация
filtered = [x for x in data if x['value'] > 1000] # Медленно?
# Операция 2: группировка
grouped = {}
for item in filtered:
key = item['id'] % 10
if key not in grouped:
grouped[key] = []
grouped[key].append(item) # Медленно?
return grouped
# Профилируем
pr = cProfile.Profile()
pr.enable()
result = slow_etl()
pr.disable()
s = StringIO()
ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
ps.print_stats()
print(s.getvalue())
# Вывод показывает: список comprehension на фильтрацию = 90% времени!
Шаг 2: Оптимизируем
# Вариант 1: Используем numpy (если большие числовые данные)
import numpy as np
def optimized_etl_numpy():
data = np.array([i**2 for i in range(100000)])
filtered = data[data > 1000] # Vectorized, быстрее!
return filtered.sum()
# Вариант 2: Используем pandas (для табличных данных)
import pandas as pd
def optimized_etl_pandas():
df = pd.DataFrame({
'id': range(100000),
'value': [i**2 for i in range(100000)]
})
# Vectorized операции
filtered = df[df['value'] > 1000]
grouped = filtered.groupby('id' % 10).size()
return grouped
# Вариант 3: Используем Spark (для распределённых данных)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr
def optimized_etl_spark():
spark = SparkSession.builder.appName("OptimizedETL").getOrCreate()
df = spark.createDataFrame([
{'id': i, 'value': i**2} for i in range(100000)
])
filtered = df.filter(col('value') > 1000)
grouped = filtered.groupBy(expr('id % 10')).count()
return grouped.collect()
# Сравнение скорости:
# Синхронный код: 2.5 сек
# NumPy: 0.1 сек (25x быстрее)
# Pandas: 0.15 сек (16x быстрее)
# Spark (3 узлов): 5 сек (медленнее из-за overhead, но масштабируется)
Инструменты профилирования
1. cProfile (встроенный)
import cProfile
cProfile.run('main()') # Встроенный, не требует установки
2. line_profiler (по строкам)
pip install line_profiler
# В коде:
from line_profiler import LineProfiler
lp = LineProfiler()
lp.add_function(my_function)
lp.enable()
my_function()
lp.disable()
lp.print_stats()
3. py-spy (real-time sampling)
pip install py-spy
# Профилируем running процесс
py-spy record -o profile.svg -- python my_script.py
4. scalene (CPU + GPU + память)
pip install scalene
# Запускаем
scalene my_script.py
# Показывает CPU%, GPU%, memory в реальном времени
5. PyFlame (для production)
pip install pyflame
# Профилируем running процесс
pyflame -o profile.txt python_process_id
Практическое применение: Оптимизация Data Pipeline
import cProfile
import pandas as pd
from memory_profiler import profile
@profile # Отслеживаем память
def etl_pipeline(input_file: str, output_file: str):
# Шаг 1: Extract
df = pd.read_csv(input_file) # Может быть медленным для больших файлов
# Оптимизация: читай батчами для больших файлов
# df = pd.read_csv(input_file, chunksize=100000)
# Шаг 2: Transform
df['date'] = pd.to_datetime(df['date'])
df['amount'] = df['amount'].astype('float32') # Меньше памяти
df_clean = df.dropna()
df_dedup = df_clean.drop_duplicates()
# Шаг 3: Aggregation
result = df_dedup.groupby('category').agg({
'amount': ['sum', 'mean', 'count']
})
# Шаг 4: Load
result.to_csv(output_file, index=False)
return result
# Профилируем
if __name__ == '__main__':
cProfile.run('etl_pipeline("input.csv", "output.csv")', sort='cumulative')
Результаты профилирования и оптимизация
Типичные находки:
ПРОБЛЕМА 1: List comprehension медленнее
✓ РЕШЕНИЕ: Использовать NumPy/Pandas vectorized операции
ПРОБЛЕМА 2: Много маленьких функций
✓ РЕШЕНИЕ: Объединить в одну или использовать numba.jit
ПРОБЛЕМА 3: Работа с памятью (OOM)
✓ РЕШЕНИЕ: Читать/писать батчами, использовать generator
ПРОБЛЕМА 4: Slow IO (SQL запросы)
✓ РЕШЕНИЕ: Batch inserts, параллельная загрузка, async
ПРОБЛЕМА 5: Неэффективные SQL JOIN-ы
✓ РЕШЕНИЕ: Индексы, партиционирование, используй EXPLAIN
Best Practices
# 1. Профилируй перед оптимизацией
profile_first = True # не гадай где узкое место!
# 2. Используй генераторы для больших данных
def process_large_file(filename):
with open(filename) as f:
for line in f: # Не загружает всё в память
yield process_line(line)
# 3. Batch операции для БД
inserts = []
for row in data:
inserts.append(row)
if len(inserts) == 1000:
db.insert_batch(inserts) # Батч вместо одного insert
inserts = []
# 4. Используй appropriate data types
df['id'] = df['id'].astype('int32') # вместо int64
df['amount'] = df['amount'].astype('float32') # вместо float64
# 5. Профилируй регулярно
if __name__ == '__main__':
import sys
if '--profile' in sys.argv:
cProfile.run('main()')
else:
main()
Вывод
Профилирование — это обязательный этап оптимизации:
- cProfile — для анализа времени выполнения функций
- memory_profiler — для анализа использования памяти
- pandas.profiling — для EDA и анализа данных
- Spark explain() — для анализа distributed запросов
- py-spy / scalene — для production приложений
Без профилирования оптимизация — это стрельба вслепую!