← Назад к вопросам
Что такое logical date в Airflow?
2.3 Middle🔥 171 комментариев
#Apache Airflow и оркестрация
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Что такое Logical Date в Airflow
Logical Date (логическая дата) в Apache Airflow (версия 2.0+) — это дата, за которую выполняется DAG. Это НЕ дата запуска DAG, а дата, для которой нужно обработать данные. Ранее это называлось execution_date.
Различие между Logical Date и Execution Date
- Logical Date (execution_date) — дата, за которую обрабатываются данные
- Run Date — когда фактически запустился DAG
Пример: DAG запланирован на каждый день в 00:00. Если логическая дата 2024-01-01, DAG запустится 2024-01-02 в 00:00, обрабатывая данные за 2024-01-01.
Как получить Logical Date в коде
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def process_data(logical_date, **context):
print(f"Processing data for: {logical_date}")
year = logical_date.year
month = logical_date.month
day = logical_date.day
# Используй переменные для фильтрации данных
query = f"""
SELECT * FROM events
WHERE event_date = '{logical_date.date()}'
"""
print(query)
with DAG(
'data_pipeline',
start_date=datetime(2024, 1, 1),
schedule_interval='@daily',
catchup=False
) as dag:
task = PythonOperator(
task_id='process',
python_callable=process_data
)
Доступ к Logical Date в разных контекстах
1. В PythonOperator через context
def my_task(logical_date=None, **context):
# logical_date уже передана как параметр
print(f"Logical date: {logical_date}")
# Или из context
execution_date = context['logical_date']
print(f"Execution date: {execution_date}")
task = PythonOperator(
task_id='my_task',
python_callable=my_task,
op_kwargs={}
)
2. В SQL операторах (Airflow 2.0+)
from airflow.providers.postgres.operators.postgres import PostgresOperator
load_task = PostgresOperator(
task_id='load_data',
sql="""
INSERT INTO daily_stats
SELECT date '{{ logical_date }}' as report_date,
COUNT(*) as total_events
FROM raw_events
WHERE event_date = '{{ logical_date.date() }}'
""",
postgres_conn_id='postgres_default'
)
3. Через Jinja2 шаблоны
from airflow.operators.bash import BashOperator
bash_task = BashOperator(
task_id='print_date',
bash_command="echo 'Processing data for {{ logical_date }}'"
)
# Также доступны:
# {{ ds }} — logical_date в формате YYYY-MM-DD
# {{ ds_nodash }} — logical_date в формате YYYYMMDD
# {{ execution_date }} — то же самое, что logical_date
# {{ prev_ds }} — предыдущая логическая дата
Практический пример: Data Backfill
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def fetch_and_process(**context):
logical_date = context['logical_date']
date_str = logical_date.strftime('%Y-%m-%d')
# Используй логическую дату для фильтрации данных
print(f"Fetching data for {date_str}")
# Запрос в API или БД
# result = fetch_data(date=date_str)
return f"Processed data for {date_str}"
with DAG(
'daily_pipeline',
start_date=datetime(2024, 1, 1),
end_date=datetime(2024, 1, 31), # backfill до этой даты
schedule_interval='@daily',
catchup=True # выполнить все пропущенные дни
) as dag:
process = PythonOperator(
task_id='process_data',
python_callable=fetch_and_process,
provide_context=True
)
Ключевые рекомендации
- Используй logical_date для фильтрации данных — это гарантирует корректность backfill
- Не используй datetime.now() — используй logical_date из context
- Jinja2 шаблоны удобнее — {{ logical_date }} автоматически интерполируется
- ds и ds_nodash — самые частые форматы для логической даты
- Catchup режим — позволяет переобработать исторические данные