← Назад к вопросам
Как с помощью airflow пересчитать витрины?
2.0 Middle🔥 231 комментариев
#Apache Airflow и оркестрация
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Пересчёт витрин в Airflow
Что такое витрины и зачем их пересчитывать
Витрины — это агрегированные представления данных, оптимизированные для быстрого доступа и аналитики. Они содержат суммированные, сгруппированные или трансформированные данные из фактических таблиц.
Причины пересчёта витрин:
- Обновление исходных данных
- Обнаружение ошибок в логике калькуляции
- Изменение бизнес-метрик
- Восстановление данных после сбоя
- Переход на новую версию ETL логики
Стратегии пересчёта витрин
1. Полный пересчёт (Full Recalculation)
Когда использовать: для небольших витрин или критичных метрик
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta
default_args = {
"owner": "data-team",
"start_date": datetime(2024, 1, 1),
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
with DAG(
"full_recalculate_views",
default_args=default_args,
schedule_interval="@daily",
catchup=False,
) as dag:
# Удаляем старую витрину
truncate_view = PostgresOperator(
task_id="truncate_old_view",
postgres_conn_id="data_warehouse",
sql="TRUNCATE TABLE analytics.monthly_sales_view;",
)
# Пересчитываем витрину с нуля
recalculate_view = PostgresOperator(
task_id="recalculate_view",
postgres_conn_id="data_warehouse",
sql="""
INSERT INTO analytics.monthly_sales_view (
sales_month,
region,
total_revenue,
total_orders,
avg_order_value
)
SELECT
DATE_TRUNC(month, order_date)::DATE AS sales_month,
region,
SUM(amount) AS total_revenue,
COUNT(*) AS total_orders,
AVG(amount) AS avg_order_value
FROM raw_data.orders
WHERE order_date >= 2020-01-01
GROUP BY sales_month, region
ORDER BY sales_month DESC;
""",
)
truncate_view >> recalculate_view
2. Инкрементальный пересчёт (Incremental Recalculation)
Когда использовать: для больших витрин, когда нужны производительность
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import psycopg2
def incremental_recalculate(**context):
"""Пересчитывает только новые/изменённые данные"""
# Получаем дату последнего пересчёта из логов
conn = psycopg2.connect("dbname=warehouse user=postgres")
cur = conn.cursor()
# Получаем последний processed_date
cur.execute("""
SELECT MAX(processed_date)
FROM metadata.view_recalc_log
WHERE view_name = daily_revenue_view
""")
last_processed = cur.fetchone()[0]
if last_processed is None:
last_processed = datetime(2020, 1, 1).date()
# Пересчитываем только новые данные
cur.execute("""
INSERT INTO analytics.daily_revenue_view (
revenue_date,
product_category,
revenue,
processed_at
)
SELECT
DATE(order_date) AS revenue_date,
product_category,
SUM(amount) AS revenue,
CURRENT_TIMESTAMP
FROM raw_data.orders
WHERE DATE(order_date) > %s
GROUP BY revenue_date, product_category
ON CONFLICT (revenue_date, product_category)
DO UPDATE SET
revenue = EXCLUDED.revenue,
processed_at = EXCLUDED.processed_at;
""", (last_processed,))
# Логируем факт пересчёта
cur.execute("""
INSERT INTO metadata.view_recalc_log (view_name, processed_date, status)
VALUES (daily_revenue_view, CURRENT_DATE, success);
""")
conn.commit()
cur.close()
conn.close()
incremental_task = PythonOperator(
task_id="incremental_recalc",
python_callable=incremental_recalculate,
dag=dag,
)
3. Пересчёт по периодам (Window Recalculation)
Когда использовать: когда нужно пересчитать определённый диапазон дат
from airflow.models import Variable
from airflow.operators.python import PythonOperator
def recalculate_period(**context):
"""Пересчитывает витрину за конкретный период"""
# Получаем параметры из переменных/конфига
start_date = Variable.get("recalc_start_date")
end_date = Variable.get("recalc_end_date")
conn = psycopg2.connect("dbname=warehouse user=postgres")
cur = conn.cursor()
# Удаляем старые данные за период
cur.execute("""
DELETE FROM analytics.weekly_forecast_view
WHERE forecast_week >= %s AND forecast_week <= %s;
""", (start_date, end_date))
# Пересчитываем за период
cur.execute("""
INSERT INTO analytics.weekly_forecast_view (
forecast_week,
product_id,
forecast_sales,
confidence_level
)
SELECT
DATE_TRUNC(week, order_date)::DATE,
product_id,
ROUND(AVG(amount) * 1.05, 2), # +5% для прогноза
0.85
FROM raw_data.orders
WHERE DATE(order_date) BETWEEN %s AND %s
GROUP BY DATE_TRUNC(week, order_date), product_id;
""", (start_date, end_date))
conn.commit()
cur.close()
conn.close()
recalc_period_task = PythonOperator(
task_id="recalc_period",
python_callable=recalculate_period,
dag=dag,
)
Пример: полный DAG для пересчёта витрин
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.dummy import DummyOperator
from datetime import datetime, timedelta
import logging
default_args = {
"owner": "analytics-team",
"start_date": datetime(2024, 1, 1),
"retries": 2,
"retry_delay": timedelta(minutes=5),
}
def check_need_full_recalc(**context):
"""Проверяет, нужен ли полный пересчёт"""
import psycopg2
conn = psycopg2.connect("dbname=warehouse user=postgres")
cur = conn.cursor()
# Проверяем флаг полного пересчёта
cur.execute("""
SELECT need_full_recalc
FROM metadata.view_settings
WHERE view_name = sales_view
""")
need_full = cur.fetchone()[0]
cur.close()
conn.close()
if need_full:
return "full_recalc_branch"
else:
return "incremental_recalc_branch"
with DAG(
"smart_view_recalculation",
default_args=default_args,
schedule_interval="@daily",
catchup=False,
) as dag:
# Начальная проверка
start = DummyOperator(task_id="start")
# Ветвление: проверяем тип пересчёта
check_recalc_type = BranchPythonOperator(
task_id="check_recalc_type",
python_callable=check_need_full_recalc,
)
# Ветка 1: Полный пересчёт
full_recalc_branch = DummyOperator(task_id="full_recalc_branch")
truncate_task = PostgresOperator(
task_id="truncate_view",
postgres_conn_id="warehouse",
sql="TRUNCATE TABLE analytics.sales_view;",
)
full_recalc_task = PostgresOperator(
task_id="full_recalc",
postgres_conn_id="warehouse",
sql="""
INSERT INTO analytics.sales_view
SELECT date, region, SUM(amount)
FROM raw_data.transactions
GROUP BY date, region;
""",
)
# Ветка 2: Инкрементальный пересчёт
incremental_recalc_branch = DummyOperator(task_id="incremental_recalc_branch")
incremental_task = PythonOperator(
task_id="incremental_recalc_task",
python_callable=incremental_recalculate,
)
# Финальная проверка
validate_task = PythonOperator(
task_id="validate_view",
python_callable=lambda: logging.info("Витрины пересчитаны успешно"),
)
end = DummyOperator(task_id="end", trigger_rule="none_failed")
# DAG структура
start >> check_recalc_type
check_recalc_type >> full_recalc_branch >> truncate_task >> full_recalc_task >> validate_task
check_recalc_type >> incremental_recalc_branch >> incremental_task >> validate_task
validate_task >> end
Мониторинг и логирование пересчёта
def log_recalculation_stats(**context):
"""Логирует статистику пересчёта"""
conn = psycopg2.connect("dbname=warehouse user=postgres")
cur = conn.cursor()
# Получаем статистику
cur.execute("""
SELECT
view_name,
row_count,
size_mb,
last_recalc_duration,
status
FROM metadata.view_stats
WHERE last_recalc_date = CURRENT_DATE
""")
stats = cur.fetchall()
for stat in stats:
logging.info(f"View: {stat[0]}, Rows: {stat[1]}, Size: {stat[2]}MB, Duration: {stat[3]}s, Status: {stat[4]}")
cur.close()
conn.close()
stats_task = PythonOperator(
task_id="log_stats",
python_callable=log_recalculation_stats,
)
Оптимизация пересчёта
-- Используй MATERIALIZED VIEW для кэширования
CREATE MATERIALIZED VIEW analytics.daily_sales_mv AS
SELECT
DATE(order_date) as sale_date,
category,
SUM(amount) as total_sales
FROM raw_data.orders
GROUP BY sale_date, category;
-- Обновляй только необходимые данные
REFRESH MATERIALIZED VIEW CONCURRENTLY analytics.daily_sales_mv;
Выводы
Пересчёт витрин в Airflow:
- Полный пересчёт — TRUNCATE + INSERT для критичных метрик
- Инкрементальный — INSERT/UPDATE только новых данных (быстрее)
- Периодический — пересчёт конкретного диапазона дат
- Мониторинг — логирование статистики для отслеживания качества
- MATERIALIZED VIEW — для кэширования часто используемых витрин
Выбор стратегии зависит от размера витрины, частоты обновлений и требований к консистентности данных.