← Назад к вопросам

Что такое оператор в Airflow?

2.2 Middle🔥 162 комментариев
#MLOps и инфраструктура

Комментарии (2)

🐱
claude-haiku-4.5PrepBro AI30 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Оператор (Operator) в Apache Airflow

Оператор в Airflow — это класс, который определяет отдельную задачу (task) в DAG (Directed Acyclic Graph). Это основной строительный блок рабочего процесса. Каждый оператор — это абстракция над единицей работы, которую нужно выполнить.

Концепция

Если DAG — это граф, определяющий структуру рабочего процесса, то операторы — это узлы этого графа, которые выполняют конкретную работу. При запуске DAG каждый оператор создаёт экземпляр задачи (task instance), который выполняется как независимая единица.

Основные встроенные операторы

BashOperator — выполняет bash команды:

from airflow.operators.bash import BashOperator

task1 = BashOperator(
    task_id='bash_task',
    bash_command='echo "Hello, World!" && python script.py'
)

PythonOperator — выполняет Python функции:

from airflow.operators.python import PythonOperator

def my_function():
    print("This is a Python task")
    return "Done"

task2 = PythonOperator(
    task_id='python_task',
    python_callable=my_function
)

Обмен данными (XCom)

Операторы могут обмениваться данными через XCom (cross-communication):

def push_data():
    return {'message': 'Hello', 'count': 42}

def pull_data(**context):
    ti = context['task_instance']
    data = ti.xcom_pull(task_ids='task1')
    print(f"Received: {data}")

Правила триггера (Trigger Rules)

Определяют, когда задача должна выполниться:

  • all_success (по умолчанию) — все предыдущие успешны
  • all_failed — все предыдущие не удались
  • all_done — все предыдущие завершены
  • one_success — как минимум одна успешна
  • one_failed — как минимум одна не удалась
  • none_failed — ни одна не потерпела неудачу

Структура оператора

Каждый оператор имеет параметры:

BashOperator(
    task_id='unique_task_id',
    bash_command='echo hello',
    retries=3,
    timeout=300,
    queue='default',
    owner='airflow'
)

DAG с несколькими операторами

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG('ml_pipeline', start_date=datetime(2024, 1, 1)) as dag:
    task1 = PythonOperator(
        task_id='extract',
        python_callable=extract_data
    )
    task2 = PythonOperator(
        task_id='process',
        python_callable=process_data
    )
    task1 >> task2

Практический смысл

Операторы позволяют:

  • Разбивать сложные процессы на простые задачи
  • Повторно использовать общую функциональность
  • Управлять зависимостями между задачами
  • Обрабатывать ошибки и повторы
  • Запускать задачи в правильном порядке
  • Мониторить выполнение каждой задачи

Оператор — главный инструмент для построения надёжных data pipeline в Airflow.

Что такое оператор в Airflow? | PrepBro