Что такое Spark RDD, DataFrames и Datasets? В чём различия и когда использовать каждый?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Spark RDD, DataFrames и Datasets
RDD, DataFrames и Datasets — это три основных API для работы с данными в Apache Spark, каждый с разным уровнем абстракции и оптимизации.
1. RDD (Resilient Distributed Dataset)
RDD — базовая абстракция Spark, представляет неизменяемое распределённое собрание объектов, которое можно обрабатывать параллельно.
from pyspark import SparkContext
sc = SparkContext("local", "RDD Example")
# Создание RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# Трансформации (ленивые)
rdd_doubled = rdd.map(lambda x: x * 2) # [2, 4, 6, 8, 10]
rdd_filtered = rdd_doubled.filter(lambda x: x > 5) # [6, 8, 10]
# Действия (жадные)
result = rdd_filtered.collect() # [6, 8, 10]
# Работа с ключ-значение (key-value RDD)
pairs = rdd.map(lambda x: (x % 2, x)) # [(1,1), (0,2), (1,3), (0,4), (1,5)]
by_key = pairs.reduceByKey(lambda x, y: x + y) # [(1, 9), (0, 6)]
Характеристики RDD:
- Низкоуровневый API — полный контроль
- Неструктурированные данные — обычный Python/Java/Scala объекты
- Медленнее — нет автоматической оптимизации
- Гибкость — сложная логика
Когда использовать RDD:
- Неструктурированные данные (текст, бинарные)
- Сложные функции, не поддерживающиеся DataFrame
- Legacy код
2. DataFrames
DataFrame — структурированная таблица с именованными колонками и типами. Оптимизирован Catalyst optimizer.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataFrame Example").getOrCreate()
# Создание из Python list
data = [("Alice", 25, "HR"), ("Bob", 30, "IT"), ("Charlie", 35, "IT")]
df = spark.createDataFrame(data, ["name", "age", "department"])
# Из файла
df_csv = spark.read.csv("employees.csv", header=True)
df_parquet = spark.read.parquet("data.parquet")
# Операции (оптимизированные)
df_filtered = df.filter(df.age > 25) # Catalyst оптимизирует
df_grouped = df.groupBy("department").agg({"age": "avg"})
df_joined = df.join(other_df, "department")
# SQL запрос
df.createOrReplaceTempView("employees")
result = spark.sql("SELECT name, age FROM employees WHERE age > 25")
# Вывод
result.show()
result.explain() # План выполнения
Характеристики DataFrame:
- Структурированные данные — таблицы, SQL
- Оптимизирован — Catalyst optimizer, Tungsten
- Интуитивно — похож на Pandas
- Производительный — обычно лучше RDD
Когда использовать DataFrame:
- Структурированные данные (таблицы, CSV, Parquet)
- Стандартные операции (filter, join, aggregate)
- Когда важна производительность
3. Datasets
Dataset — типизированный DataFrame. Сочетает типобезопасность RDD с оптимизацией DataFrame. Доступны в Scala/Java.
// Scala Dataset (Python не поддерживает полностью)
case class Employee(name: String, age: Int, department: String)
val ds = Seq(
Employee("Alice", 25, "HR"),
Employee("Bob", 30, "IT")
).toDS()
// Типизированные трансформации
ds.filter(_.age > 25)
.map(e => (e.name, e.age))
.show()
// Комбинация DataFrame и Dataset
val df = ds.toDF() // Dataset -> DataFrame
val ds2 = df.as[Employee] // DataFrame -> Dataset
# Python Dataset эмуляция через DataFrame
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("department", StringType(), True)
])
df_typed = spark.createDataFrame(data, schema=schema)
Характеристики Dataset:
- Типизирован — compile-time type safety
- Оптимизирован — как DataFrame
- Scala/Java — Python не поддерживает полностью
- Гибкий — объектно-ориентированный API
Сравнительная таблица
| Характеристика | RDD | DataFrame | Dataset |
|---|---|---|---|
| Абстракция | Low-level | Structured | Type-safe |
| Оптимизация | Нет | Catalyst | Catalyst + Type |
| Скорость | Медленная | Быстрая | Быстрая |
| Типизация | Нет | Рантайм | Compile-time |
| SQL | Нет | Да | Да |
| Сложные функции | Да | Средне | Средне |
| Типичный случай | Текст | Таблицы | Scala/Java |
Производительность
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum
spark = SparkSession.builder.getOrCreate()
# Создаем большой набор данных
data = [(i, i % 10, i * 2) for i in range(1_000_000)]
# 1. RDD подход (медленный)
start = time.time()
rdd = sc.parallelize(data)
result_rdd = rdd.map(lambda x: (x[1], x[2])) \
.reduceByKey(lambda x, y: x + y) \
.collect()
rdd_time = time.time() - start
print(f"RDD: {rdd_time:.2f}s")
# 2. DataFrame подход (быстрый)
start = time.time()
df = spark.createDataFrame(data, ["id", "key", "value"])
result_df = df.groupBy("key").agg(sum("value")).collect()
df_time = time.time() - start
print(f"DataFrame: {df_time:.2f}s")
# Результат: DataFrame ~5x быстрее
Конвертации
# RDD -> DataFrame
rdd = sc.parallelize([(1, "Alice"), (2, "Bob")])
df = spark.createDataFrame(rdd, ["id", "name"])
# DataFrame -> RDD
rdd_back = df.rdd # RDD[Row]
# Извлечение значений
values = rdd_back.map(lambda row: (row.id, row.name)).collect()
Практический пример
from pyspark.sql import SparkSession
from pyspark.sql.functions import *, when
spark = SparkSession.builder.appName("ETL").getOrCreate()
# Загрузка (DataFrame — лучший выбор)
orders = spark.read.parquet("orders.parquet")
customers = spark.read.parquet("customers.parquet")
# Трансформации (DataFrame API оптимизирован)
orders_filtered = orders.filter(orders.status == "completed")
result = orders_filtered \
.join(customers, "customer_id") \
.groupBy("customer_id", "name") \
.agg(
count("order_id").alias("order_count"),
sum("amount").alias("total_spent")
) \
.filter(col("total_spent") > 1000) \
.orderBy(col("total_spent").desc())
# Сохранение
result.write.parquet("high_value_customers.parquet")
Правило выбора
def choose_spark_api(data_type, operation_complexity):
if data_type == "unstructured":
return "RDD" # Текст, бинарные данные
elif operation_complexity == "complex_custom_logic":
return "RDD" # Нестандартные функции
elif data_type == "structured" and operation_complexity == "simple":
return "DataFrame" # Таблицы, SQL
else:
return "Dataset" # Scala/Java, типобезопасность
Вывод: для большинства задач Data Engineer используй DataFrame, так как это обеспечивает оптимальный баланс между читаемостью, производительностью и удобством. RDD нужны редко, для специальных случаев.