← Назад к вопросам

Что знаешь про профилировнние?

1.7 Middle🔥 131 комментариев
#Python

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Профилирование в Python: поиск узких мест производительности

Профилирование — это техника анализа программы для выявления участков кода, которые потребляют больше всего CPU, памяти или времени. В Data Engineering это критично для оптимизации ETL pipeline.

Зачем профилировать

# ❌ Медленный код, но я не знаю почему
def process_data(data):
    result = []
    for row in data:
        result.append(transform(row))
    return result

# Запускаю, и это занимает 2 часа
processed = process_data(million_rows)  # 2:00:00

# А что если я используюпрофилирование?
# → "Узкое место: transform() занимает 95% времени"
# → Оптимизирую transform() → результат 5 минут!

Тип 1: Профилирование по времени (cProfile)

cProfile — встроенный профайлер, показывающий, сколько времени функция заняла.

import cProfile
import pstats
from pstats import SortKey

def slow_function():
    total = 0
    for i in range(100000):
        total += sum(range(i))  # Неэффективно
    return total

def main():
    result = slow_function()
    return result

# Профилируем
profiler = cProfile.Profile()
profiler.enable()

main()

profiler.disable()
stats = pstats.Stats(profiler)
stats.sort_stats(SortKey.CUMULATIVE)
stats.print_stats()

# Вывод:
#      ncalls  tottime  percall  cumtime  percall filename:lineno(function)
#           1    2.345    2.345    2.345    2.345 <string>:1(slow_function)
#      100000    0.034    0.000    2.311    0.000 <string>:4(<listcomp>)
#
# Видно: 100K вызовов sum() = 2.3 сек, это узкое место!

Или из командной строки:

# Профилируем весь скрипт
python -m cProfile -s cumulative script.py

# Сохраняем результаты
python -m cProfile -o profile_output.prof script.py

# Анализируем результаты
python -m pstats profile_output.prof

Тип 2: Профилирование памяти (memory_profiler)

memory_profiler — показывает, сколько памяти использует каждая строка кода.

from memory_profiler import profile

@profile
def memory_heavy():
    # Строка 1: 0 MB
    big_list = [i ** 2 for i in range(1000000)]  # +40 MB
    
    # Строка 2: использование памяти
    big_dict = {i: i ** 2 for i in range(1000000)}  # +80 MB
    
    # Строка 3
    result = sum(big_list) + sum(big_dict.values())  # +0 MB (результат малый)
    
    return result  # Память освобождается

memory_heavy()

Запуск:

python -m memory_profiler script.py

# Вывод:
# Filename: script.py
#
# Line #    Mem usage    Increment   Line Contents
# ================================================
#      1     42.0 MB      0.0 MB   def memory_heavy():
#      2     42.0 MB      0.0 MB       big_list = [i ** 2 for i in range(1000000)]
#      3    122.0 MB     80.0 MB       big_dict = {i: i ** 2 for i in range(1000000)}
#      4    122.0 MB      0.0 MB       result = sum(big_list) + sum(big_dict.values())

Тип 3: Профилирование DataFrame (pandas profiler)

Для Data Engineering с pandas:

import pandas as pd
from pandas_profiling import ProfileReport

# Загружаем большой датасет
df = pd.read_csv('large_dataset.csv')

# Генерируем профильный отчёт
profile = ProfileReport(df, title="Data Profiling Report")
profile.to_file("report.html")

# Отчёт содержит:
# - Distribution графики
# - Missing data анализ
# - Correlation matrix
# - Duplicate rows
# - Variable statistics

Тип 4: Профилирование Spark (spark.sql.execution.ui)

Для Spark ETL процессов:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum

spark = SparkSession.builder \
    .appName("Profiling") \
    .config("spark.sql.execution.ui.enabled", "true") \
    .getOrCreate()

df = spark.read.csv('data.csv', header=True)

# Включаем query profiling
df.explain(extended=True)  # Показывает physical plan

# Выполняем операцию
result = df.groupBy('category').agg(spark_sum('amount')).collect()

# В Spark Web UI (localhost:4040) видны:
# - Query execution plan (DAG)
# - Stage statistics (время, данные, shuffle)
# - Task breakdown

Практический пример: Оптимизация медленного запроса

Шаг 1: Профилируем и находим проблему

import cProfile
import pstats
from io import StringIO

def slow_etl():
    data = [{'id': i, 'value': i**2} for i in range(100000)]
    
    # Операция 1: фильтрация
    filtered = [x for x in data if x['value'] > 1000]  # Медленно?
    
    # Операция 2: группировка
    grouped = {}
    for item in filtered:
        key = item['id'] % 10
        if key not in grouped:
            grouped[key] = []
        grouped[key].append(item)  # Медленно?
    
    return grouped

# Профилируем
pr = cProfile.Profile()
pr.enable()

result = slow_etl()

pr.disable()
s = StringIO()
ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
ps.print_stats()
print(s.getvalue())

# Вывод показывает: список comprehension на фильтрацию = 90% времени!

Шаг 2: Оптимизируем

# Вариант 1: Используем numpy (если большие числовые данные)
import numpy as np

def optimized_etl_numpy():
    data = np.array([i**2 for i in range(100000)])
    filtered = data[data > 1000]  # Vectorized, быстрее!
    return filtered.sum()

# Вариант 2: Используем pandas (для табличных данных)
import pandas as pd

def optimized_etl_pandas():
    df = pd.DataFrame({
        'id': range(100000),
        'value': [i**2 for i in range(100000)]
    })
    # Vectorized операции
    filtered = df[df['value'] > 1000]
    grouped = filtered.groupby('id' % 10).size()
    return grouped

# Вариант 3: Используем Spark (для распределённых данных)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr

def optimized_etl_spark():
    spark = SparkSession.builder.appName("OptimizedETL").getOrCreate()
    df = spark.createDataFrame([
        {'id': i, 'value': i**2} for i in range(100000)
    ])
    
    filtered = df.filter(col('value') > 1000)
    grouped = filtered.groupBy(expr('id % 10')).count()
    return grouped.collect()

# Сравнение скорости:
# Синхронный код: 2.5 сек
# NumPy: 0.1 сек (25x быстрее)
# Pandas: 0.15 сек (16x быстрее)
# Spark (3 узлов): 5 сек (медленнее из-за overhead, но масштабируется)

Инструменты профилирования

1. cProfile (встроенный)

import cProfile
cProfile.run('main()')  # Встроенный, не требует установки

2. line_profiler (по строкам)

pip install line_profiler

# В коде:
from line_profiler import LineProfiler

lp = LineProfiler()
lp.add_function(my_function)
lp.enable()
my_function()
lp.disable()
lp.print_stats()

3. py-spy (real-time sampling)

pip install py-spy

# Профилируем running процесс
py-spy record -o profile.svg -- python my_script.py

4. scalene (CPU + GPU + память)

pip install scalene

# Запускаем
scalene my_script.py
# Показывает CPU%, GPU%, memory в реальном времени

5. PyFlame (для production)

pip install pyflame

# Профилируем running процесс
pyflame -o profile.txt python_process_id

Практическое применение: Оптимизация Data Pipeline

import cProfile
import pandas as pd
from memory_profiler import profile

@profile  # Отслеживаем память
def etl_pipeline(input_file: str, output_file: str):
    # Шаг 1: Extract
    df = pd.read_csv(input_file)  # Может быть медленным для больших файлов
    
    # Оптимизация: читай батчами для больших файлов
    # df = pd.read_csv(input_file, chunksize=100000)
    
    # Шаг 2: Transform
    df['date'] = pd.to_datetime(df['date'])
    df['amount'] = df['amount'].astype('float32')  # Меньше памяти
    df_clean = df.dropna()
    df_dedup = df_clean.drop_duplicates()
    
    # Шаг 3: Aggregation
    result = df_dedup.groupby('category').agg({
        'amount': ['sum', 'mean', 'count']
    })
    
    # Шаг 4: Load
    result.to_csv(output_file, index=False)
    
    return result

# Профилируем
if __name__ == '__main__':
    cProfile.run('etl_pipeline("input.csv", "output.csv")', sort='cumulative')

Результаты профилирования и оптимизация

Типичные находки:

ПРОБЛЕМА 1: List comprehension медленнее
✓ РЕШЕНИЕ: Использовать NumPy/Pandas vectorized операции

ПРОБЛЕМА 2: Много маленьких функций
✓ РЕШЕНИЕ: Объединить в одну или использовать numba.jit

ПРОБЛЕМА 3: Работа с памятью (OOM)
✓ РЕШЕНИЕ: Читать/писать батчами, использовать generator

ПРОБЛЕМА 4: Slow IO (SQL запросы)
✓ РЕШЕНИЕ: Batch inserts, параллельная загрузка, async

ПРОБЛЕМА 5: Неэффективные SQL JOIN-ы
✓ РЕШЕНИЕ: Индексы, партиционирование, используй EXPLAIN

Best Practices

# 1. Профилируй перед оптимизацией
profile_first = True  # не гадай где узкое место!

# 2. Используй генераторы для больших данных
def process_large_file(filename):
    with open(filename) as f:
        for line in f:  # Не загружает всё в память
            yield process_line(line)

# 3. Batch операции для БД
inserts = []
for row in data:
    inserts.append(row)
    if len(inserts) == 1000:
        db.insert_batch(inserts)  # Батч вместо одного insert
        inserts = []

# 4. Используй appropriate data types
df['id'] = df['id'].astype('int32')     # вместо int64
df['amount'] = df['amount'].astype('float32')  # вместо float64

# 5. Профилируй регулярно
if __name__ == '__main__':
    import sys
    if '--profile' in sys.argv:
        cProfile.run('main()')
    else:
        main()

Вывод

Профилирование — это обязательный этап оптимизации:

  1. cProfile — для анализа времени выполнения функций
  2. memory_profiler — для анализа использования памяти
  3. pandas.profiling — для EDA и анализа данных
  4. Spark explain() — для анализа distributed запросов
  5. py-spy / scalene — для production приложений

Без профилирования оптимизация — это стрельба вслепую!