← Назад к вопросам

Какие операторы Airflow использовал?

1.7 Middle🔥 161 комментариев
#Apache Airflow и оркестрация#Опыт и soft skills

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Операторы Apache Airflow

Что такое операторы в Airflow

Оператор (Operator) — это класс, который определяет единую задачу (task) в DAG. Каждый оператор выполняет конкретное действие: выполнение кода, запуск скриптов, запросы к базам данных, отправка сообщений и т.д.

Оператор инкапсулирует логику и обеспечивает интеграцию с различными системами.

Основные встроенные операторы

1. PythonOperator

Назначение: Выполнение Python функции

from airflow.operators.python import PythonOperator
from datetime import datetime

def load_data(**context):
    # Доступ к контексту выполнения
    execution_date = context['execution_date']
    task_instance = context['task_instance']
    
    print(f'Loading data for {execution_date}')
    
    # Возврат данных для следующих операторов
    return {'loaded_rows': 1000, 'status': 'success'}

task_load = PythonOperator(
    task_id='load_data',
    python_callable=load_data,
    provide_context=True,
    dag=dag
)

2. BashOperator

Назначение: Выполнение bash команд

from airflow.operators.bash import BashOperator

task_bash = BashOperator(
    task_id='run_spark_job',
    bash_command='''
    spark-submit \
        --class com.example.DataProcessor \
        --master spark://spark-master:7077 \
        /apps/data-processor.jar \
        --input s3://bucket/input/ \
        --output s3://bucket/output/ \
        --date {{ ds }}
    ''',
    dag=dag
)

3. PythonVirtualenvOperator

Назначение: Выполнение Python функции в изолированной виртуальной среде

from airflow.operators.python import PythonVirtualenvOperator

def transform_data():
    import pandas as pd
    import polars as pl
    
    df = pl.read_csv('/tmp/data.csv')
    result = df.filter(pl.col('amount') > 100)
    return result.to_dicts()

task_venv = PythonVirtualenvOperator(
    task_id='transform_with_venv',
    python_callable=transform_data,
    requirements=['pandas==1.5.0', 'polars==0.18.0'],
    dag=dag
)

4. SqlOperator (и его варианты)

Назначение: Выполнение SQL запросов

from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

# PostgreSQL
task_postgres = PostgresOperator(
    task_id='create_customer_mart',
    sql='''
    CREATE TABLE IF NOT EXISTS marts.customer_summary AS
    SELECT 
        customer_id,
        COUNT(*) as total_orders,
        SUM(amount) as total_spent
    FROM staging.orders
    GROUP BY customer_id;
    ''',
    postgres_conn_id='postgres_warehouse',
    dag=dag
)

# Snowflake
task_snowflake = SnowflakeOperator(
    task_id='load_to_snowflake',
    sql='INSERT INTO warehouse.events SELECT * FROM staging.raw_events;',
    snowflake_conn_id='snowflake_prod',
    dag=dag
)

5. SparkSubmitOperator

Назначение: Отправка Spark jobs на кластер

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

task_spark = SparkSubmitOperator(
    task_id='process_large_dataset',
    application='/opt/spark-jobs/etl.py',
    conf={
        'spark.executor.memory': '8g',
        'spark.executor.cores': '4',
        'spark.driver.memory': '4g'
    },
    application_args=[
        '--input', 's3://bucket/input/',
        '--output', 's3://bucket/output/',
        '--mode', 'batch'
    ],
    spark_binary='/opt/spark/bin/spark-submit',
    dag=dag
)

6. BranchOperator

Назначение: Условное ветвление (выполнение одного из нескольких путей)

from airflow.operators.python import PythonOperator, BranchPythonOperator

def decide_branch(**context):
    # Логика принятия решения
    rows = context['task_instance'].xcom_pull(task_ids='check_data_quality')
    
    if rows > 1000000:
        return 'process_large_dataset'
    else:
        return 'process_small_dataset'

task_branch = BranchPythonOperator(
    task_id='decide_processing',
    python_callable=decide_branch,
    dag=dag
)

task_large = PythonOperator(
    task_id='process_large_dataset',
    python_callable=lambda: print('Processing large dataset'),
    dag=dag
)

task_small = PythonOperator(
    task_id='process_small_dataset',
    python_callable=lambda: print('Processing small dataset'),
    dag=dag
)

task_branch >> [task_large, task_small]

7. HttpOperator

Назначение: Отправка HTTP запросов

from airflow.providers.http.operators.http import SimpleHttpOperator
import json

task_http = SimpleHttpOperator(
    task_id='call_api',
    method='POST',
    http_conn_id='api_endpoint',
    endpoint='/v1/data/process',
    data=json.dumps({
        'date': '{{ ds }}',
        'mode': 'incremental'
    }),
    headers={'Content-Type': 'application/json'},
    dag=dag
)

8. S3Operator (и другие облачные операторы)

Назначение: Работа с облачным хранилищем

from airflow.providers.amazon.aws.operators.s3 import (
    S3CreateBucketOperator,
    S3DeleteObjectsOperator,
    S3ListOperator
)

task_list_s3 = S3ListOperator(
    task_id='list_files_in_s3',
    bucket='my-data-bucket',
    prefix='raw/2026-03-26/',
    aws_conn_id='aws_default',
    dag=dag
)

task_delete_old = S3DeleteObjectsOperator(
    task_id='cleanup_old_data',
    bucket='my-data-bucket',
    keys=['archive/2023/**'],
    aws_conn_id='aws_default',
    dag=dag
)

9. Sensor операторы

Назначение: Ожидание условия или события

from airflow.sensors.filesystem import FileSensor
from airflow.sensors.s3_key_size import S3KeySizeSensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.sensors.time_delta import TimeDeltaSensor

# Ожидание файла на диске
task_file_sensor = FileSensor(
    task_id='wait_for_input_file',
    filepath='/data/input/transactions.csv',
    poke_interval=60,  # Проверять каждые 60 сек
    timeout=3600,      # Timeout 1 час
    dag=dag
)

# Ожидание файла в S3 достаточного размера
task_s3_sensor = S3KeySizeSensor(
    task_id='wait_for_s3_file',
    bucket_name='my-bucket',
    bucket_key='data/input/data.parquet',
    check_fn=lambda size: size > 1024 * 1024 * 100,  # Ждём 100+ MB
    aws_conn_id='aws_default',
    dag=dag
)

# Ожидание завершения другого DAG
task_external_dag = ExternalTaskSensor(
    task_id='wait_for_upstream_dag',
    external_dag_id='upstream_pipeline',
    external_task_id='final_task',
    execution_delta=timedelta(hours=-1),
    dag=dag
)

# Задержка перед следующей задачей
task_delay = TimeDeltaSensor(
    task_id='wait_before_processing',
    delta=timedelta(minutes=30),
    dag=dag
)

10. EmailOperator

Назначение: Отправка email уведомлений

from airflow.operators.email_operator import EmailOperator

task_email = EmailOperator(
    task_id='send_completion_email',
    to='data-team@company.com',
    subject='Pipeline {{ ds }} completed',
    html_content='''
    <h2>Data Pipeline Report</h2>
    <p>Date: {{ ds }}</p>
    <p>Status: {{ task_instance.state }}</p>
    <p><a href="{{ task_instance.log_url }}">View Logs</a></p>
    ''',
    dag=dag
)

11. TriggerDagRunOperator

Назначение: Запуск другого DAG из текущего

from airflow.operators.dagrun_operator import TriggerDagRunOperator

task_trigger = TriggerDagRunOperator(
    task_id='trigger_downstream_pipeline',
    trigger_dag_id='customer_analytics_pipeline',
    execution_date='{{ execution_date }}',
    dag=dag
)

12. DummyOperator

Назначение: Placeholder операция (для структурирования DAG)

from airflow.operators.dummy_operator import DummyOperator

start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

start >> [task1, task2] >> end

Комплексный пример DAG со множеством операторов

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.s3_key_size import S3KeySizeSensor
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'etl_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Ежедневно в 2:00 UTC
    start_date=datetime(2023, 1, 1),
) as dag:
    
    # 1. Ожидание входных данных
    wait_input = FileSensor(
        task_id='wait_input_file',
        filepath='/data/input/transactions.csv',
        poke_interval=60,
        timeout=3600
    )
    
    # 2. Извлечение и подготовка
    def extract_data(**context):
        import pandas as pd
        df = pd.read_csv('/data/input/transactions.csv')
        print(f'Extracted {len(df)} rows')
        return {'rows': len(df)}
    
    extract = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data
    )
    
    # 3. Трансформация в Spark
    transform = BashOperator(
        task_id='transform_spark',
        bash_command='spark-submit /opt/jobs/transform.py --date {{ ds }}'
    )
    
    # 4. Загрузка в базу данных
    load_db = PostgresOperator(
        task_id='load_to_db',
        sql='''
        INSERT INTO marts.transactions_summary
        SELECT date, count(*) as total_count, sum(amount) as total_amount
        FROM staging.transactions
        WHERE date = '{{ ds }}'
        GROUP BY date;
        ''',
        postgres_conn_id='postgres_warehouse'
    )
    
    # 5. Валидация данных
    def validate_data(**context):
        # Проверка качества
        print('Validating data quality...')
        return True
    
    validate = PythonOperator(
        task_id='validate_data',
        python_callable=validate_data
    )
    
    # 6. Конец
    finish = DummyOperator(task_id='finish')
    
    # Определение зависимостей
    wait_input >> extract >> transform >> load_db >> validate >> finish

Best Practices при использовании операторов

# 1. Используйте XCom для передачи данных между операторами
def task1(**context):
    result = {'key': 'value'}
    context['task_instance'].xcom_push(key='result', value=result)

def task2(**context):
    result = context['task_instance'].xcom_pull(
        task_ids='task1',
        key='result'
    )

# 2. Используйте templating для динамических значений
BashOperator(
    task_id='process',
    bash_command='python process.py --date {{ ds }} --hour {{ execution_date.hour }}'
)

# 3. Обработка ошибок с on_failure_callback
def on_failure(context):
    print(f"Task failed: {context['task'].task_id}")
    # Отправить alert

PythonOperator(
    task_id='risky_task',
    python_callable=risky_function,
    on_failure_callback=on_failure
)

# 4. Используйте task groups для организации
from airflow.utils.task_group import TaskGroup

with TaskGroup('data_processing') as tg_processing:
    extract_task = PythonOperator(...)
    transform_task = PythonOperator(...)
    load_task = PythonOperator(...)
    extract_task >> transform_task >> load_task

Заключение

Операторы Airflow — это строительные блоки для создания сложных data pipeline. Выбор правильного оператора зависит от:

  • Типа задачи (выполнение кода, запрос БД, вызов API)
  • Интеграции требуемой системы (Spark, Kubernetes, облако)
  • Требований к изоляции (отдельная виртуальная среда или нет)

Владение основными операторами и понимание их возможностей — это ключ к эффективной разработке data pipeline в Airflow.

Какие операторы Airflow использовал? | PrepBro