← Назад к вопросам

Объясните разницу между transformation и action в Spark.

2.3 Middle🔥 121 комментариев
#Apache Spark

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Transformations и Actions в Apache Spark

Transformations и Actions — это два основных типа операций в Spark RDD и DataFrame API. Различие между ними критично для понимания ленивого вычисления в Spark.

Ленивое вычисление (Lazy Evaluation)

Sparkсочетает план выполнения (DAG — Directed Acyclic Graph) без реального выполнения до тех пор, пока не будет вызвано Action.

Transformations  →  Transformations  →  Transformations  →  Action
   (построение)        (построение)         (построение)      (выполнение)

df = spark.read.parquet(...)  # не читается
df = df.filter(...)            # не выполняется
df = df.select(...)            # не выполняется
df.show()                       # ← ВОТ ЗДЕСЬ выполняется всё сразу!

Transformations (Преобразования)

Определение: Операция, которая возвращает новый RDD/DataFrame, но не вычисляет результат. Transformations ленивые — они не выполняются пока на них не ссылается Action.

Ключевые характеристики:

  • Возвращают новый RDD/DataFrame
  • Не выполняются сразу
  • Создают DAG план
  • Можно цеплять (chaining)

Примеры Transformations:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper, sum

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("sales.parquet")

# 1. map - преобразование каждого элемента
rdd = spark.sparkContext.parallelize([1, 2, 3, 4])
rdd_mapped = rdd.map(lambda x: x * 2)  # → [2, 4, 6, 8] (не выполнено)

# 2. filter - отбор строк
df_filtered = df.filter(col("amount") > 100)  # не выполнено

# 3. select - выбор колонок
df_selected = df.select("customer_id", "amount")  # не выполнено

# 4. groupBy - группировка
df_grouped = df.groupBy("customer_id")  # не выполнено

# 5. join - объединение таблиц
df_customers = spark.read.parquet("customers.parquet")
df_joined = df.join(df_customers, "customer_id")  # не выполнено

# 6. union - объединение рядов
df_union = df.union(df_filtered)  # не выполнено

# 7. distinct - удаление дубликатов
df_distinct = df.select("customer_id").distinct()  # не выполнено

# 8. sort - сортировка
df_sorted = df.sort(col("amount").desc())  # не выполнено

# 9. withColumn - добавить/изменить колонку
df_with_col = df.withColumn("amount_upper", upper(col("amount")))  # не выполнено

# 10. flatten - развернуть вложенные структуры
from pyspark.sql.functions import explode
df_flattened = df.select("*", explode("items"))  # не выполнено

# ВСЕ эти операции НЕ выполнены, только построен DAG!

Actions (Действия)

Определение: Операция, которая возвращает результат в программу или пишет на диск. Actions немедленно выполняют весь DAG.

Ключевые характеристики:

  • Возвращают значение в программу
  • Выполняют весь DAG
  • Триггируют вычисления
  • Только один раз в коде

Примеры Actions:

# 1. show - вывести первые n строк в консоль
df_filtered.show(10)  # ← ВЫПОЛНЯЕТ весь DAG

# 2. collect - получить все данные в памяти (ОПАСНО!)
result = df_filtered.collect()  # ← ВЫПОЛНЯЕТ
for row in result:
    print(row)

# 3. count - подсчитать количество строк
count = df_filtered.count()  # ← ВЫПОЛНЯЕТ
print(f"Total rows: {count}")

# 4. first - получить первую строку
first_row = df_filtered.first()  # ← ВЫПОЛНЯЕТ

# 5. take - получить первые n строк
top_5 = df_filtered.take(5)  # ← ВЫПОЛНЯЕТ

# 6. write - записать в файл
df_filtered.write.parquet("output.parquet")  # ← ВЫПОЛНЯЕТ
df_filtered.write.mode("overwrite").parquet("output.parquet")

# 7. saveAsTextFile - сохранить как текст (RDD)
rdd.saveAsTextFile("/path/to/output")  # ← ВЫПОЛНЯЕТ

# 8. foreach - применить функцию к каждому элементу (RDD)
rdd.foreach(lambda x: print(x))  # ← ВЫПОЛНЯЕТ

# 9. reduce - агрегировать в одно значение (RDD)
sum_value = rdd.reduce(lambda x, y: x + y)  # ← ВЫПОЛНЯЕТ

# 10. foreachPartition - применить функцию к каждому partition
def process_partition(iterator):
    for row in iterator:
        print(row)

df_filtered.rdd.foreachPartition(process_partition)  # ← ВЫПОЛНЯЕТ

Visual пример

# Шаг 1: Построение DAG (без выполнения)
df = spark.read.parquet("large_file.parquet")  # Transformation
df = df.filter(col("year") == 2024)  # Transformation
df = df.select("customer_id", "amount")  # Transformation
df = df.groupBy("customer_id").sum("amount")  # Transformation

# DAG построен:
# Read → Filter → Select → GroupBy
# НО НИЧЕГО НЕ ВЫПОЛНЕНО!

# Шаг 2: Выполнение DAG
result = df.collect()  # Action

# Spark теперь выполняет весь DAG:
# 1. Прочитать файл
# 2. Отфильтровать по году
# 3. Выбрать две колонки
# 4. Сгруппировать и сумировать
# 5. Собрать результат в памяти

Почему это важно: Оптимизация

Sparkможет оптимизировать весь DAG благодаря ленивому вычислению:

df = spark.read.parquet("100GB_file.parquet")
df = df.filter(col("year") == 2024)  # Catalyst поймёт, что нужно прочитать только 2024
df = df.select("customer_id", "amount")  # Column pruning: читай только 2 колонки
result = df.take(5)  # Take: прочитай только 5 строк

# Spark оптимизирует:
# Читает только 2024 год
# Читает только 2 колонки
# Останавливается после 5 строк

# Результат: вместо 100GB читаем ~1MB

Сравнительная таблица

ХарактеристикаTransformationAction
ВозвращаетНовый RDD/DataFrameЗначение или Side Effect
ВыполнениеЛенивое (откладывается)Немедленное
DAGДобавляет этап в DAGТриггирует выполнение
Примерыmap, filter, select, joincollect, show, write, count
Можно цеплятьДа, очень эффективноОбычно один раз
PerformanceНет (только на диаграмме)Может быть медленным

Практический пример: Анализ логов

from pyspark.sql.functions import col, explode, to_timestamp

# Чтение логов (Transformation)
logs_df = spark.read.json("s3://logs/2024-03-*.json")  # не читается

# Фильтр по уровню (Transformation)
error_logs = logs_df.filter(col("level") == "ERROR")  # не выполняется

# Выбор нужных колонок (Transformation)
error_summary = error_logs.select(
    col("timestamp"),
    col("service"),
    col("message")
)  # не выполняется

# Группировка по сервису (Transformation)
by_service = error_summary.groupBy("service").count()  # не выполняется

# ВСЕ ВЫШЕПЕРЕЧИСЛЕННОЕ НЕ ВЫПОЛНЕНО!

# Первое Action
by_service.show()  # ← ВЫПОЛНЯЕТ весь DAG!

# Spark оптимизирует:
# 1. Читает только файлы 2024-03
# 2. Применяет фильтр на диске (если возможно)
# 3. Выбирает только 3 колонки
# 4. Группирует и считает
# 5. Показывает результат

# Второе Action на ТОМ ЖЕ DataFrame
by_service.write.parquet("output/errors_by_service")  # ← ВЫПОЛНЯЕТ снова!

# Spark перевычисляет! Чтобы избежать:
by_service.cache()  # Кэшируем результат
by_service.show()  # читаем из кэша
by_service.write.parquet("output/")  # всё ещё из кэша

Частые ошибки

Ошибка 1: Забыли Action

# ❌ Плохо: код выполнится, но ничего не произойдёт
df_filtered = spark.read.parquet("data").filter(col("amount") > 100)
# DataFrame построен, но не прочитан и не обработан!

# ✅ Хорошо: добавить Action
df_filtered = spark.read.parquet("data").filter(col("amount") > 100)
df_filtered.show()

Ошибка 2: Несколько раз вычислять одно

# ❌ Плохо: DataFrame вычисляется дважды
df = spark.read.parquet("large_file")
result1 = df.filter(...).collect()  # вычисление 1
result2 = df.filter(...).count()    # вычисление 2

# ✅ Хорошо: кэшируй результат
df = spark.read.parquet("large_file").cache()
result1 = df.filter(...).collect()
result2 = df.filter(...).count()  # из кэша

Ошибка 3: collect() на больших данных

# ❌ ОЧЕНЬ ПЛОХО: загружает всё в памяти одной машины!
df = spark.read.parquet("100GB_file")
all_data = df.collect()  # OutOfMemoryError!

# ✅ Хорошо: использовать take()
top_100 = df.take(100)  # берёт только 100

# ✅ Или write напрямую
df.write.parquet("output")

RDD vs DataFrame Actions

RDD Actions:

rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

rdd.collect()  # → [1, 2, 3, 4, 5]
rdd.first()    # → 1
rdd.count()    # → 5
rdd.take(3)    # → [1, 2, 3]
rdd.reduce(lambda x, y: x + y)  # → 15
rdd.foreach(print)  # выполнить для каждого
rdd.saveAsTextFile("/path")  # сохранить

DataFrame Actions:

df = spark.read.parquet("data")

df.show()  # вывести
df.collect()  # получить все
df.first()  # первая строка
df.count()  # количество
df.take(5)  # первые 5
df.write.parquet("output")  # сохранить

Best Practices

  1. Используй Transformations максимально

    • Фильтруй, выбирай, группируй в Transformations
    • Spark оптимизирует весь DAG
  2. Избегай collect() на больших данных

    • Используй take(), write(), foreachPartition()
    • Собирай только то, что нужно
  3. Кэшируй перед несколькими Actions

    df.cache()  # или .persist()
    df.show()
    df.count()
    df.write.parquet("output")  # все из кэша
    
  4. Профилируй DAG

    df.explain()  # смотри физический план
    

Заключение

  • Transformations = ленивые операции (план)
  • Actions = немедленные вычисления (результат)
  • Ленивое вычисление позволяет Spark оптимизировать весь DAG
  • Помни: collect() на больших данных = OutOfMemoryError
  • Кэшируй перед несколькими Actions