← Назад к вопросам

Как взаимодействуют Spark, Hadoop и Hive при чтении/записи таблиц через Spark SQL?

2.8 Senior🔥 131 комментариев
#Apache Spark#Hadoop и распределенные системы

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Взаимодействие Spark, Hadoop и Hive в Spark SQL

Spark, Hadoop и Hive — компоненты большого экосистема больших данных, которые работают вместе для обработки и хранения данных в распределённом окружении. Хотя современные версии Spark уменьшили зависимость от Hadoop, их интеграция остаётся важной в enterprise-среде.

Архитектурные слои

┌──────────────────────────────────────┐
│       Пользовательское приложение    │
│          (Spark SQL запросы)         │
└───────────────┬──────────────────────┘
                │
        ┌───────▼────────┐
        │   Spark SQL    │  ← Catalyst optimizer
        │   (3.x, 4.x)   │  ← Parser, planner
        └───────┬────────┘
                │
    ┌───────────┴───────────┐
    │                       │
┌───▼────────┐      ┌──────▼──────┐
│   Spark    │      │    Hive      │
│   Core     │      │  Metastore   │
│ (RDD/DF)   │      │  (metadata)  │
└───┬────────┘      └──────┬───────┘
    │                      │
    │    ┌─────────────────┘
    │    │
┌───▼────▼────────────────────┐
│   Hadoop/HDFS или S3/GCS    │  ← storage layer
│  (файлы Parquet, ORC, CSV)  │
└─────────────────────────────┘

Роли компонентов

1. Hadoop (Distributed Storage & Compute)

Hadoop обеспечивает:

  • HDFS (Hadoop Distributed File System) — распределённая файловая система
  • YARN (Yet Another Resource Negotiator) — управление ресурсами кластера
  • MapReduce — фреймворк для обработки (устаревший, заменён Spark)
# Пример структуры HDFS
/data/warehouse/
  ├─ orders/
  │  ├─ year=2024/month=01/day=01/
  │  │  ├─ part-00000.parquet
  │  │  ├─ part-00001.parquet
  │  │  └─ _SUCCESS
  │  └─ year=2024/month=01/day=02/
  │     └─ part-00000.parquet
  └─ customers/
     └─ part-00000.parquet

# HDFS репликация (default 3x):
# Каждый блок размером 128MB или 256MB
# хранится на 3 разных машинах в кластере

2. Hive (Metadata Management)

Hive Metastore — это база данных (обычно MySQL/PostgreSQL), которая хранит метаданные:

-- Что хранит Hive Metastore
CREATE TABLE orders (
  order_id INT,
  user_id INT,
  amount DECIMAL(10,2),
  created_at TIMESTAMP
)
PARTITIONED BY (year INT, month INT)
STORED AS PARQUET
LOCATION '/data/warehouse/orders';

-- В Metastore сохраняется:
-- ├─ Имя таблицы: orders
-- ├─ Список колонок с типами
-- ├─ Партиции: year, month
-- ├─ Формат хранения: Parquet
-- ├─ Путь HDFS: /data/warehouse/orders
-- └─ Статистика (count rows, size, etc)

3. Spark (Execution Engine)

Spark — это distributed processing engine, который:

  • Читает метаданные из Hive Metastore
  • Планирует запросы через Catalyst optimizer
  • Обращается к HDFS/S3 для чтения данных
  • Может писать обратно в Hive таблицы

Как работает запрос через Spark SQL

Сценарий 1: Чтение таблицы Hive через Spark SQL

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("data_analysis") \
    .enableHiveSupport()  # ← Включаем поддержку Hive
    .config("hive.metastore.uris", "thrift://hive-metastore:9083") \
    .getOrCreate()

# Запрос
df = spark.sql("""
  SELECT 
    order_id,
    SUM(amount) as total
  FROM orders
  WHERE year = 2024 AND month = 1
  GROUP BY order_id
""")

df.show()

Что происходит внутри:

1. Spark parser: парсит SQL → AST (Abstract Syntax Tree)
   SELECT order_id, SUM(amount) as total FROM orders WHERE ...

2. Spark Analyzer: смотрит в Hive Metastore
   └─ Проверяет, существует ли таблица "orders"
   └─ Получает список колонок и их типы
   └─ Понимает, что таблица партицирована по (year, month)

3. Catalyst Optimizer: оптимизирует запрос
   ├─ Predicate Pushdown: WHERE year=2024 AND month=1 (до чтения)
   ├─ Column Pruning: нужны только order_id и amount
   ├─ Join Order: если JOINы — выбирает оптимальный порядок
   └─ Генерирует Physical Plan

4. Spark Execution:
   ├─ Обращается к HDFS за партициями:
   │  /data/warehouse/orders/year=2024/month=1/day=01/part-*.parquet
   │  /data/warehouse/orders/year=2024/month=1/day=02/part-*.parquet
   │
   ├─ Распределяет чтение по Executors параллельно:
   │  ├─ Executor 1: читает part-00000.parquet
   │  ├─ Executor 2: читает part-00001.parquet
   │  ├─ Executor 3: читает part-00002.parquet
   │  └─ ...
   │
   ├─ На каждом Executor: фильтр WHERE, группировка GROUP BY
   │
   ├─ Shuffling: перемешивание данных по order_id (для GROUP BY)
   │
   └─ Финальная агрегация: SUM(amount) по каждому order_id

Сценарий 2: Запись данных в Hive таблицу

# Создаём DataFrame в Spark
df_processed = spark.sql("""
  SELECT 
    order_id,
    user_id,
    amount,
    YEAR(created_at) as year,
    MONTH(created_at) as month
  FROM raw_orders
  WHERE created_at >= '2024-01-01'
""")

# Пишем в Hive таблицу
df_processed.write \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .format("parquet") \
    .option("path", "/data/warehouse/processed_orders") \
    .saveAsTable("processed_orders")  # ← Создаёт/обновляет в Hive

Что происходит:

1. Spark выполняет вычисления на разных Executors

2. Для каждой партиции (year, month):
   ├─ Создаёт директорию: /data/warehouse/processed_orders/year=2024/month=1/
   ├─ Пишет part-00000.parquet, part-00001.parquet, ...
   └─ Пишет _SUCCESS файл (сигнал об успехе)

3. Spark обновляет Hive Metastore:
   ├─ Регистрирует таблицу processed_orders
   ├─ Добавляет информацию о партициях (year=2024, month=1, etc)
   └─ Обновляет статистику (количество строк, размер)

4. Таблица доступна для других Spark сессий и Hive запросов

Взаимодействие через Partitioning

Партиции — ключевой механизм оптимизации в Hadoop/Hive/Spark.

# Таблица с партициями
CREATE TABLE sales (
  sale_id INT,
  amount DECIMAL(10,2),
  timestamp TIMESTAMP
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS PARQUET
LOCATION '/warehouse/sales';

# Физическая структура HDFS:
/warehouse/sales/
  ├─ year=2024/month=01/day=01/
  │  ├─ part-00000.parquet (блоки на DataNode 1, 2, 3)
  │  └─ part-00001.parquet (блоки на DataNode 2, 3, 4)
  ├─ year=2024/month=01/day=02/
  │  └─ part-00000.parquet
  └─ ...

# Spark запрос с partition pruning:
spark.sql("""
  SELECT * FROM sales 
  WHERE year=2024 AND month=1 AND day=1
""")

# Spark НЕ читает year=2024/month=01/day=02/ и позже
# Значительная экономия времени и bandwidth

Кэширование и Locality

Spark и Hadoop работают вместе через HDFS locality:

# HDFS Rack-Aware Replication
# Блок данных (128 MB) реплицируется 3 раза:

┌─────────────────────────────────────────────┐
│            Rack 1                           │
│  ┌─────────────┐  ┌─────────────┐         │
│  │ DataNode 1  │  │ DataNode 2  │         │
│  │  copy of    │  │  copy of    │         │
│  │   block A   │  │   block A   │         │
│  └─────────────┘  └─────────────┘         │
└─────────────────────────────────────────────┘
         Network separator
┌─────────────────────────────────────────────┐
│            Rack 2                           │
│  ┌─────────────┐                           │
│  │ DataNode 3  │                           │
│  │  copy of    │                           │
│  │   block A   │                           │
│  └─────────────┘                           │
└─────────────────────────────────────────────┘

# Spark scheduler размещает Task рядом с данными (data locality):
# ├─ Preferred: запустить Task на DataNode 1 или 2 (один рэк)
# ├─ Fallback: если не доступны, использовать DataNode 3 (другой рэк)
# └─ Last resort: читать через сеть (медленно!)

Современные изменения

В последних версиях Spark (3.0+):

# 1. Spark может работать БЕЗ Hadoop
spark = SparkSession.builder \
    .appName("app") \
    .master("spark://localhost:7077") \
    .getOrCreate()

# 2. Данные могут быть в облаке (S3, GCS), не только HDFS
df = spark.read \
    .parquet("s3://my-bucket/data/orders")

# 3. Hive Metastore может быть в облаке (AWS Glue Catalog)
spark = SparkSession.builder \
    .config("spark.sql.catalog.awsglue", 
            "org.apache.spark.sql.catalyst.catalog.ExternalCatalog") \
    .enableHiveSupport() \
    .getOrCreate()

# 4. Delta Lake или Iceberg вместо Parquet
df.write.format("delta").mode("overwrite").save("/tmp/delta-table")

Практический пример: ETL Pipeline

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, day

spark = SparkSession.builder \
    .appName("etl-pipeline") \
    .enableHiveSupport() \
    .getOrCreate()

# 1. Читаем сырые данные из HDFS
raw_df = spark.read \
    .option("header", "true") \
    .csv("/raw_data/orders/*.csv")

# 2. Трансформируем
processed_df = raw_df \
    .withColumn("year", year(col("order_date"))) \
    .withColumn("month", month(col("order_date"))) \
    .withColumn("day", day(col("order_date"))) \
    .filter(col("amount") > 0)

# 3. Пишем в Hive таблицу (партиционированную)
processed_df.write \
    .mode("append") \
    .partitionBy("year", "month", "day") \
    .format("parquet") \
    .insertInto("orders_processed")

# 4. Другие приложения могут читать таблицу через Hive SQL
# hive> SELECT * FROM orders_processed WHERE year=2024 AND month=1;

Вывод

Spark, Hadoop и Hive работают как единая система:

  • Hadoop HDFS — хранилище данных с репликацией и локальностью
  • Hive Metastore — каталог таблиц с метаданными
  • Spark — движок обработки, оптимизирующий запросы через Catalyst и использующий HDFS для параллельного чтения/записи

Партиционирование, predicate pushdown и data locality — ключевые механизмы эффективного использования экосистемы.

Как взаимодействуют Spark, Hadoop и Hive при чтении/записи таблиц через Spark SQL? | PrepBro