Как взаимодействуют Spark, Hadoop и Hive при чтении/записи таблиц через Spark SQL?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Взаимодействие Spark, Hadoop и Hive в Spark SQL
Spark, Hadoop и Hive — компоненты большого экосистема больших данных, которые работают вместе для обработки и хранения данных в распределённом окружении. Хотя современные версии Spark уменьшили зависимость от Hadoop, их интеграция остаётся важной в enterprise-среде.
Архитектурные слои
┌──────────────────────────────────────┐
│ Пользовательское приложение │
│ (Spark SQL запросы) │
└───────────────┬──────────────────────┘
│
┌───────▼────────┐
│ Spark SQL │ ← Catalyst optimizer
│ (3.x, 4.x) │ ← Parser, planner
└───────┬────────┘
│
┌───────────┴───────────┐
│ │
┌───▼────────┐ ┌──────▼──────┐
│ Spark │ │ Hive │
│ Core │ │ Metastore │
│ (RDD/DF) │ │ (metadata) │
└───┬────────┘ └──────┬───────┘
│ │
│ ┌─────────────────┘
│ │
┌───▼────▼────────────────────┐
│ Hadoop/HDFS или S3/GCS │ ← storage layer
│ (файлы Parquet, ORC, CSV) │
└─────────────────────────────┘
Роли компонентов
1. Hadoop (Distributed Storage & Compute)
Hadoop обеспечивает:
- HDFS (Hadoop Distributed File System) — распределённая файловая система
- YARN (Yet Another Resource Negotiator) — управление ресурсами кластера
- MapReduce — фреймворк для обработки (устаревший, заменён Spark)
# Пример структуры HDFS
/data/warehouse/
├─ orders/
│ ├─ year=2024/month=01/day=01/
│ │ ├─ part-00000.parquet
│ │ ├─ part-00001.parquet
│ │ └─ _SUCCESS
│ └─ year=2024/month=01/day=02/
│ └─ part-00000.parquet
└─ customers/
└─ part-00000.parquet
# HDFS репликация (default 3x):
# Каждый блок размером 128MB или 256MB
# хранится на 3 разных машинах в кластере
2. Hive (Metadata Management)
Hive Metastore — это база данных (обычно MySQL/PostgreSQL), которая хранит метаданные:
-- Что хранит Hive Metastore
CREATE TABLE orders (
order_id INT,
user_id INT,
amount DECIMAL(10,2),
created_at TIMESTAMP
)
PARTITIONED BY (year INT, month INT)
STORED AS PARQUET
LOCATION '/data/warehouse/orders';
-- В Metastore сохраняется:
-- ├─ Имя таблицы: orders
-- ├─ Список колонок с типами
-- ├─ Партиции: year, month
-- ├─ Формат хранения: Parquet
-- ├─ Путь HDFS: /data/warehouse/orders
-- └─ Статистика (count rows, size, etc)
3. Spark (Execution Engine)
Spark — это distributed processing engine, который:
- Читает метаданные из Hive Metastore
- Планирует запросы через Catalyst optimizer
- Обращается к HDFS/S3 для чтения данных
- Может писать обратно в Hive таблицы
Как работает запрос через Spark SQL
Сценарий 1: Чтение таблицы Hive через Spark SQL
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("data_analysis") \
.enableHiveSupport() # ← Включаем поддержку Hive
.config("hive.metastore.uris", "thrift://hive-metastore:9083") \
.getOrCreate()
# Запрос
df = spark.sql("""
SELECT
order_id,
SUM(amount) as total
FROM orders
WHERE year = 2024 AND month = 1
GROUP BY order_id
""")
df.show()
Что происходит внутри:
1. Spark parser: парсит SQL → AST (Abstract Syntax Tree)
SELECT order_id, SUM(amount) as total FROM orders WHERE ...
2. Spark Analyzer: смотрит в Hive Metastore
└─ Проверяет, существует ли таблица "orders"
└─ Получает список колонок и их типы
└─ Понимает, что таблица партицирована по (year, month)
3. Catalyst Optimizer: оптимизирует запрос
├─ Predicate Pushdown: WHERE year=2024 AND month=1 (до чтения)
├─ Column Pruning: нужны только order_id и amount
├─ Join Order: если JOINы — выбирает оптимальный порядок
└─ Генерирует Physical Plan
4. Spark Execution:
├─ Обращается к HDFS за партициями:
│ /data/warehouse/orders/year=2024/month=1/day=01/part-*.parquet
│ /data/warehouse/orders/year=2024/month=1/day=02/part-*.parquet
│
├─ Распределяет чтение по Executors параллельно:
│ ├─ Executor 1: читает part-00000.parquet
│ ├─ Executor 2: читает part-00001.parquet
│ ├─ Executor 3: читает part-00002.parquet
│ └─ ...
│
├─ На каждом Executor: фильтр WHERE, группировка GROUP BY
│
├─ Shuffling: перемешивание данных по order_id (для GROUP BY)
│
└─ Финальная агрегация: SUM(amount) по каждому order_id
Сценарий 2: Запись данных в Hive таблицу
# Создаём DataFrame в Spark
df_processed = spark.sql("""
SELECT
order_id,
user_id,
amount,
YEAR(created_at) as year,
MONTH(created_at) as month
FROM raw_orders
WHERE created_at >= '2024-01-01'
""")
# Пишем в Hive таблицу
df_processed.write \
.mode("overwrite") \
.partitionBy("year", "month") \
.format("parquet") \
.option("path", "/data/warehouse/processed_orders") \
.saveAsTable("processed_orders") # ← Создаёт/обновляет в Hive
Что происходит:
1. Spark выполняет вычисления на разных Executors
2. Для каждой партиции (year, month):
├─ Создаёт директорию: /data/warehouse/processed_orders/year=2024/month=1/
├─ Пишет part-00000.parquet, part-00001.parquet, ...
└─ Пишет _SUCCESS файл (сигнал об успехе)
3. Spark обновляет Hive Metastore:
├─ Регистрирует таблицу processed_orders
├─ Добавляет информацию о партициях (year=2024, month=1, etc)
└─ Обновляет статистику (количество строк, размер)
4. Таблица доступна для других Spark сессий и Hive запросов
Взаимодействие через Partitioning
Партиции — ключевой механизм оптимизации в Hadoop/Hive/Spark.
# Таблица с партициями
CREATE TABLE sales (
sale_id INT,
amount DECIMAL(10,2),
timestamp TIMESTAMP
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS PARQUET
LOCATION '/warehouse/sales';
# Физическая структура HDFS:
/warehouse/sales/
├─ year=2024/month=01/day=01/
│ ├─ part-00000.parquet (блоки на DataNode 1, 2, 3)
│ └─ part-00001.parquet (блоки на DataNode 2, 3, 4)
├─ year=2024/month=01/day=02/
│ └─ part-00000.parquet
└─ ...
# Spark запрос с partition pruning:
spark.sql("""
SELECT * FROM sales
WHERE year=2024 AND month=1 AND day=1
""")
# Spark НЕ читает year=2024/month=01/day=02/ и позже
# Значительная экономия времени и bandwidth
Кэширование и Locality
Spark и Hadoop работают вместе через HDFS locality:
# HDFS Rack-Aware Replication
# Блок данных (128 MB) реплицируется 3 раза:
┌─────────────────────────────────────────────┐
│ Rack 1 │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ DataNode 1 │ │ DataNode 2 │ │
│ │ copy of │ │ copy of │ │
│ │ block A │ │ block A │ │
│ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────┘
Network separator
┌─────────────────────────────────────────────┐
│ Rack 2 │
│ ┌─────────────┐ │
│ │ DataNode 3 │ │
│ │ copy of │ │
│ │ block A │ │
│ └─────────────┘ │
└─────────────────────────────────────────────┘
# Spark scheduler размещает Task рядом с данными (data locality):
# ├─ Preferred: запустить Task на DataNode 1 или 2 (один рэк)
# ├─ Fallback: если не доступны, использовать DataNode 3 (другой рэк)
# └─ Last resort: читать через сеть (медленно!)
Современные изменения
В последних версиях Spark (3.0+):
# 1. Spark может работать БЕЗ Hadoop
spark = SparkSession.builder \
.appName("app") \
.master("spark://localhost:7077") \
.getOrCreate()
# 2. Данные могут быть в облаке (S3, GCS), не только HDFS
df = spark.read \
.parquet("s3://my-bucket/data/orders")
# 3. Hive Metastore может быть в облаке (AWS Glue Catalog)
spark = SparkSession.builder \
.config("spark.sql.catalog.awsglue",
"org.apache.spark.sql.catalyst.catalog.ExternalCatalog") \
.enableHiveSupport() \
.getOrCreate()
# 4. Delta Lake или Iceberg вместо Parquet
df.write.format("delta").mode("overwrite").save("/tmp/delta-table")
Практический пример: ETL Pipeline
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, day
spark = SparkSession.builder \
.appName("etl-pipeline") \
.enableHiveSupport() \
.getOrCreate()
# 1. Читаем сырые данные из HDFS
raw_df = spark.read \
.option("header", "true") \
.csv("/raw_data/orders/*.csv")
# 2. Трансформируем
processed_df = raw_df \
.withColumn("year", year(col("order_date"))) \
.withColumn("month", month(col("order_date"))) \
.withColumn("day", day(col("order_date"))) \
.filter(col("amount") > 0)
# 3. Пишем в Hive таблицу (партиционированную)
processed_df.write \
.mode("append") \
.partitionBy("year", "month", "day") \
.format("parquet") \
.insertInto("orders_processed")
# 4. Другие приложения могут читать таблицу через Hive SQL
# hive> SELECT * FROM orders_processed WHERE year=2024 AND month=1;
Вывод
Spark, Hadoop и Hive работают как единая система:
- Hadoop HDFS — хранилище данных с репликацией и локальностью
- Hive Metastore — каталог таблиц с метаданными
- Spark — движок обработки, оптимизирующий запросы через Catalyst и использующий HDFS для параллельного чтения/записи
Партиционирование, predicate pushdown и data locality — ключевые механизмы эффективного использования экосистемы.