Какие операторы Airflow использовал?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Операторы Apache Airflow
Что такое операторы в Airflow
Оператор (Operator) — это класс, который определяет единую задачу (task) в DAG. Каждый оператор выполняет конкретное действие: выполнение кода, запуск скриптов, запросы к базам данных, отправка сообщений и т.д.
Оператор инкапсулирует логику и обеспечивает интеграцию с различными системами.
Основные встроенные операторы
1. PythonOperator
Назначение: Выполнение Python функции
from airflow.operators.python import PythonOperator
from datetime import datetime
def load_data(**context):
# Доступ к контексту выполнения
execution_date = context['execution_date']
task_instance = context['task_instance']
print(f'Loading data for {execution_date}')
# Возврат данных для следующих операторов
return {'loaded_rows': 1000, 'status': 'success'}
task_load = PythonOperator(
task_id='load_data',
python_callable=load_data,
provide_context=True,
dag=dag
)
2. BashOperator
Назначение: Выполнение bash команд
from airflow.operators.bash import BashOperator
task_bash = BashOperator(
task_id='run_spark_job',
bash_command='''
spark-submit \
--class com.example.DataProcessor \
--master spark://spark-master:7077 \
/apps/data-processor.jar \
--input s3://bucket/input/ \
--output s3://bucket/output/ \
--date {{ ds }}
''',
dag=dag
)
3. PythonVirtualenvOperator
Назначение: Выполнение Python функции в изолированной виртуальной среде
from airflow.operators.python import PythonVirtualenvOperator
def transform_data():
import pandas as pd
import polars as pl
df = pl.read_csv('/tmp/data.csv')
result = df.filter(pl.col('amount') > 100)
return result.to_dicts()
task_venv = PythonVirtualenvOperator(
task_id='transform_with_venv',
python_callable=transform_data,
requirements=['pandas==1.5.0', 'polars==0.18.0'],
dag=dag
)
4. SqlOperator (и его варианты)
Назначение: Выполнение SQL запросов
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
# PostgreSQL
task_postgres = PostgresOperator(
task_id='create_customer_mart',
sql='''
CREATE TABLE IF NOT EXISTS marts.customer_summary AS
SELECT
customer_id,
COUNT(*) as total_orders,
SUM(amount) as total_spent
FROM staging.orders
GROUP BY customer_id;
''',
postgres_conn_id='postgres_warehouse',
dag=dag
)
# Snowflake
task_snowflake = SnowflakeOperator(
task_id='load_to_snowflake',
sql='INSERT INTO warehouse.events SELECT * FROM staging.raw_events;',
snowflake_conn_id='snowflake_prod',
dag=dag
)
5. SparkSubmitOperator
Назначение: Отправка Spark jobs на кластер
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
task_spark = SparkSubmitOperator(
task_id='process_large_dataset',
application='/opt/spark-jobs/etl.py',
conf={
'spark.executor.memory': '8g',
'spark.executor.cores': '4',
'spark.driver.memory': '4g'
},
application_args=[
'--input', 's3://bucket/input/',
'--output', 's3://bucket/output/',
'--mode', 'batch'
],
spark_binary='/opt/spark/bin/spark-submit',
dag=dag
)
6. BranchOperator
Назначение: Условное ветвление (выполнение одного из нескольких путей)
from airflow.operators.python import PythonOperator, BranchPythonOperator
def decide_branch(**context):
# Логика принятия решения
rows = context['task_instance'].xcom_pull(task_ids='check_data_quality')
if rows > 1000000:
return 'process_large_dataset'
else:
return 'process_small_dataset'
task_branch = BranchPythonOperator(
task_id='decide_processing',
python_callable=decide_branch,
dag=dag
)
task_large = PythonOperator(
task_id='process_large_dataset',
python_callable=lambda: print('Processing large dataset'),
dag=dag
)
task_small = PythonOperator(
task_id='process_small_dataset',
python_callable=lambda: print('Processing small dataset'),
dag=dag
)
task_branch >> [task_large, task_small]
7. HttpOperator
Назначение: Отправка HTTP запросов
from airflow.providers.http.operators.http import SimpleHttpOperator
import json
task_http = SimpleHttpOperator(
task_id='call_api',
method='POST',
http_conn_id='api_endpoint',
endpoint='/v1/data/process',
data=json.dumps({
'date': '{{ ds }}',
'mode': 'incremental'
}),
headers={'Content-Type': 'application/json'},
dag=dag
)
8. S3Operator (и другие облачные операторы)
Назначение: Работа с облачным хранилищем
from airflow.providers.amazon.aws.operators.s3 import (
S3CreateBucketOperator,
S3DeleteObjectsOperator,
S3ListOperator
)
task_list_s3 = S3ListOperator(
task_id='list_files_in_s3',
bucket='my-data-bucket',
prefix='raw/2026-03-26/',
aws_conn_id='aws_default',
dag=dag
)
task_delete_old = S3DeleteObjectsOperator(
task_id='cleanup_old_data',
bucket='my-data-bucket',
keys=['archive/2023/**'],
aws_conn_id='aws_default',
dag=dag
)
9. Sensor операторы
Назначение: Ожидание условия или события
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.s3_key_size import S3KeySizeSensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.sensors.time_delta import TimeDeltaSensor
# Ожидание файла на диске
task_file_sensor = FileSensor(
task_id='wait_for_input_file',
filepath='/data/input/transactions.csv',
poke_interval=60, # Проверять каждые 60 сек
timeout=3600, # Timeout 1 час
dag=dag
)
# Ожидание файла в S3 достаточного размера
task_s3_sensor = S3KeySizeSensor(
task_id='wait_for_s3_file',
bucket_name='my-bucket',
bucket_key='data/input/data.parquet',
check_fn=lambda size: size > 1024 * 1024 * 100, # Ждём 100+ MB
aws_conn_id='aws_default',
dag=dag
)
# Ожидание завершения другого DAG
task_external_dag = ExternalTaskSensor(
task_id='wait_for_upstream_dag',
external_dag_id='upstream_pipeline',
external_task_id='final_task',
execution_delta=timedelta(hours=-1),
dag=dag
)
# Задержка перед следующей задачей
task_delay = TimeDeltaSensor(
task_id='wait_before_processing',
delta=timedelta(minutes=30),
dag=dag
)
10. EmailOperator
Назначение: Отправка email уведомлений
from airflow.operators.email_operator import EmailOperator
task_email = EmailOperator(
task_id='send_completion_email',
to='data-team@company.com',
subject='Pipeline {{ ds }} completed',
html_content='''
<h2>Data Pipeline Report</h2>
<p>Date: {{ ds }}</p>
<p>Status: {{ task_instance.state }}</p>
<p><a href="{{ task_instance.log_url }}">View Logs</a></p>
''',
dag=dag
)
11. TriggerDagRunOperator
Назначение: Запуск другого DAG из текущего
from airflow.operators.dagrun_operator import TriggerDagRunOperator
task_trigger = TriggerDagRunOperator(
task_id='trigger_downstream_pipeline',
trigger_dag_id='customer_analytics_pipeline',
execution_date='{{ execution_date }}',
dag=dag
)
12. DummyOperator
Назначение: Placeholder операция (для структурирования DAG)
from airflow.operators.dummy_operator import DummyOperator
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)
start >> [task1, task2] >> end
Комплексный пример DAG со множеством операторов
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.s3_key_size import S3KeySizeSensor
from datetime import datetime, timedelta
default_args = {
'owner': 'data-engineering',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'etl_pipeline',
default_args=default_args,
schedule_interval='0 2 * * *', # Ежедневно в 2:00 UTC
start_date=datetime(2023, 1, 1),
) as dag:
# 1. Ожидание входных данных
wait_input = FileSensor(
task_id='wait_input_file',
filepath='/data/input/transactions.csv',
poke_interval=60,
timeout=3600
)
# 2. Извлечение и подготовка
def extract_data(**context):
import pandas as pd
df = pd.read_csv('/data/input/transactions.csv')
print(f'Extracted {len(df)} rows')
return {'rows': len(df)}
extract = PythonOperator(
task_id='extract_data',
python_callable=extract_data
)
# 3. Трансформация в Spark
transform = BashOperator(
task_id='transform_spark',
bash_command='spark-submit /opt/jobs/transform.py --date {{ ds }}'
)
# 4. Загрузка в базу данных
load_db = PostgresOperator(
task_id='load_to_db',
sql='''
INSERT INTO marts.transactions_summary
SELECT date, count(*) as total_count, sum(amount) as total_amount
FROM staging.transactions
WHERE date = '{{ ds }}'
GROUP BY date;
''',
postgres_conn_id='postgres_warehouse'
)
# 5. Валидация данных
def validate_data(**context):
# Проверка качества
print('Validating data quality...')
return True
validate = PythonOperator(
task_id='validate_data',
python_callable=validate_data
)
# 6. Конец
finish = DummyOperator(task_id='finish')
# Определение зависимостей
wait_input >> extract >> transform >> load_db >> validate >> finish
Best Practices при использовании операторов
# 1. Используйте XCom для передачи данных между операторами
def task1(**context):
result = {'key': 'value'}
context['task_instance'].xcom_push(key='result', value=result)
def task2(**context):
result = context['task_instance'].xcom_pull(
task_ids='task1',
key='result'
)
# 2. Используйте templating для динамических значений
BashOperator(
task_id='process',
bash_command='python process.py --date {{ ds }} --hour {{ execution_date.hour }}'
)
# 3. Обработка ошибок с on_failure_callback
def on_failure(context):
print(f"Task failed: {context['task'].task_id}")
# Отправить alert
PythonOperator(
task_id='risky_task',
python_callable=risky_function,
on_failure_callback=on_failure
)
# 4. Используйте task groups для организации
from airflow.utils.task_group import TaskGroup
with TaskGroup('data_processing') as tg_processing:
extract_task = PythonOperator(...)
transform_task = PythonOperator(...)
load_task = PythonOperator(...)
extract_task >> transform_task >> load_task
Заключение
Операторы Airflow — это строительные блоки для создания сложных data pipeline. Выбор правильного оператора зависит от:
- Типа задачи (выполнение кода, запрос БД, вызов API)
- Интеграции требуемой системы (Spark, Kubernetes, облако)
- Требований к изоляции (отдельная виртуальная среда или нет)
Владение основными операторами и понимание их возможностей — это ключ к эффективной разработке data pipeline в Airflow.