Комментарии (2)
Ответ сгенерирован нейросетью и может содержать ошибки
Оператор (Operator) в Apache Airflow
Оператор в Airflow — это класс, который определяет отдельную задачу (task) в DAG (Directed Acyclic Graph). Это основной строительный блок рабочего процесса. Каждый оператор — это абстракция над единицей работы, которую нужно выполнить.
Концепция
Если DAG — это граф, определяющий структуру рабочего процесса, то операторы — это узлы этого графа, которые выполняют конкретную работу. При запуске DAG каждый оператор создаёт экземпляр задачи (task instance), который выполняется как независимая единица.
Основные встроенные операторы
BashOperator — выполняет bash команды:
from airflow.operators.bash import BashOperator
task1 = BashOperator(
task_id='bash_task',
bash_command='echo "Hello, World!" && python script.py'
)
PythonOperator — выполняет Python функции:
from airflow.operators.python import PythonOperator
def my_function():
print("This is a Python task")
return "Done"
task2 = PythonOperator(
task_id='python_task',
python_callable=my_function
)
Обмен данными (XCom)
Операторы могут обмениваться данными через XCom (cross-communication):
def push_data():
return {'message': 'Hello', 'count': 42}
def pull_data(**context):
ti = context['task_instance']
data = ti.xcom_pull(task_ids='task1')
print(f"Received: {data}")
Правила триггера (Trigger Rules)
Определяют, когда задача должна выполниться:
- all_success (по умолчанию) — все предыдущие успешны
- all_failed — все предыдущие не удались
- all_done — все предыдущие завершены
- one_success — как минимум одна успешна
- one_failed — как минимум одна не удалась
- none_failed — ни одна не потерпела неудачу
Структура оператора
Каждый оператор имеет параметры:
BashOperator(
task_id='unique_task_id',
bash_command='echo hello',
retries=3,
timeout=300,
queue='default',
owner='airflow'
)
DAG с несколькими операторами
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
with DAG('ml_pipeline', start_date=datetime(2024, 1, 1)) as dag:
task1 = PythonOperator(
task_id='extract',
python_callable=extract_data
)
task2 = PythonOperator(
task_id='process',
python_callable=process_data
)
task1 >> task2
Практический смысл
Операторы позволяют:
- Разбивать сложные процессы на простые задачи
- Повторно использовать общую функциональность
- Управлять зависимостями между задачами
- Обрабатывать ошибки и повторы
- Запускать задачи в правильном порядке
- Мониторить выполнение каждой задачи
Оператор — главный инструмент для построения надёжных data pipeline в Airflow.