Как работает Apache Airflow? Какие альтернативы существуют?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Apache Airflow: оркестрация рабочих потоков
Apache Airflow — это платформа для определения, планирования и мониторинга рабочих потоков (workflows/DAG). Это инструмент для автоматизации сложных, зависимых задач.
Основные компоненты Airflow
DAG (Directed Acyclic Graph) — граф задач без циклов, определяющий порядок выполнения и зависимости.
Task — атомарная единица работы (Python функция, SQL запрос, Bash скрипт).
Operator — класс, определяющий что делает Task (PythonOperator, BashOperator, SQLExecuteQueryOperator).
Scheduler — компонент, запускающий DAG по расписанию и следящий за зависимостями.
Executor — компонент, выполняющий Tasks (SequentialExecutor, LocalExecutor, KubernetesExecutor).
Webserver — веб-интерфейс для мониторинга и управления DAG.
Архитектура Airflow
┌─────────────────────────────────────────────────────────┐
│ Webserver (UI) │
│ Мониторинг DAG, запуск, откат (Backfill) │
└──────────────┬──────────────────────────────────────────┘
│
┌──────────────┴──────────────────────────────────────────┐
│ Scheduler │
│ Проверяет DAG каждые N секунд, запускает готовые DAG │
└──────────────┬──────────────────────────────────────────┘
│
┌──────────────┴──────────────────────────────────────────┐
│ Executor │
│ Выполняет Tasks параллельно (N воркеров) │
└──────────────┬──────────────────────────────────────────┘
│
┌──────┴──────┬──────────┬──────────┐
Task1 Task2 Task3 Task4
(Python) (BashOperator) (SQL) (PythonOperator)
Простой пример DAG
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
# Определяем DAG
dag = DAG(
'daily_etl_pipeline',
description='Daily ETL pipeline',
schedule_interval='0 2 * * *', # 2 AM ежедневно
start_date=datetime(2024, 1, 1),
catchup=False,
max_active_runs=1,
default_args={
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
)
# Определяем функции для Python tasks
def extract_data():
print("Extracting data from API...")
# загружаем данные
return "extracted_data.csv"
def transform_data(ti): # ti = task instance
# Получаем результат предыдущей task
extracted_file = ti.xcom_pull(task_ids='extract')
print(f"Transforming {extracted_file}...")
# трансформируем
return "transformed_data.csv"
# Task 1: Извлечение
extract_task = PythonOperator(
task_id='extract',
python_callable=extract_data,
dag=dag
)
# Task 2: Трансформация
transform_task = PythonOperator(
task_id='transform',
python_callable=transform_data,
dag=dag
)
# Task 3: Загрузка в БД
load_task = PostgresOperator(
task_id='load',
postgres_conn_id='postgres_default',
sql="""
COPY target_table FROM '/tmp/transformed_data.csv'
WITH (FORMAT CSV, HEADER TRUE);
""",
dag=dag
)
# Task 4: Проверка качества
validate_task = BashOperator(
task_id='validate',
bash_command='echo "Validating data..." && python /opt/scripts/validate.py',
dag=dag
)
# Определяем зависимости (порядок выполнения)
extract_task >> transform_task >> load_task >> validate_task
# Эквивалентно:
# extract_task.set_downstream(transform_task)
# transform_task.set_downstream(load_task)
# load_task.set_downstream(validate_task)
Как работает Scheduler
# Scheduler каждые N секунд (обычно 60) делает:
1. PARSE DAG файлы
Ищет все DAG в папке ~/airflow/dags/*.py
2. DETERMINE READY DAG RUNS
Определяет, какие DAG runs готовы к запуску
Примеры:
- DAG с schedule='0 2 * * *' готов в 2 AM
- DAG с schedule='@hourly' готов каждый час
- DAG без расписания запускается вручную
3. QUEUE TASKS
Добавляет готовые Tasks в очередь Executor
4. MONITOR TASKS
Проверяет статус выполняющихся Tasks
5. MARK COMPLETED
Помечает завершённые Tasks и DAG runs
Обработка зависимостей
from airflow.models import DAG
from airflow.operators.python import PythonOperator
with DAG('complex_dependencies') as dag:
# Простая цепь
task1 >> task2 >> task3
# Разветвления
task1 >> [task2, task3] >> task4 # task4 ждёт обоих
# Сложные зависимости
task1 >> [task2, task3]
task2 >> task4
task3 >> task4
task4 >> task5
# Визуализация:
# task1
# / \\
# task2 task3
# \ /
# task4
# |
# task5
Типы Operator
1. PythonOperator
PythonOperator(
task_id='my_task',
python_callable=my_function, # функция для выполнения
op_args=[arg1, arg2],
op_kwargs={'key': 'value'},
provide_context=True, # передаёт context в функцию
)
2. BashOperator
BashOperator(
task_id='bash_task',
bash_command='echo Hello && python script.py',
)
3. PostgresOperator
PostgresOperator(
task_id='load_sql',
postgres_conn_id='postgres_default',
sql='INSERT INTO table SELECT * FROM ...'
)
4. HttpOperator
SimpleHttpOperator(
task_id='api_call',
http_conn_id='my_api',
endpoint='/api/v1/data',
method='GET',
response_filter=lambda response: response.json(),
)
XCom (передача данных между tasks)
def task_push():
"""Отправляет данные в XCom"""
return {'key': 'value', 'data': 123}
def task_pull(ti): # ti = TaskInstance
"""Получает данные из XCom"""
pulled_data = ti.xcom_pull(task_ids='push_task')
print(pulled_data) # {'key': 'value', 'data': 123}
# В DAG:
PythonOperator(task_id='push_task', python_callable=task_push) >> \
PythonOperator(task_id='pull_task', python_callable=task_pull)
Обработка ошибок
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
with DAG('error_handling') as dag:
def may_fail():
import random
if random.random() > 0.5:
raise Exception("Random failure!")
task1 = PythonOperator(
task_id='task1',
python_callable=may_fail,
retries=3, # переспроси 3 раза
retry_delay=timedelta(minutes=5), # ждать 5 минут перед retry
)
def handle_success():
print("Task succeeded!")
def handle_failure():
print("Task failed after retries!")
success_task = PythonOperator(
task_id='success',
python_callable=handle_success,
trigger_rule=TriggerRule.ALL_SUCCESS, # выполнить если предыдущая успешна
)
failure_task = PythonOperator(
task_id='failure',
python_callable=handle_failure,
trigger_rule=TriggerRule.ONE_FAILED, # выполнить если предыдущая провалилась
)
task1 >> [success_task, failure_task]
Типы Executor
SequentialExecutor — выполняет tasks по одному (медленно, для тестирования)
LocalExecutor — выполняет tasks параллельно на одной машине (default)
KubernetesExecutor — выполняет каждую task в отдельном Pod (масштабируемо)
CeleryExecutor — выполняет tasks на удалённых workers через очередь (distributed)
Запуск Airflow локально
# Инициализация
airflow db init
# Создание пользователя
airflow users create -u admin -p admin -r Admin
# Запуск Scheduler
airflow scheduler
# Запуск Webserver (в другом терминале)
airflow webserver -p 8080
# Webserver доступен на http://localhost:8080
# Заборт DAG
airflow dags list
# Запуск DAG вручную
airflow dags trigger -e 2024-03-20 daily_etl_pipeline
# Откат DAG на дату (Backfill)
airflow dags backfill daily_etl_pipeline -s 2024-01-01 -e 2024-03-20
Альтернативы Apache Airflow
1. Prefect
from prefect import flow, task
@task
def extract():
return "data"
@task
def transform(data):
return data.upper()
@flow
def my_pipeline():
data = extract()
result = transform(data)
return result
my_pipeline()
Преимущества:
- Более современная архитектура
- Лучше обработка ошибок
- Проще использовать
- Хороший облачный сервис (Prefect Cloud)
2. Dagster
from dagster import job, op
@op
def extract_op():
return "data"
@op
def transform_op(data):
return data.upper()
@job
def my_job():
transform_op(extract_op())
my_job.execute_in_process()
Преимущества:
- Типизация и валидация
- Лучше для data engineering
- Хорошая обработка ошибок
- Разработан Elementl
3. Apache Spark (для больших данных)
Если рабочий поток — это в основном обработка больших данных:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ETL").getOrCreate()
df = spark.read.parquet("data.parquet")
df = df.filter(df.salary > 50000)
df.write.parquet("output.parquet")
4. Kubernetes CronJob
Для простых tasks:
apiVersion: batch/v1
kind: CronJob
metadata:
name: daily-etl
spec:
schedule: "0 2 * * *"
jobTemplate:
spec:
template:
spec:
containers:
- name: etl-job
image: my-etl:latest
command: ["python", "etl_script.py"]
restartPolicy: OnFailure
5. dbt (Data Build Tool)
Для трансформации данных в SQL:
# dbt_project.yml
name: 'my_analytics'
version: '1.0.0'
# Выполнение
dbt run # запустить все модели
dbt test # запустить тесты
dbt docs generate # создать документацию
Сравнительная таблица
| Характеристика | Airflow | Prefect | Dagster | dbt |
|---|---|---|---|---|
| Кривая обучения | Средняя | Низкая | Средняя | Низкая |
| Масштабируемость | Хорошо | Отличной | Отличной | Хорошо |
| Типизация | Нет | Нет | Да | Нет |
| Cloud сервис | Нет | Да (Prefect Cloud) | Нет | Да (dbt Cloud) |
| SQL трансформация | Нет | Нет | Нет | Да (специализирован) |
| Сообщество | Огромное | Среднее | Растёт | Растёт |
Best Practices для Airflow
- Используй Jinja2 для динамических DAG
- Разделяй логику в отдельные функции
- Используй XCom для передачи данных
- Добавляй мониторинг и alerts
- Используй KubernetesExecutor для масштабирования
- Версионируй DAG в Git
- Пиши тесты для DAG
Заключение
Apache Airflow — это индустриальный стандарт для оркестрации рабочих потоков:
- Подходит для сложных, многошаговых pipelines
- Хорошо масштабируется на большие объёмы
- Имеет богатую экосистему провайдеров
- Альтернативы (Prefect, Dagster) часто лучше для новых проектов
Выбор инструмента зависит от:
- Сложности workflow
- Требований к масштабируемости
- Знакомства команды
- Бюджета