← Назад к вопросам

Как с помощью airflow пересчитать витрины?

2.0 Middle🔥 231 комментариев
#Apache Airflow и оркестрация

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Пересчёт витрин в Airflow

Что такое витрины и зачем их пересчитывать

Витрины — это агрегированные представления данных, оптимизированные для быстрого доступа и аналитики. Они содержат суммированные, сгруппированные или трансформированные данные из фактических таблиц.

Причины пересчёта витрин:

  • Обновление исходных данных
  • Обнаружение ошибок в логике калькуляции
  • Изменение бизнес-метрик
  • Восстановление данных после сбоя
  • Переход на новую версию ETL логики

Стратегии пересчёта витрин

1. Полный пересчёт (Full Recalculation)

Когда использовать: для небольших витрин или критичных метрик

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "data-team",
    "start_date": datetime(2024, 1, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "full_recalculate_views",
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False,
) as dag:
    
    # Удаляем старую витрину
    truncate_view = PostgresOperator(
        task_id="truncate_old_view",
        postgres_conn_id="data_warehouse",
        sql="TRUNCATE TABLE analytics.monthly_sales_view;",
    )
    
    # Пересчитываем витрину с нуля
    recalculate_view = PostgresOperator(
        task_id="recalculate_view",
        postgres_conn_id="data_warehouse",
        sql="""
            INSERT INTO analytics.monthly_sales_view (
                sales_month,
                region,
                total_revenue,
                total_orders,
                avg_order_value
            )
            SELECT
                DATE_TRUNC(month, order_date)::DATE AS sales_month,
                region,
                SUM(amount) AS total_revenue,
                COUNT(*) AS total_orders,
                AVG(amount) AS avg_order_value
            FROM raw_data.orders
            WHERE order_date >= 2020-01-01
            GROUP BY sales_month, region
            ORDER BY sales_month DESC;
        """,
    )
    
    truncate_view >> recalculate_view

2. Инкрементальный пересчёт (Incremental Recalculation)

Когда использовать: для больших витрин, когда нужны производительность

from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import psycopg2

def incremental_recalculate(**context):
    """Пересчитывает только новые/изменённые данные"""
    
    # Получаем дату последнего пересчёта из логов
    conn = psycopg2.connect("dbname=warehouse user=postgres")
    cur = conn.cursor()
    
    # Получаем последний processed_date
    cur.execute("""
        SELECT MAX(processed_date)
        FROM metadata.view_recalc_log
        WHERE view_name = daily_revenue_view
    """)
    
    last_processed = cur.fetchone()[0]
    if last_processed is None:
        last_processed = datetime(2020, 1, 1).date()
    
    # Пересчитываем только новые данные
    cur.execute("""
        INSERT INTO analytics.daily_revenue_view (
            revenue_date,
            product_category,
            revenue,
            processed_at
        )
        SELECT
            DATE(order_date) AS revenue_date,
            product_category,
            SUM(amount) AS revenue,
            CURRENT_TIMESTAMP
        FROM raw_data.orders
        WHERE DATE(order_date) > %s
        GROUP BY revenue_date, product_category
        ON CONFLICT (revenue_date, product_category)
        DO UPDATE SET
            revenue = EXCLUDED.revenue,
            processed_at = EXCLUDED.processed_at;
    """, (last_processed,))
    
    # Логируем факт пересчёта
    cur.execute("""
        INSERT INTO metadata.view_recalc_log (view_name, processed_date, status)
        VALUES (daily_revenue_view, CURRENT_DATE, success);
    """)
    
    conn.commit()
    cur.close()
    conn.close()

incremental_task = PythonOperator(
    task_id="incremental_recalc",
    python_callable=incremental_recalculate,
    dag=dag,
)

3. Пересчёт по периодам (Window Recalculation)

Когда использовать: когда нужно пересчитать определённый диапазон дат

from airflow.models import Variable
from airflow.operators.python import PythonOperator

def recalculate_period(**context):
    """Пересчитывает витрину за конкретный период"""
    
    # Получаем параметры из переменных/конфига
    start_date = Variable.get("recalc_start_date")
    end_date = Variable.get("recalc_end_date")
    
    conn = psycopg2.connect("dbname=warehouse user=postgres")
    cur = conn.cursor()
    
    # Удаляем старые данные за период
    cur.execute("""
        DELETE FROM analytics.weekly_forecast_view
        WHERE forecast_week >= %s AND forecast_week <= %s;
    """, (start_date, end_date))
    
    # Пересчитываем за период
    cur.execute("""
        INSERT INTO analytics.weekly_forecast_view (
            forecast_week,
            product_id,
            forecast_sales,
            confidence_level
        )
        SELECT
            DATE_TRUNC(week, order_date)::DATE,
            product_id,
            ROUND(AVG(amount) * 1.05, 2),  # +5% для прогноза
            0.85
        FROM raw_data.orders
        WHERE DATE(order_date) BETWEEN %s AND %s
        GROUP BY DATE_TRUNC(week, order_date), product_id;
    """, (start_date, end_date))
    
    conn.commit()
    cur.close()
    conn.close()

recalc_period_task = PythonOperator(
    task_id="recalc_period",
    python_callable=recalculate_period,
    dag=dag,
)

Пример: полный DAG для пересчёта витрин

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.dummy import DummyOperator
from datetime import datetime, timedelta
import logging

default_args = {
    "owner": "analytics-team",
    "start_date": datetime(2024, 1, 1),
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

def check_need_full_recalc(**context):
    """Проверяет, нужен ли полный пересчёт"""
    import psycopg2
    
    conn = psycopg2.connect("dbname=warehouse user=postgres")
    cur = conn.cursor()
    
    # Проверяем флаг полного пересчёта
    cur.execute("""
        SELECT need_full_recalc
        FROM metadata.view_settings
        WHERE view_name = sales_view
    """)
    
    need_full = cur.fetchone()[0]
    cur.close()
    conn.close()
    
    if need_full:
        return "full_recalc_branch"
    else:
        return "incremental_recalc_branch"

with DAG(
    "smart_view_recalculation",
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False,
) as dag:
    
    # Начальная проверка
    start = DummyOperator(task_id="start")
    
    # Ветвление: проверяем тип пересчёта
    check_recalc_type = BranchPythonOperator(
        task_id="check_recalc_type",
        python_callable=check_need_full_recalc,
    )
    
    # Ветка 1: Полный пересчёт
    full_recalc_branch = DummyOperator(task_id="full_recalc_branch")
    
    truncate_task = PostgresOperator(
        task_id="truncate_view",
        postgres_conn_id="warehouse",
        sql="TRUNCATE TABLE analytics.sales_view;",
    )
    
    full_recalc_task = PostgresOperator(
        task_id="full_recalc",
        postgres_conn_id="warehouse",
        sql="""
            INSERT INTO analytics.sales_view
            SELECT date, region, SUM(amount)
            FROM raw_data.transactions
            GROUP BY date, region;
        """,
    )
    
    # Ветка 2: Инкрементальный пересчёт
    incremental_recalc_branch = DummyOperator(task_id="incremental_recalc_branch")
    
    incremental_task = PythonOperator(
        task_id="incremental_recalc_task",
        python_callable=incremental_recalculate,
    )
    
    # Финальная проверка
    validate_task = PythonOperator(
        task_id="validate_view",
        python_callable=lambda: logging.info("Витрины пересчитаны успешно"),
    )
    
    end = DummyOperator(task_id="end", trigger_rule="none_failed")
    
    # DAG структура
    start >> check_recalc_type
    
    check_recalc_type >> full_recalc_branch >> truncate_task >> full_recalc_task >> validate_task
    check_recalc_type >> incremental_recalc_branch >> incremental_task >> validate_task
    
    validate_task >> end

Мониторинг и логирование пересчёта

def log_recalculation_stats(**context):
    """Логирует статистику пересчёта"""
    
    conn = psycopg2.connect("dbname=warehouse user=postgres")
    cur = conn.cursor()
    
    # Получаем статистику
    cur.execute("""
        SELECT
            view_name,
            row_count,
            size_mb,
            last_recalc_duration,
            status
        FROM metadata.view_stats
        WHERE last_recalc_date = CURRENT_DATE
    """)
    
    stats = cur.fetchall()
    
    for stat in stats:
        logging.info(f"View: {stat[0]}, Rows: {stat[1]}, Size: {stat[2]}MB, Duration: {stat[3]}s, Status: {stat[4]}")
    
    cur.close()
    conn.close()

stats_task = PythonOperator(
    task_id="log_stats",
    python_callable=log_recalculation_stats,
)

Оптимизация пересчёта

-- Используй MATERIALIZED VIEW для кэширования
CREATE MATERIALIZED VIEW analytics.daily_sales_mv AS
SELECT
    DATE(order_date) as sale_date,
    category,
    SUM(amount) as total_sales
FROM raw_data.orders
GROUP BY sale_date, category;

-- Обновляй только необходимые данные
REFRESH MATERIALIZED VIEW CONCURRENTLY analytics.daily_sales_mv;

Выводы

Пересчёт витрин в Airflow:

  1. Полный пересчёт — TRUNCATE + INSERT для критичных метрик
  2. Инкрементальный — INSERT/UPDATE только новых данных (быстрее)
  3. Периодический — пересчёт конкретного диапазона дат
  4. Мониторинг — логирование статистики для отслеживания качества
  5. MATERIALIZED VIEW — для кэширования часто используемых витрин

Выбор стратегии зависит от размера витрины, частоты обновлений и требований к консистентности данных.