Объясните разницу между transformation и action в Spark.
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Transformations и Actions в Apache Spark
Transformations и Actions — это два основных типа операций в Spark RDD и DataFrame API. Различие между ними критично для понимания ленивого вычисления в Spark.
Ленивое вычисление (Lazy Evaluation)
Sparkсочетает план выполнения (DAG — Directed Acyclic Graph) без реального выполнения до тех пор, пока не будет вызвано Action.
Transformations → Transformations → Transformations → Action
(построение) (построение) (построение) (выполнение)
df = spark.read.parquet(...) # не читается
df = df.filter(...) # не выполняется
df = df.select(...) # не выполняется
df.show() # ← ВОТ ЗДЕСЬ выполняется всё сразу!
Transformations (Преобразования)
Определение: Операция, которая возвращает новый RDD/DataFrame, но не вычисляет результат. Transformations ленивые — они не выполняются пока на них не ссылается Action.
Ключевые характеристики:
- Возвращают новый RDD/DataFrame
- Не выполняются сразу
- Создают DAG план
- Можно цеплять (chaining)
Примеры Transformations:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper, sum
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("sales.parquet")
# 1. map - преобразование каждого элемента
rdd = spark.sparkContext.parallelize([1, 2, 3, 4])
rdd_mapped = rdd.map(lambda x: x * 2) # → [2, 4, 6, 8] (не выполнено)
# 2. filter - отбор строк
df_filtered = df.filter(col("amount") > 100) # не выполнено
# 3. select - выбор колонок
df_selected = df.select("customer_id", "amount") # не выполнено
# 4. groupBy - группировка
df_grouped = df.groupBy("customer_id") # не выполнено
# 5. join - объединение таблиц
df_customers = spark.read.parquet("customers.parquet")
df_joined = df.join(df_customers, "customer_id") # не выполнено
# 6. union - объединение рядов
df_union = df.union(df_filtered) # не выполнено
# 7. distinct - удаление дубликатов
df_distinct = df.select("customer_id").distinct() # не выполнено
# 8. sort - сортировка
df_sorted = df.sort(col("amount").desc()) # не выполнено
# 9. withColumn - добавить/изменить колонку
df_with_col = df.withColumn("amount_upper", upper(col("amount"))) # не выполнено
# 10. flatten - развернуть вложенные структуры
from pyspark.sql.functions import explode
df_flattened = df.select("*", explode("items")) # не выполнено
# ВСЕ эти операции НЕ выполнены, только построен DAG!
Actions (Действия)
Определение: Операция, которая возвращает результат в программу или пишет на диск. Actions немедленно выполняют весь DAG.
Ключевые характеристики:
- Возвращают значение в программу
- Выполняют весь DAG
- Триггируют вычисления
- Только один раз в коде
Примеры Actions:
# 1. show - вывести первые n строк в консоль
df_filtered.show(10) # ← ВЫПОЛНЯЕТ весь DAG
# 2. collect - получить все данные в памяти (ОПАСНО!)
result = df_filtered.collect() # ← ВЫПОЛНЯЕТ
for row in result:
print(row)
# 3. count - подсчитать количество строк
count = df_filtered.count() # ← ВЫПОЛНЯЕТ
print(f"Total rows: {count}")
# 4. first - получить первую строку
first_row = df_filtered.first() # ← ВЫПОЛНЯЕТ
# 5. take - получить первые n строк
top_5 = df_filtered.take(5) # ← ВЫПОЛНЯЕТ
# 6. write - записать в файл
df_filtered.write.parquet("output.parquet") # ← ВЫПОЛНЯЕТ
df_filtered.write.mode("overwrite").parquet("output.parquet")
# 7. saveAsTextFile - сохранить как текст (RDD)
rdd.saveAsTextFile("/path/to/output") # ← ВЫПОЛНЯЕТ
# 8. foreach - применить функцию к каждому элементу (RDD)
rdd.foreach(lambda x: print(x)) # ← ВЫПОЛНЯЕТ
# 9. reduce - агрегировать в одно значение (RDD)
sum_value = rdd.reduce(lambda x, y: x + y) # ← ВЫПОЛНЯЕТ
# 10. foreachPartition - применить функцию к каждому partition
def process_partition(iterator):
for row in iterator:
print(row)
df_filtered.rdd.foreachPartition(process_partition) # ← ВЫПОЛНЯЕТ
Visual пример
# Шаг 1: Построение DAG (без выполнения)
df = spark.read.parquet("large_file.parquet") # Transformation
df = df.filter(col("year") == 2024) # Transformation
df = df.select("customer_id", "amount") # Transformation
df = df.groupBy("customer_id").sum("amount") # Transformation
# DAG построен:
# Read → Filter → Select → GroupBy
# НО НИЧЕГО НЕ ВЫПОЛНЕНО!
# Шаг 2: Выполнение DAG
result = df.collect() # Action
# Spark теперь выполняет весь DAG:
# 1. Прочитать файл
# 2. Отфильтровать по году
# 3. Выбрать две колонки
# 4. Сгруппировать и сумировать
# 5. Собрать результат в памяти
Почему это важно: Оптимизация
Sparkможет оптимизировать весь DAG благодаря ленивому вычислению:
df = spark.read.parquet("100GB_file.parquet")
df = df.filter(col("year") == 2024) # Catalyst поймёт, что нужно прочитать только 2024
df = df.select("customer_id", "amount") # Column pruning: читай только 2 колонки
result = df.take(5) # Take: прочитай только 5 строк
# Spark оптимизирует:
# Читает только 2024 год
# Читает только 2 колонки
# Останавливается после 5 строк
# Результат: вместо 100GB читаем ~1MB
Сравнительная таблица
| Характеристика | Transformation | Action |
|---|---|---|
| Возвращает | Новый RDD/DataFrame | Значение или Side Effect |
| Выполнение | Ленивое (откладывается) | Немедленное |
| DAG | Добавляет этап в DAG | Триггирует выполнение |
| Примеры | map, filter, select, join | collect, show, write, count |
| Можно цеплять | Да, очень эффективно | Обычно один раз |
| Performance | Нет (только на диаграмме) | Может быть медленным |
Практический пример: Анализ логов
from pyspark.sql.functions import col, explode, to_timestamp
# Чтение логов (Transformation)
logs_df = spark.read.json("s3://logs/2024-03-*.json") # не читается
# Фильтр по уровню (Transformation)
error_logs = logs_df.filter(col("level") == "ERROR") # не выполняется
# Выбор нужных колонок (Transformation)
error_summary = error_logs.select(
col("timestamp"),
col("service"),
col("message")
) # не выполняется
# Группировка по сервису (Transformation)
by_service = error_summary.groupBy("service").count() # не выполняется
# ВСЕ ВЫШЕПЕРЕЧИСЛЕННОЕ НЕ ВЫПОЛНЕНО!
# Первое Action
by_service.show() # ← ВЫПОЛНЯЕТ весь DAG!
# Spark оптимизирует:
# 1. Читает только файлы 2024-03
# 2. Применяет фильтр на диске (если возможно)
# 3. Выбирает только 3 колонки
# 4. Группирует и считает
# 5. Показывает результат
# Второе Action на ТОМ ЖЕ DataFrame
by_service.write.parquet("output/errors_by_service") # ← ВЫПОЛНЯЕТ снова!
# Spark перевычисляет! Чтобы избежать:
by_service.cache() # Кэшируем результат
by_service.show() # читаем из кэша
by_service.write.parquet("output/") # всё ещё из кэша
Частые ошибки
Ошибка 1: Забыли Action
# ❌ Плохо: код выполнится, но ничего не произойдёт
df_filtered = spark.read.parquet("data").filter(col("amount") > 100)
# DataFrame построен, но не прочитан и не обработан!
# ✅ Хорошо: добавить Action
df_filtered = spark.read.parquet("data").filter(col("amount") > 100)
df_filtered.show()
Ошибка 2: Несколько раз вычислять одно
# ❌ Плохо: DataFrame вычисляется дважды
df = spark.read.parquet("large_file")
result1 = df.filter(...).collect() # вычисление 1
result2 = df.filter(...).count() # вычисление 2
# ✅ Хорошо: кэшируй результат
df = spark.read.parquet("large_file").cache()
result1 = df.filter(...).collect()
result2 = df.filter(...).count() # из кэша
Ошибка 3: collect() на больших данных
# ❌ ОЧЕНЬ ПЛОХО: загружает всё в памяти одной машины!
df = spark.read.parquet("100GB_file")
all_data = df.collect() # OutOfMemoryError!
# ✅ Хорошо: использовать take()
top_100 = df.take(100) # берёт только 100
# ✅ Или write напрямую
df.write.parquet("output")
RDD vs DataFrame Actions
RDD Actions:
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
rdd.collect() # → [1, 2, 3, 4, 5]
rdd.first() # → 1
rdd.count() # → 5
rdd.take(3) # → [1, 2, 3]
rdd.reduce(lambda x, y: x + y) # → 15
rdd.foreach(print) # выполнить для каждого
rdd.saveAsTextFile("/path") # сохранить
DataFrame Actions:
df = spark.read.parquet("data")
df.show() # вывести
df.collect() # получить все
df.first() # первая строка
df.count() # количество
df.take(5) # первые 5
df.write.parquet("output") # сохранить
Best Practices
-
Используй Transformations максимально
- Фильтруй, выбирай, группируй в Transformations
- Spark оптимизирует весь DAG
-
Избегай collect() на больших данных
- Используй take(), write(), foreachPartition()
- Собирай только то, что нужно
-
Кэшируй перед несколькими Actions
df.cache() # или .persist() df.show() df.count() df.write.parquet("output") # все из кэша -
Профилируй DAG
df.explain() # смотри физический план
Заключение
- Transformations = ленивые операции (план)
- Actions = немедленные вычисления (результат)
- Ленивое вычисление позволяет Spark оптимизировать весь DAG
- Помни: collect() на больших данных = OutOfMemoryError
- Кэшируй перед несколькими Actions