← Назад к вопросам

Что такое Spark RDD, DataFrames и Datasets? В чём различия и когда использовать каждый?

1.7 Middle🔥 201 комментариев
#Apache Spark

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Spark RDD, DataFrames и Datasets

RDD, DataFrames и Datasets — это три основных API для работы с данными в Apache Spark, каждый с разным уровнем абстракции и оптимизации.

1. RDD (Resilient Distributed Dataset)

RDD — базовая абстракция Spark, представляет неизменяемое распределённое собрание объектов, которое можно обрабатывать параллельно.

from pyspark import SparkContext

sc = SparkContext("local", "RDD Example")

# Создание RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])

# Трансформации (ленивые)
rdd_doubled = rdd.map(lambda x: x * 2)  # [2, 4, 6, 8, 10]
rdd_filtered = rdd_doubled.filter(lambda x: x > 5)  # [6, 8, 10]

# Действия (жадные)
result = rdd_filtered.collect()  # [6, 8, 10]

# Работа с ключ-значение (key-value RDD)
pairs = rdd.map(lambda x: (x % 2, x))  # [(1,1), (0,2), (1,3), (0,4), (1,5)]
by_key = pairs.reduceByKey(lambda x, y: x + y)  # [(1, 9), (0, 6)]

Характеристики RDD:

  • Низкоуровневый API — полный контроль
  • Неструктурированные данные — обычный Python/Java/Scala объекты
  • Медленнее — нет автоматической оптимизации
  • Гибкость — сложная логика

Когда использовать RDD:

  • Неструктурированные данные (текст, бинарные)
  • Сложные функции, не поддерживающиеся DataFrame
  • Legacy код

2. DataFrames

DataFrame — структурированная таблица с именованными колонками и типами. Оптимизирован Catalyst optimizer.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DataFrame Example").getOrCreate()

# Создание из Python list
data = [("Alice", 25, "HR"), ("Bob", 30, "IT"), ("Charlie", 35, "IT")]
df = spark.createDataFrame(data, ["name", "age", "department"])

# Из файла
df_csv = spark.read.csv("employees.csv", header=True)
df_parquet = spark.read.parquet("data.parquet")

# Операции (оптимизированные)
df_filtered = df.filter(df.age > 25)  # Catalyst оптимизирует
df_grouped = df.groupBy("department").agg({"age": "avg"})
df_joined = df.join(other_df, "department")

# SQL запрос
df.createOrReplaceTempView("employees")
result = spark.sql("SELECT name, age FROM employees WHERE age > 25")

# Вывод
result.show()
result.explain()  # План выполнения

Характеристики DataFrame:

  • Структурированные данные — таблицы, SQL
  • Оптимизирован — Catalyst optimizer, Tungsten
  • Интуитивно — похож на Pandas
  • Производительный — обычно лучше RDD

Когда использовать DataFrame:

  • Структурированные данные (таблицы, CSV, Parquet)
  • Стандартные операции (filter, join, aggregate)
  • Когда важна производительность

3. Datasets

Dataset — типизированный DataFrame. Сочетает типобезопасность RDD с оптимизацией DataFrame. Доступны в Scala/Java.

// Scala Dataset (Python не поддерживает полностью)
case class Employee(name: String, age: Int, department: String)

val ds = Seq(
  Employee("Alice", 25, "HR"),
  Employee("Bob", 30, "IT")
).toDS()

// Типизированные трансформации
ds.filter(_.age > 25)
  .map(e => (e.name, e.age))
  .show()

// Комбинация DataFrame и Dataset
val df = ds.toDF()  // Dataset -> DataFrame
val ds2 = df.as[Employee]  // DataFrame -> Dataset
# Python Dataset эмуляция через DataFrame
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("department", StringType(), True)
])

df_typed = spark.createDataFrame(data, schema=schema)

Характеристики Dataset:

  • Типизирован — compile-time type safety
  • Оптимизирован — как DataFrame
  • Scala/Java — Python не поддерживает полностью
  • Гибкий — объектно-ориентированный API

Сравнительная таблица

ХарактеристикаRDDDataFrameDataset
АбстракцияLow-levelStructuredType-safe
ОптимизацияНетCatalystCatalyst + Type
СкоростьМедленнаяБыстраяБыстрая
ТипизацияНетРантаймCompile-time
SQLНетДаДа
Сложные функцииДаСреднеСредне
Типичный случайТекстТаблицыScala/Java

Производительность

import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

spark = SparkSession.builder.getOrCreate()

# Создаем большой набор данных
data = [(i, i % 10, i * 2) for i in range(1_000_000)]

# 1. RDD подход (медленный)
start = time.time()
rdd = sc.parallelize(data)
result_rdd = rdd.map(lambda x: (x[1], x[2])) \
                  .reduceByKey(lambda x, y: x + y) \
                  .collect()
rdd_time = time.time() - start
print(f"RDD: {rdd_time:.2f}s")

# 2. DataFrame подход (быстрый)
start = time.time()
df = spark.createDataFrame(data, ["id", "key", "value"])
result_df = df.groupBy("key").agg(sum("value")).collect()
df_time = time.time() - start
print(f"DataFrame: {df_time:.2f}s")

# Результат: DataFrame ~5x быстрее

Конвертации

# RDD -> DataFrame
rdd = sc.parallelize([(1, "Alice"), (2, "Bob")])
df = spark.createDataFrame(rdd, ["id", "name"])

# DataFrame -> RDD
rdd_back = df.rdd  # RDD[Row]

# Извлечение значений
values = rdd_back.map(lambda row: (row.id, row.name)).collect()

Практический пример

from pyspark.sql import SparkSession
from pyspark.sql.functions import *, when

spark = SparkSession.builder.appName("ETL").getOrCreate()

# Загрузка (DataFrame — лучший выбор)
orders = spark.read.parquet("orders.parquet")
customers = spark.read.parquet("customers.parquet")

# Трансформации (DataFrame API оптимизирован)
orders_filtered = orders.filter(orders.status == "completed")

result = orders_filtered \
    .join(customers, "customer_id") \
    .groupBy("customer_id", "name") \
    .agg(
        count("order_id").alias("order_count"),
        sum("amount").alias("total_spent")
    ) \
    .filter(col("total_spent") > 1000) \
    .orderBy(col("total_spent").desc())

# Сохранение
result.write.parquet("high_value_customers.parquet")

Правило выбора

def choose_spark_api(data_type, operation_complexity):
    if data_type == "unstructured":
        return "RDD"  # Текст, бинарные данные
    elif operation_complexity == "complex_custom_logic":
        return "RDD"  # Нестандартные функции
    elif data_type == "structured" and operation_complexity == "simple":
        return "DataFrame"  # Таблицы, SQL
    else:
        return "Dataset"  # Scala/Java, типобезопасность

Вывод: для большинства задач Data Engineer используй DataFrame, так как это обеспечивает оптимальный баланс между читаемостью, производительностью и удобством. RDD нужны редко, для специальных случаев.