Как называется структура задач в Airflow?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Структура задач в Airflow
В Apache Airflow вся иерархия организации работ строится на DAG (Directed Acyclic Graph) — это основная концепция, которая определяет как задачи организуются, распределяются и выполняются.
DAG — Directed Acyclic Graph
DAG — это основная структурная единица в Airflow. Это ориентированный ациклический граф, который описывает:
- Набор задач (tasks)
- Зависимости между ними
- Порядок выполнения
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
with DAG(
dag_id="my_first_dag",
start_date=datetime(2024, 1, 1),
schedule_interval="@daily",
catchup=False
) as dag:
pass
Основные компоненты DAG
1. DAG ID Уникальный идентификатор DAG'а в Airflow.
2. Task (Задача) Отдельная единица работы, обычно один оператор.
from airflow.operators.bash import BashOperator
task_1 = BashOperator(
task_id="extract_data",
bash_command="python /scripts/extract.py"
)
3. Operator (Оператор) Шаблон для создания задач. Airflow предоставляет:
- BashOperator: выполняет bash-команды
- PythonOperator: выполняет Python функции
- PostgresOperator: выполняет SQL-запросы
- SparkSubmitOperator: запускает Spark jobs
- HTTPOperator: делает HTTP запросы
- EmailOperator: отправляет письма
- BranchOperator: условное ветвление
4. Task Dependencies (Зависимости между задачами) Определяют порядок выполнения:
task_1 >> task_2 >> task_3
Типы структур в DAG
1. Линейная структура Задачи выполняются последовательно одна за другой.
2. Древовидная структура (Fan-out) Одна задача порождает множество параллельных подзадач.
task_1 >> [task_2, task_3, task_4]
3. Слияние (Fan-in) Множество задач сходятся в одну.
[task_1, task_2, task_3] >> task_4
4. Сложная сетевая структура Сочетание fan-out и fan-in паттернов для создания сложных зависимостей.
TaskGroup — группировка задач
Для организации больших DAG'ов используются TaskGroup — это логическая группировка задач, которая улучшает читаемость и структурированность DAG'а:
from airflow.utils.task_group import TaskGroup
with DAG(dag_id="complex_pipeline") as dag:
with TaskGroup("extract_group") as extract_group:
extract_users = BashOperator(
task_id="extract_users",
bash_command="python extract_users.py"
)
extract_orders = BashOperator(
task_id="extract_orders",
bash_command="python extract_orders.py"
)
XCom — обмен данными между задачами
Для передачи данных между задачами используется XCom (cross-communication). Это механизм, позволяющий одной задаче отправить результат, который другая задача может получить:
def extract_data():
return {"count": 1000, "file": "/tmp/data.csv"}
def process_data(ti):
extracted = ti.xcom_pull(task_ids="extract")
count = extracted["count"]
Важные параметры DAG
schedule_interval — расписание выполнения:
- @daily — каждый день
- @hourly — каждый час
- 0 2 * * * — cron выражение (в 02:00 каждый день)
- None — только ручной запуск
max_active_runs — максимум одновременных запусков DAG'а:
- 1 — последовательные запуски
- N — параллельные запуски
default_view — вид по умолчанию в UI:
- graph — граф зависимостей (рекомендуется)
- tree — дерево
- calendar — календарь
Жизненный цикл DAG
1. Parsing — Airflow парсит DAG файлы 2. Scheduling — Scheduler определяет какие DAG Run нужно создать 3. Queueing — Tasks попадают в очередь 4. Execution — Executor запускает Tasks 5. Monitoring — UI показывает состояние
Выводы
DAG — это основная структурная единица в Airflow. Она состоит из Tasks (операции), которые связаны Dependencies (зависимостями). Для организации больших DAG'ов используются TaskGroup и XCom для обмена данными. Понимание иерархии DAG → Tasks → Operators → Dependencies критично для построения надёжных и масштабируемых data pipelines.