Как работают инкрементальные загрузки в S3?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Инкрементальные загрузки в S3: архитектура и реализация
Инкрементальные загрузки — это критический паттерн для работы с большими объёмами данных. Вместо полной перезагрузки таблицы я загружаю только изменённые данные, что снижает затраты и время обработки.
Принцип работы инкрементальной загрузки
Вместо того чтобы каждый раз загружать всю таблицу (1GB), я загружаю только новые/изменённые записи (10-50MB). Это достигается отслеживанием меток времени или последовательностей.
Подход 1: Использование временных меток (Timestamp-based)
Идея: загрузить только записи, изменённые после последней загрузки
import pandas as pd
from datetime import datetime, timedelta
import boto3
import pyarrow.parquet as pq
# Инициализируем S3
s3_client = boto3.client('s3')
bucket = 'my-data-lake'
def get_last_load_timestamp(s3_path):
"""
Получить дату последней успешной загрузки
"""
try:
obj = s3_client.head_object(
Bucket=bucket,
Key=f"{s3_path}/_last_load_timestamp"
)
# Дата из metadata объекта
return pd.to_datetime(obj['LastModified'])
except:
# Если файла нет, это первая загрузка
return datetime(1970, 1, 1)
def incremental_load_timestamp_based(source_db, table_name, s3_path):
"""
Инкрементальная загрузка на основе timestamp
"""
# 1. Получить дату последней загрузки
last_load = get_last_load_timestamp(s3_path)
# 2. Запрос только новых/изменённых записей
query = f"""
SELECT * FROM {table_name}
WHERE updated_at > '{last_load.isoformat()}'
ORDER BY updated_at
"""
df = pd.read_sql(query, source_db)
print(f"Loaded {len(df)} rows updated after {last_load}")
if len(df) > 0:
# 3. Сохранить в S3 партициям
df['load_date'] = datetime.now().date()
s3_full_path = f"s3://{bucket}/{s3_path}/"
wr.s3.to_parquet(
df,
path=s3_full_path,
dataset=True,
partition_cols=['load_date'],
mode='append' # Добавить к существующим данным
)
# 4. Обновить метаданные последней загрузки
s3_client.put_object(
Bucket=bucket,
Key=f"{s3_path}/_last_load_timestamp",
Body=str(datetime.now())
)
print(f"Incremental load completed. {len(df)} rows saved.")
else:
print("No new data to load")
Подход 2: Change Data Capture (CDC) с LSN/Sequence
Идея: использовать логи изменений БД (Log Sequence Numbers)
Этот подход эффективнее для систем, которые отслеживают изменения:
def incremental_load_with_lsn(source_db, table_name, s3_path):
"""
Загрузка через Log Sequence Number (SQL Server / PostgreSQL)
"""
# 1. Получить последний обработанный LSN
try:
with open('/tmp/last_lsn.txt', 'r') as f:
last_lsn = int(f.read())
except:
last_lsn = 0
# 2. Получить изменённые записи
# Для SQL Server используем CDC (Change Data Capture)
query = f"""
SELECT
[cdc].[__$operation], -- 1=delete, 2=insert, 3=before update, 4=after update
[cdc].[__$seqval],
*
FROM cdc.fn_cdc_get_all_changes_{table_name}(
@from_lsn=sys.fn_cdc_increment_lsn({last_lsn}),
@to_lsn=sys.fn_cdc_get_max_lsn(),
'all'
)
"""
df = pd.read_sql(query, source_db)
# 3. Обработать разные типы операций
deletes = df[df['__$operation'] == 1] # Удаления
inserts = df[df['__$operation'] == 2] # Вставки
updates = df[df['__$operation'].isin([3, 4])] # Обновления
# 4. Сохранить в S3
import awswrangler as wr
if len(df) > 0:
wr.s3.to_parquet(
df[['__$operation', '__$seqval'] + [c for c in df.columns if not c.startswith('__')]],
path=f"s3://{bucket}/{s3_path}/",
dataset=True,
partition_cols=['load_date'],
mode='append'
)
# 5. Сохранить новый LSN
new_lsn = df['__$seqval'].max()
with open('/tmp/last_lsn.txt', 'w') as f:
f.write(str(new_lsn))
Подход 3: Full Load vs Incremental Merge
SCD Type 2 (Slowly Changing Dimensions) — сохранение истории:
def scd_type2_merge(new_data, s3_path):
"""
Слияние инкрементальных данных с сохранением истории
SCD Type 2: добавляем valid_from/valid_to даты
"""
import awswrangler as wr
# 1. Читаем существующие данные
try:
existing = wr.s3.read_parquet(f"s3://{bucket}/{s3_path}/")
except:
existing = pd.DataFrame()
# 2. Находим изменённые записи
if len(existing) > 0:
# Merge по key
merged = pd.merge(
existing,
new_data,
on=['id'],
how='outer',
indicator=True,
suffixes=('_old', '_new')
)
# Найти обновленные записи
changed = merged[
(merged['_merge'] == 'both') &
(merged['name_old'] != merged['name_new']) # Как пример
]
if len(changed) > 0:
# Закрыть старые версии
existing.loc[existing['id'].isin(changed['id']), 'valid_to'] = datetime.now()
# Добавить новые версии
new_data['valid_from'] = datetime.now()
new_data['valid_to'] = None
else:
# Первая загрузка
new_data['valid_from'] = datetime.now()
new_data['valid_to'] = None
# 3. Сохранить обновленный датасет
combined = pd.concat([existing, new_data], ignore_index=True)
wr.s3.to_parquet(
combined,
path=f"s3://{bucket}/{s3_path}/",
dataset=True,
mode='overwrite'
)
Подход 4: Дельта-озеро (Delta Lake)
Лучший современный подход — использование Delta Lake для ACID транзакций:
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DeltaLoad").getOrCreate()
def incremental_load_delta(source_db, table_name, delta_path):
"""
Инкрементальная загрузка с Delta Lake
"""
# 1. Читаем новые данные
new_df = spark.read.jdbc(source_db, table_name)
# 2. Merge with existing Delta table
delta_table = DeltaTable.forPath(spark, delta_path)
delta_table.alias("existing").merge(
new_df.alias("new"),
"existing.id = new.id"
).whenMatchedUpdateAll(
condition="existing.updated_at < new.updated_at" # Обновить, если новые данные новее
).whenNotMatchedInsertAll().execute()
# 3. Автоматически сохраняется версионирование
# Можно откатиться к предыдущей версии если нужно
SQL подход для инкрементального обновления
-- Загрузка инкрементальных данных в промежуточную таблицу
CREATE TABLE staging_customers AS
SELECT
id,
name,
email,
updated_at,
CURRENT_TIMESTAMP as load_timestamp
FROM source_database.customers
WHERE updated_at > (SELECT MAX(updated_at) FROM customers);
-- UPSERT операция
MERGE INTO customers t
USING staging_customers s
ON t.id = s.id
WHEN MATCHED AND t.updated_at < s.updated_at THEN
UPDATE SET
name = s.name,
email = s.email,
updated_at = s.updated_at
WHEN NOT MATCHED THEN
INSERT (id, name, email, updated_at, load_timestamp)
VALUES (s.id, s.name, s.email, s.updated_at, s.load_timestamp);
Best Practices для S3 инкрементальных загрузок
1. Используйте партиционирование
s3://bucket/table_name/
├── year=2024/month=01/
├── year=2024/month=02/
└── year=2024/month=03/
2. Храните метаданные последней загрузки
metadata = {
'last_sync_timestamp': datetime.now().isoformat(),
'records_loaded': 1000,
'load_duration_seconds': 45,
'status': 'success'
}
s3_client.put_object(
Bucket=bucket,
Key='metadata.json',
Body=json.dumps(metadata)
)
3. Реализуйте идемпотентность
# Один и тот же load дважды не должен создавать дубликаты
# Используйте уникальные ключи: (id, load_timestamp)
4. Мониторируйте качество
# Алерт, если инкрементальная загрузка не произошла 24 часа
if datetime.now() - last_load > timedelta(days=1):
send_alert("No incremental load for 24 hours!")
Инкрементальные загрузки критичны для масштабируемых data pipelines, позволяя обрабатывать петабайты данных экономно и быстро.