← Назад к вопросам

Как Spark читает и записывает данные?

1.8 Middle🔥 171 комментариев
#Apache Spark#Форматы данных и хранение

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как Spark читает и записывает данные

Чтение и запись данных — это основные операции в Spark. Понимание того, как Spark работает с данными на диске и в памяти, критично для оптимизации performance и управления ресурсами.

Архитектура ввода-вывода в Spark

Spark использует абстракцию RDD (Resilient Distributed Dataset) и DataFrame для работы с данными. Процесс чтения и записи разделен на несколько уровней:

Программа Spark
    ↓
DataFrame API / RDD
    ↓
Catalyst Optimizer
    ↓
Physical Plan
    ↓
Task Execution
    ↓
Input/Output Formats
    ↓
Файловая система (HDFS/S3/Parquet/JSON/CSV и т.д.)

Чтение данных в Spark

1. Основной способ чтения

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DataRead").getOrCreate()

# Чтение CSV файла
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/path/to/data.csv")

# Чтение Parquet файла (рекомендуется для больших данных)
df = spark.read.parquet("/path/to/data.parquet")

# Чтение JSON
df = spark.read.json("/path/to/data.json")

# Чтение из базы данных
df = spark.read.jdbc(
    url="jdbc:postgresql://localhost:5432/mydb",
    table="users",
    properties={"user": "postgres", "password": "pwd"}
)

2. Процесс чтения внутри

Когда Spark читает данные, происходит следующее:

Шаг 1: Обнаружение файлов (File Discovery)

  • Spark сканирует директорию и находит все файлы
  • Группирует файлы по разделам (partitions)
  • Вычисляет количество разделов на основе размера файла
# Контроль количества разделов
df = spark.read \
    .option("header", "true") \
    .csv("/path/to/large_file.csv") \
    .repartition(100)  # Используй 100 партиций

Шаг 2: Создание Task'ов

  • Для каждой партиции создается один Task
  • Task будет выполняться на одном executor'е
  • Spark пытается разместить Task на машине, где хранятся данные (data locality)

Шаг 3: Чтение блоков данных

  • Каждый Task читает свой блок данных из файловой системы
  • Данные загружаются в оперативную память executor'а
  • Если файл больше доступной памяти, используется spill на диск

Шаг 4: Парсинг и преобразование

  • Данные парсятся в соответствии с форматом файла
  • Применяются фильтры и проекции (если указаны)
  • Результат остается в памяти для дальнейших операций

3. Оптимизация чтения

# Использование predicate pushdown (фильтры на уровне чтения)
df = spark.read.parquet("/path/to/data") \
    .filter(df.age > 30)  # Фильтр применится ДО полного чтения

# Использование column pruning (чтение только нужных колонок)
df = spark.read.parquet("/path/to/data") \
    .select(["name", "age"])  # Читает только эти колонки

# Использование partitioned файлов
df = spark.read.parquet("/path/to/data/year=2023/month=01") \
    .filter((F.col("year") == 2023) & (F.col("month") == 1))

Запись данных из Spark

1. Основные режимы записи

# Режим SaveMode определяет поведение при существовании файла

# 1. Overwrite - перезаписать существующие данные
df.write.mode("overwrite").parquet("/path/to/output")

# 2. Append - добавить к существующим данным
df.write.mode("append").parquet("/path/to/output")

# 3. Ignore - ничего не делать если файл существует
df.write.mode("ignore").parquet("/path/to/output")

# 4. Error (default) - выбросить ошибку если файл существует
df.write.mode("error").parquet("/path/to/output")

2. Форматы записи

# Parquet - рекомендуемый формат для больших данных
# Сжимается хорошо, быстро читается, поддерживает partitioning
df.write.parquet("/output/data.parquet")

# CSV
df.write.option("header", "true").csv("/output/data.csv")

# JSON
df.write.json("/output/data.json")

# Partitioned Parquet - разделение по столбцам
df.write.partitionBy("year", "month").parquet("/output/data")
# Результат: /output/data/year=2023/month=01/..., /output/data/year=2023/month=02/...

# Bucketing - сортировка в бакеты для оптимизации joins
df.write.bucketBy(100, "user_id").mode("overwrite").parquet("/output/data")

3. Процесс записи внутри

Шаг 1: Shuffle фаза (если требуется)

  • Если используется partitionBy или groupBy, происходит shuffle
  • Данные перераспределяются между partition'ами
  • Пишутся в промежуточные файлы на диск

Шаг 2: Writing фаза

  • Каждый partition пишется в отдельный файл
  • По умолчанию Spark пишет число файлов = число partition'ов
  • Каждый executor пишет параллельно
# Контроль количества выходных файлов
df.coalesce(1).write.parquet("/output/data")  # 1 файл (медленно)
df.repartition(4).write.parquet("/output/data")  # 4 файла

Шаг 3: Commit фаза

  • Все временные файлы переименовываются в окончательные
  • Записывается файл успеха (_SUCCESS)
  • Данные становятся видимыми для следующих операций

4. Оптимизация записи

# Избегание многих маленьких файлов
df = df.repartition(10)  # Уменьшить число файлов
df.write.mode("overwrite").parquet("/output/data")

# Сжатие данных
df.write \
    .option("compression", "snappy") \
    .parquet("/output/data")

# Использование правильного формата
# Parquet < CSV < JSON по размеру и скорости

# Партиционирование для улучшения скорости чтения
df.write \
    .partitionBy("date") \
    .parquet("/output/data")

Специализированные источники данных

Чтение/запись из SQL базы данных

# Чтение
df = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/mydb") \
    .option("dbtable", "(SELECT * FROM users WHERE age > 30) AS filtered") \
    .option("user", "postgres") \
    .option("password", "password") \
    .option("numPartitions", 10) \
    .load()

# Запись
df.write.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/mydb") \
    .option("dbtable", "output_table") \
    .option("user", "postgres") \
    .option("password", "password") \
    .mode("overwrite") \
    .save()

Чтение/запись из S3

# Требует настройки AWS credentials
df = spark.read.parquet("s3a://my-bucket/path/to/data")

df.write.parquet("s3a://my-bucket/output/path")

Обработка больших файлов

# Для файлов, которые не помещаются в памяти
df = spark.read.parquet("/path/to/large/file") \
    .repartition(1000)  # Увеличить партиции

# Использование streaming для очень больших файлов
dfStream = spark.readStream \
    .format("csv") \
    .option("header", "true") \
    .load("/path/to/streaming/data")

Ключевые концепции

Data Locality: Spark пытается выполнять код на машинах, где хранятся данные, чтобы минимизировать сетевой трафик.

Lazy Evaluation: Spark не читает данные пока не встретит action (collect, save, show и т.д.)

Catalyst Optimizer: Spark оптимизирует план выполнения, применяя predicate pushdown и column pruning.

Partitioning: Разделение данных критично для параллелизма. Оптимальное число партиций зависит от размера данных и ресурсов.

Лучшие практики

  • Используйте Parquet вместо CSV для больших данных
  • Партиционируйте данные по датам или другим часто используемым столбцам
  • Используйте predicate pushdown (фильтры) перед прочтением
  • Избегайте coalesce(1) в production (создает один большой файл)
  • Мониторьте количество файлов на выходе
  • Используйте кэширование для многократного использования данных

Понимание этих механизмов критично для написания эффективных Spark приложений.