В чем разница между Delta Format и Parquet?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Delta Format vs Parquet
Два популярных формата для хранения данных в data lake'ах, но они решают разные проблемы.
Parquet — столбцовый формат данных
Что это: Open-source формат для хранения структурированных данных, оптимизированный для аналитических запросов.
Структура:
parquet_file.parquet
├── Row Group 1
│ ├── Column: user_id (compressed)
│ ├── Column: name (compressed)
│ ├── Column: amount (compressed)
│ └── Footer (metadata)
├── Row Group 2
│ ├── Column: user_id
│ ├── Column: name
│ └── Column: amount
└── Metadata (schema, stats)
Использование:
import pandas as pd
from pyspark.sql import SparkSession
# Написание Parquet
df = pd.read_csv('data.csv')
df.to_parquet('data.parquet', compression='snappy')
# Чтение Parquet
df = pd.read_parquet('data.parquet')
# В Spark
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet('/data/transactions/*.parquet')
# Partition pruning — читает только нужные колонки
df_filtered = df.select('user_id', 'amount').where(df.user_id == 123)
Достоинства:
- Сжатие: 10-20x лучше чем CSV/JSON (особенно с Snappy)
- Selective column reading: Читаешь только нужные колонки
- Predicate pushdown: WHERE условия применяются при чтении
- Universal: Поддерживается везде (Spark, Pandas, Arrow, DuckDB)
- Schema preservation: Структура данных сохранена
- Statistics: Metadata о min/max значениях для оптимизации
Недостатки:
- Immutable: Нельзя изменить существующий файл
- Append-only: Можешь только добавлять новые файлы
- No ACID: Нет гарантий consistency
- Slow updates: Чтобы обновить одну строку, переписываешь весь файл
- Data lakes chaos: Много маленьких файлов от incremental jobs
# Проблема: Обновление в Parquet
# Исходный файл
data = [{"id": 1, "value": 100},
{"id": 2, "value": 200}]
# Хочу обновить id=1 на value=150
# Решение: читаешь весь файл, меняешь, переписываешь
Delta Format — Parquet + Transaction Log
Что это: Линейка (Databricks) поверх Parquet с ACID гарантиями, DML операциями и версионированием.
Архитектура:
delta_table/
├── _delta_log/
│ ├── 000000.json # Transaction log: Add file1.parquet
│ ├── 000001.json # Transaction log: Add file2.parquet, Remove file1.parquet
│ ├── 000002.json # Transaction log: Update (version 2)
│ └── _last_checkpoint # Pointer to latest snapshot
├── file1.parquet # Actual data
├── file2.parquet
└── file3.parquet
Использование:
from delta import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder\
.appName("DeltaDemo")\
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
.getOrCreate()
# Написание Delta
df = spark.read.csv('data.csv', header=True)
df.write.format("delta").mode("overwrite").save("/data/delta_table")
# Чтение Delta
df = spark.read.format("delta").load("/data/delta_table")
# UPDATE — изменяй данные
delta_table = DeltaTable.forPath(spark, "/data/delta_table")
delta_table.update(
condition=col("user_id") == 123,
set={"amount": col("amount") * 1.1}
)
# DELETE — удаляй данные
delta_table.delete(condition=col("status") == "cancelled")
# MERGE — upsert операции
from pyspark.sql.functions import when
new_data = spark.read.csv('updates.csv', header=True)
delta_table.merge(
new_data.alias("updates"),
"delta_table.user_id = updates.user_id"
).whenMatchedUpdate(
set={"amount": col("updates.amount")}
).whenNotMatchedInsert(
values={
"user_id": col("updates.user_id"),
"amount": col("updates.amount")
}
).execute()
# Time travel — посмотри данные на момент времени
df_old = spark.read.format("delta").option("versionAsOf", 0).load("/data/delta_table")
df_old2 = spark.read.format("delta").option("timestampAsOf", "2025-01-15").load("/data/delta_table")
Достоинства:
- ACID гарантии: Atomicity, Consistency, Isolation, Durability
- DML операции: UPDATE, DELETE, MERGE (как в БД)
- Time travel: Посмотреть данные на любой момент времени
- MERGE: Эффективный upsert (не переписываешь всё)
- Schema evolution: Добавлять/удалять колонки без переписи
- Data versioning: Контроль версий таблицы
- Multiversion concurrency control: Несколько читателей одновременно
- Constraint enforcement: Check constraints, not null
# Delta Schema evolution
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Исходная таблица
df1 = spark.createDataFrame(
[(1, "John")],
["id", "name"]
).write.format("delta").mode("overwrite").save("/data/table")
# Добавляешь новую колонку
df2 = spark.createDataFrame(
[(2, "Jane", "jane@example.com")],
["id", "name", "email"]
).write.format("delta").mode("append").option("mergeSchema", "true").save("/data/table")
# Таблица содержит обе версии! Delta handles NULL для старых rows
Недостатки:
- Complexity: Требует Databricks Spark (не everywhere)
- Storage overhead: Transaction log может расти
- Performance: MERGE операции медленнее чем batch write
- Cost: Databricks лицензирование (Lakehouse Edition)
Сравнительная таблица
| Параметр | Parquet | Delta |
|---|---|---|
| Формат данных | Столбцовый | Parquet + TX Log |
| ACID | ❌ Нет | ✅ Да |
| UPDATE/DELETE | ❌ Нет | ✅ Есть |
| MERGE (upsert) | ❌ Нет | ✅ Есть |
| Time Travel | ❌ Нет | ✅ Есть |
| Сжатие | Отличное | Отличное (+ log) |
| Universal support | ✅ Везде | ⚠️ Spark/Databricks |
| Schema evolution | Ручная | ✅ Автоматическая |
| Версионирование | Нет | ✅ Встроено |
| Latency | Low | Low |
| Complexity | Низкая | Средняя |
Real-world сценарии
Использование Parquet:
# Append-only логи / event streams
# Нет обновлений, только добавления
for day in range(1, 32):
events = spark.read.kafka(...)\
.filter(col("date") == day)\
.write.parquet(f"/data/events/2025-01-{day:02d}")
# Аналитика: читаешь нужные дни
df = spark.read.parquet("/data/events/2025-01-*").where(col("user_id") == 123)
Использование Delta:
# Data warehouse с обновлениями и уделениями
# Fact таблица может изменяться
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "/warehouse/orders")
# Статус заказа изменился
delta_table.update(
condition=col("order_id") == 12345,
set={"status": "shipped", "updated_at": current_timestamp()}
)
# Отменённый заказ — удалить
delta_table.delete(condition=col("order_id") == 54321)
# Новые заказы — merge
new_orders = spark.read.json("s3://inbox/orders.json")
delta_table.merge(
new_orders,
"delta_table.order_id = new_orders.order_id"
).whenMatchedUpdateAll(
).whenNotMatchedInsertAll(
).execute()
Мой совет
Выбирай Parquet когда:
- Data lake с append-only событиями
- Нет обновлений / удалений
- Нужна universal совместимость
- Бюджет ограничен (no Databricks license)
- Простота важнее
Выбирай Delta когда:
- Data warehouse с ACID требованиями
- Нужны UPDATE / DELETE / MERGE
- Time travel и версионирование важны
- Используешь Databricks/Spark экосистему
- Качество данных критично (schema evolution)
Гибридный подход
Raw data (Parquet)
↓ (Spark batch, daily)
Cleaned/Deduped (Delta)
↓ (schema validated)
Data Warehouse (Delta + SQL)
↓
BI / Analytics
Итог
Parquet = простой, универсальный, append-only формат для озёр данных Delta = Parquet + гарантии БД (ACID, DML, versioning)
Для modern data stack: начини с Parquet (простота) → переходи на Delta (когда нужны обновления).