Как Spark читает и записывает данные?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Как Spark читает и записывает данные
Чтение и запись данных — это основные операции в Spark. Понимание того, как Spark работает с данными на диске и в памяти, критично для оптимизации performance и управления ресурсами.
Архитектура ввода-вывода в Spark
Spark использует абстракцию RDD (Resilient Distributed Dataset) и DataFrame для работы с данными. Процесс чтения и записи разделен на несколько уровней:
Программа Spark
↓
DataFrame API / RDD
↓
Catalyst Optimizer
↓
Physical Plan
↓
Task Execution
↓
Input/Output Formats
↓
Файловая система (HDFS/S3/Parquet/JSON/CSV и т.д.)
Чтение данных в Spark
1. Основной способ чтения
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataRead").getOrCreate()
# Чтение CSV файла
df = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv("/path/to/data.csv")
# Чтение Parquet файла (рекомендуется для больших данных)
df = spark.read.parquet("/path/to/data.parquet")
# Чтение JSON
df = spark.read.json("/path/to/data.json")
# Чтение из базы данных
df = spark.read.jdbc(
url="jdbc:postgresql://localhost:5432/mydb",
table="users",
properties={"user": "postgres", "password": "pwd"}
)
2. Процесс чтения внутри
Когда Spark читает данные, происходит следующее:
Шаг 1: Обнаружение файлов (File Discovery)
- Spark сканирует директорию и находит все файлы
- Группирует файлы по разделам (partitions)
- Вычисляет количество разделов на основе размера файла
# Контроль количества разделов
df = spark.read \
.option("header", "true") \
.csv("/path/to/large_file.csv") \
.repartition(100) # Используй 100 партиций
Шаг 2: Создание Task'ов
- Для каждой партиции создается один Task
- Task будет выполняться на одном executor'е
- Spark пытается разместить Task на машине, где хранятся данные (data locality)
Шаг 3: Чтение блоков данных
- Каждый Task читает свой блок данных из файловой системы
- Данные загружаются в оперативную память executor'а
- Если файл больше доступной памяти, используется spill на диск
Шаг 4: Парсинг и преобразование
- Данные парсятся в соответствии с форматом файла
- Применяются фильтры и проекции (если указаны)
- Результат остается в памяти для дальнейших операций
3. Оптимизация чтения
# Использование predicate pushdown (фильтры на уровне чтения)
df = spark.read.parquet("/path/to/data") \
.filter(df.age > 30) # Фильтр применится ДО полного чтения
# Использование column pruning (чтение только нужных колонок)
df = spark.read.parquet("/path/to/data") \
.select(["name", "age"]) # Читает только эти колонки
# Использование partitioned файлов
df = spark.read.parquet("/path/to/data/year=2023/month=01") \
.filter((F.col("year") == 2023) & (F.col("month") == 1))
Запись данных из Spark
1. Основные режимы записи
# Режим SaveMode определяет поведение при существовании файла
# 1. Overwrite - перезаписать существующие данные
df.write.mode("overwrite").parquet("/path/to/output")
# 2. Append - добавить к существующим данным
df.write.mode("append").parquet("/path/to/output")
# 3. Ignore - ничего не делать если файл существует
df.write.mode("ignore").parquet("/path/to/output")
# 4. Error (default) - выбросить ошибку если файл существует
df.write.mode("error").parquet("/path/to/output")
2. Форматы записи
# Parquet - рекомендуемый формат для больших данных
# Сжимается хорошо, быстро читается, поддерживает partitioning
df.write.parquet("/output/data.parquet")
# CSV
df.write.option("header", "true").csv("/output/data.csv")
# JSON
df.write.json("/output/data.json")
# Partitioned Parquet - разделение по столбцам
df.write.partitionBy("year", "month").parquet("/output/data")
# Результат: /output/data/year=2023/month=01/..., /output/data/year=2023/month=02/...
# Bucketing - сортировка в бакеты для оптимизации joins
df.write.bucketBy(100, "user_id").mode("overwrite").parquet("/output/data")
3. Процесс записи внутри
Шаг 1: Shuffle фаза (если требуется)
- Если используется partitionBy или groupBy, происходит shuffle
- Данные перераспределяются между partition'ами
- Пишутся в промежуточные файлы на диск
Шаг 2: Writing фаза
- Каждый partition пишется в отдельный файл
- По умолчанию Spark пишет число файлов = число partition'ов
- Каждый executor пишет параллельно
# Контроль количества выходных файлов
df.coalesce(1).write.parquet("/output/data") # 1 файл (медленно)
df.repartition(4).write.parquet("/output/data") # 4 файла
Шаг 3: Commit фаза
- Все временные файлы переименовываются в окончательные
- Записывается файл успеха (_SUCCESS)
- Данные становятся видимыми для следующих операций
4. Оптимизация записи
# Избегание многих маленьких файлов
df = df.repartition(10) # Уменьшить число файлов
df.write.mode("overwrite").parquet("/output/data")
# Сжатие данных
df.write \
.option("compression", "snappy") \
.parquet("/output/data")
# Использование правильного формата
# Parquet < CSV < JSON по размеру и скорости
# Партиционирование для улучшения скорости чтения
df.write \
.partitionBy("date") \
.parquet("/output/data")
Специализированные источники данных
Чтение/запись из SQL базы данных
# Чтение
df = spark.read.format("jdbc") \
.option("url", "jdbc:postgresql://localhost:5432/mydb") \
.option("dbtable", "(SELECT * FROM users WHERE age > 30) AS filtered") \
.option("user", "postgres") \
.option("password", "password") \
.option("numPartitions", 10) \
.load()
# Запись
df.write.format("jdbc") \
.option("url", "jdbc:postgresql://localhost:5432/mydb") \
.option("dbtable", "output_table") \
.option("user", "postgres") \
.option("password", "password") \
.mode("overwrite") \
.save()
Чтение/запись из S3
# Требует настройки AWS credentials
df = spark.read.parquet("s3a://my-bucket/path/to/data")
df.write.parquet("s3a://my-bucket/output/path")
Обработка больших файлов
# Для файлов, которые не помещаются в памяти
df = spark.read.parquet("/path/to/large/file") \
.repartition(1000) # Увеличить партиции
# Использование streaming для очень больших файлов
dfStream = spark.readStream \
.format("csv") \
.option("header", "true") \
.load("/path/to/streaming/data")
Ключевые концепции
Data Locality: Spark пытается выполнять код на машинах, где хранятся данные, чтобы минимизировать сетевой трафик.
Lazy Evaluation: Spark не читает данные пока не встретит action (collect, save, show и т.д.)
Catalyst Optimizer: Spark оптимизирует план выполнения, применяя predicate pushdown и column pruning.
Partitioning: Разделение данных критично для параллелизма. Оптимальное число партиций зависит от размера данных и ресурсов.
Лучшие практики
- Используйте Parquet вместо CSV для больших данных
- Партиционируйте данные по датам или другим часто используемым столбцам
- Используйте predicate pushdown (фильтры) перед прочтением
- Избегайте coalesce(1) в production (создает один большой файл)
- Мониторьте количество файлов на выходе
- Используйте кэширование для многократного использования данных
Понимание этих механизмов критично для написания эффективных Spark приложений.