← Назад к вопросам

Как работают инкрементальные загрузки в S3?

1.8 Middle🔥 181 комментариев
#ETL и качество данных#Облачные платформы#Форматы данных и хранение

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Инкрементальные загрузки в S3: архитектура и реализация

Инкрементальные загрузки — это критический паттерн для работы с большими объёмами данных. Вместо полной перезагрузки таблицы я загружаю только изменённые данные, что снижает затраты и время обработки.

Принцип работы инкрементальной загрузки

Вместо того чтобы каждый раз загружать всю таблицу (1GB), я загружаю только новые/изменённые записи (10-50MB). Это достигается отслеживанием меток времени или последовательностей.

Подход 1: Использование временных меток (Timestamp-based)

Идея: загрузить только записи, изменённые после последней загрузки

import pandas as pd
from datetime import datetime, timedelta
import boto3
import pyarrow.parquet as pq

# Инициализируем S3
s3_client = boto3.client('s3')
bucket = 'my-data-lake'

def get_last_load_timestamp(s3_path):
    """
    Получить дату последней успешной загрузки
    """
    try:
        obj = s3_client.head_object(
            Bucket=bucket,
            Key=f"{s3_path}/_last_load_timestamp"
        )
        # Дата из metadata объекта
        return pd.to_datetime(obj['LastModified'])
    except:
        # Если файла нет, это первая загрузка
        return datetime(1970, 1, 1)

def incremental_load_timestamp_based(source_db, table_name, s3_path):
    """
    Инкрементальная загрузка на основе timestamp
    """
    # 1. Получить дату последней загрузки
    last_load = get_last_load_timestamp(s3_path)
    
    # 2. Запрос только новых/изменённых записей
    query = f"""
    SELECT * FROM {table_name}
    WHERE updated_at > '{last_load.isoformat()}'
    ORDER BY updated_at
    """
    
    df = pd.read_sql(query, source_db)
    print(f"Loaded {len(df)} rows updated after {last_load}")
    
    if len(df) > 0:
        # 3. Сохранить в S3 партициям
        df['load_date'] = datetime.now().date()
        
        s3_full_path = f"s3://{bucket}/{s3_path}/"
        wr.s3.to_parquet(
            df,
            path=s3_full_path,
            dataset=True,
            partition_cols=['load_date'],
            mode='append'  # Добавить к существующим данным
        )
        
        # 4. Обновить метаданные последней загрузки
        s3_client.put_object(
            Bucket=bucket,
            Key=f"{s3_path}/_last_load_timestamp",
            Body=str(datetime.now())
        )
        
        print(f"Incremental load completed. {len(df)} rows saved.")
    else:
        print("No new data to load")

Подход 2: Change Data Capture (CDC) с LSN/Sequence

Идея: использовать логи изменений БД (Log Sequence Numbers)

Этот подход эффективнее для систем, которые отслеживают изменения:

def incremental_load_with_lsn(source_db, table_name, s3_path):
    """
    Загрузка через Log Sequence Number (SQL Server / PostgreSQL)
    """
    # 1. Получить последний обработанный LSN
    try:
        with open('/tmp/last_lsn.txt', 'r') as f:
            last_lsn = int(f.read())
    except:
        last_lsn = 0
    
    # 2. Получить изменённые записи
    # Для SQL Server используем CDC (Change Data Capture)
    query = f"""
    SELECT 
        [cdc].[__$operation],  -- 1=delete, 2=insert, 3=before update, 4=after update
        [cdc].[__$seqval],
        * 
    FROM cdc.fn_cdc_get_all_changes_{table_name}(
        @from_lsn=sys.fn_cdc_increment_lsn({last_lsn}),
        @to_lsn=sys.fn_cdc_get_max_lsn(),
        'all'
    )
    """
    
    df = pd.read_sql(query, source_db)
    
    # 3. Обработать разные типы операций
    deletes = df[df['__$operation'] == 1]  # Удаления
    inserts = df[df['__$operation'] == 2]  # Вставки
    updates = df[df['__$operation'].isin([3, 4])]  # Обновления
    
    # 4. Сохранить в S3
    import awswrangler as wr
    
    if len(df) > 0:
        wr.s3.to_parquet(
            df[['__$operation', '__$seqval'] + [c for c in df.columns if not c.startswith('__')]],
            path=f"s3://{bucket}/{s3_path}/",
            dataset=True,
            partition_cols=['load_date'],
            mode='append'
        )
        
        # 5. Сохранить новый LSN
        new_lsn = df['__$seqval'].max()
        with open('/tmp/last_lsn.txt', 'w') as f:
            f.write(str(new_lsn))

Подход 3: Full Load vs Incremental Merge

SCD Type 2 (Slowly Changing Dimensions) — сохранение истории:

def scd_type2_merge(new_data, s3_path):
    """
    Слияние инкрементальных данных с сохранением истории
    SCD Type 2: добавляем valid_from/valid_to даты
    """
    import awswrangler as wr
    
    # 1. Читаем существующие данные
    try:
        existing = wr.s3.read_parquet(f"s3://{bucket}/{s3_path}/")
    except:
        existing = pd.DataFrame()
    
    # 2. Находим изменённые записи
    if len(existing) > 0:
        # Merge по key
        merged = pd.merge(
            existing,
            new_data,
            on=['id'],
            how='outer',
            indicator=True,
            suffixes=('_old', '_new')
        )
        
        # Найти обновленные записи
        changed = merged[
            (merged['_merge'] == 'both') & 
            (merged['name_old'] != merged['name_new'])  # Как пример
        ]
        
        if len(changed) > 0:
            # Закрыть старые версии
            existing.loc[existing['id'].isin(changed['id']), 'valid_to'] = datetime.now()
            
            # Добавить новые версии
            new_data['valid_from'] = datetime.now()
            new_data['valid_to'] = None
    else:
        # Первая загрузка
        new_data['valid_from'] = datetime.now()
        new_data['valid_to'] = None
    
    # 3. Сохранить обновленный датасет
    combined = pd.concat([existing, new_data], ignore_index=True)
    
    wr.s3.to_parquet(
        combined,
        path=f"s3://{bucket}/{s3_path}/",
        dataset=True,
        mode='overwrite'
    )

Подход 4: Дельта-озеро (Delta Lake)

Лучший современный подход — использование Delta Lake для ACID транзакций:

from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DeltaLoad").getOrCreate()

def incremental_load_delta(source_db, table_name, delta_path):
    """
    Инкрементальная загрузка с Delta Lake
    """
    # 1. Читаем новые данные
    new_df = spark.read.jdbc(source_db, table_name)
    
    # 2. Merge with existing Delta table
    delta_table = DeltaTable.forPath(spark, delta_path)
    
    delta_table.alias("existing").merge(
        new_df.alias("new"),
        "existing.id = new.id"
    ).whenMatchedUpdateAll(
        condition="existing.updated_at < new.updated_at"  # Обновить, если новые данные новее
    ).whenNotMatchedInsertAll().execute()
    
    # 3. Автоматически сохраняется версионирование
    # Можно откатиться к предыдущей версии если нужно

SQL подход для инкрементального обновления

-- Загрузка инкрементальных данных в промежуточную таблицу
CREATE TABLE staging_customers AS
SELECT 
    id,
    name,
    email,
    updated_at,
    CURRENT_TIMESTAMP as load_timestamp
FROM source_database.customers
WHERE updated_at > (SELECT MAX(updated_at) FROM customers);

-- UPSERT операция
MERGE INTO customers t
USING staging_customers s
ON t.id = s.id
WHEN MATCHED AND t.updated_at < s.updated_at THEN
    UPDATE SET 
        name = s.name,
        email = s.email,
        updated_at = s.updated_at
WHEN NOT MATCHED THEN
    INSERT (id, name, email, updated_at, load_timestamp)
    VALUES (s.id, s.name, s.email, s.updated_at, s.load_timestamp);

Best Practices для S3 инкрементальных загрузок

1. Используйте партиционирование

s3://bucket/table_name/
  ├── year=2024/month=01/
  ├── year=2024/month=02/
  └── year=2024/month=03/

2. Храните метаданные последней загрузки

metadata = {
    'last_sync_timestamp': datetime.now().isoformat(),
    'records_loaded': 1000,
    'load_duration_seconds': 45,
    'status': 'success'
}

s3_client.put_object(
    Bucket=bucket,
    Key='metadata.json',
    Body=json.dumps(metadata)
)

3. Реализуйте идемпотентность

# Один и тот же load дважды не должен создавать дубликаты
# Используйте уникальные ключи: (id, load_timestamp)

4. Мониторируйте качество

# Алерт, если инкрементальная загрузка не произошла 24 часа
if datetime.now() - last_load > timedelta(days=1):
    send_alert("No incremental load for 24 hours!")

Инкрементальные загрузки критичны для масштабируемых data pipelines, позволяя обрабатывать петабайты данных экономно и быстро.

Как работают инкрементальные загрузки в S3? | PrepBro