← Назад к вопросам

Как работает Apache Airflow? Какие альтернативы существуют?

2.2 Middle🔥 161 комментариев
#Apache Airflow и оркестрация

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Apache Airflow: оркестрация рабочих потоков

Apache Airflow — это платформа для определения, планирования и мониторинга рабочих потоков (workflows/DAG). Это инструмент для автоматизации сложных, зависимых задач.

Основные компоненты Airflow

DAG (Directed Acyclic Graph) — граф задач без циклов, определяющий порядок выполнения и зависимости.

Task — атомарная единица работы (Python функция, SQL запрос, Bash скрипт).

Operator — класс, определяющий что делает Task (PythonOperator, BashOperator, SQLExecuteQueryOperator).

Scheduler — компонент, запускающий DAG по расписанию и следящий за зависимостями.

Executor — компонент, выполняющий Tasks (SequentialExecutor, LocalExecutor, KubernetesExecutor).

Webserver — веб-интерфейс для мониторинга и управления DAG.

Архитектура Airflow

┌─────────────────────────────────────────────────────────┐
│                    Webserver (UI)                        │
│        Мониторинг DAG, запуск, откат (Backfill)         │
└──────────────┬──────────────────────────────────────────┘
               │
┌──────────────┴──────────────────────────────────────────┐
│                     Scheduler                            │
│  Проверяет DAG каждые N секунд, запускает готовые DAG  │
└──────────────┬──────────────────────────────────────────┘
               │
┌──────────────┴──────────────────────────────────────────┐
│                      Executor                            │
│  Выполняет Tasks параллельно (N воркеров)               │
└──────────────┬──────────────────────────────────────────┘
               │
        ┌──────┴──────┬──────────┬──────────┐
       Task1        Task2       Task3      Task4
   (Python)    (BashOperator) (SQL)   (PythonOperator)

Простой пример DAG

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

# Определяем DAG
dag = DAG(
    'daily_etl_pipeline',
    description='Daily ETL pipeline',
    schedule_interval='0 2 * * *',  # 2 AM ежедневно
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    default_args={
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
    }
)

# Определяем функции для Python tasks
def extract_data():
    print("Extracting data from API...")
    # загружаем данные
    return "extracted_data.csv"

def transform_data(ti):  # ti = task instance
    # Получаем результат предыдущей task
    extracted_file = ti.xcom_pull(task_ids='extract')
    print(f"Transforming {extracted_file}...")
    # трансформируем
    return "transformed_data.csv"

# Task 1: Извлечение
extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract_data,
    dag=dag
)

# Task 2: Трансформация
transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform_data,
    dag=dag
)

# Task 3: Загрузка в БД
load_task = PostgresOperator(
    task_id='load',
    postgres_conn_id='postgres_default',
    sql="""
        COPY target_table FROM '/tmp/transformed_data.csv'
        WITH (FORMAT CSV, HEADER TRUE);
    """,
    dag=dag
)

# Task 4: Проверка качества
validate_task = BashOperator(
    task_id='validate',
    bash_command='echo "Validating data..." && python /opt/scripts/validate.py',
    dag=dag
)

# Определяем зависимости (порядок выполнения)
extract_task >> transform_task >> load_task >> validate_task

# Эквивалентно:
# extract_task.set_downstream(transform_task)
# transform_task.set_downstream(load_task)
# load_task.set_downstream(validate_task)

Как работает Scheduler

# Scheduler каждые N секунд (обычно 60) делает:

1. PARSE DAG файлы
   Ищет все DAG в папке ~/airflow/dags/*.py

2. DETERMINE READY DAG RUNS
   Определяет, какие DAG runs готовы к запуску
   Примеры:
   - DAG с schedule='0 2 * * *' готов в 2 AM
   - DAG с schedule='@hourly' готов каждый час
   - DAG без расписания запускается вручную

3. QUEUE TASKS
   Добавляет готовые Tasks в очередь Executor

4. MONITOR TASKS
   Проверяет статус выполняющихся Tasks

5. MARK COMPLETED
   Помечает завершённые Tasks и DAG runs

Обработка зависимостей

from airflow.models import DAG
from airflow.operators.python import PythonOperator

with DAG('complex_dependencies') as dag:
    
    # Простая цепь
    task1 >> task2 >> task3
    
    # Разветвления
    task1 >> [task2, task3] >> task4  # task4 ждёт обоих
    
    # Сложные зависимости
    task1 >> [task2, task3]
    task2 >> task4
    task3 >> task4
    task4 >> task5

# Визуализация:
#       task1
#       /   \\
#     task2  task3
#       \   /
#       task4
#        |
#       task5

Типы Operator

1. PythonOperator

PythonOperator(
    task_id='my_task',
    python_callable=my_function,  # функция для выполнения
    op_args=[arg1, arg2],
    op_kwargs={'key': 'value'},
    provide_context=True,  # передаёт context в функцию
)

2. BashOperator

BashOperator(
    task_id='bash_task',
    bash_command='echo Hello && python script.py',
)

3. PostgresOperator

PostgresOperator(
    task_id='load_sql',
    postgres_conn_id='postgres_default',
    sql='INSERT INTO table SELECT * FROM ...'
)

4. HttpOperator

SimpleHttpOperator(
    task_id='api_call',
    http_conn_id='my_api',
    endpoint='/api/v1/data',
    method='GET',
    response_filter=lambda response: response.json(),
)

XCom (передача данных между tasks)

def task_push():
    """Отправляет данные в XCom"""
    return {'key': 'value', 'data': 123}

def task_pull(ti):  # ti = TaskInstance
    """Получает данные из XCom"""
    pulled_data = ti.xcom_pull(task_ids='push_task')
    print(pulled_data)  # {'key': 'value', 'data': 123}

# В DAG:
PythonOperator(task_id='push_task', python_callable=task_push) >> \
PythonOperator(task_id='pull_task', python_callable=task_pull)

Обработка ошибок

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG('error_handling') as dag:
    
    def may_fail():
        import random
        if random.random() > 0.5:
            raise Exception("Random failure!")
    
    task1 = PythonOperator(
        task_id='task1',
        python_callable=may_fail,
        retries=3,  # переспроси 3 раза
        retry_delay=timedelta(minutes=5),  # ждать 5 минут перед retry
    )
    
    def handle_success():
        print("Task succeeded!")
    
    def handle_failure():
        print("Task failed after retries!")
    
    success_task = PythonOperator(
        task_id='success',
        python_callable=handle_success,
        trigger_rule=TriggerRule.ALL_SUCCESS,  # выполнить если предыдущая успешна
    )
    
    failure_task = PythonOperator(
        task_id='failure',
        python_callable=handle_failure,
        trigger_rule=TriggerRule.ONE_FAILED,  # выполнить если предыдущая провалилась
    )
    
    task1 >> [success_task, failure_task]

Типы Executor

SequentialExecutor — выполняет tasks по одному (медленно, для тестирования)

LocalExecutor — выполняет tasks параллельно на одной машине (default)

KubernetesExecutor — выполняет каждую task в отдельном Pod (масштабируемо)

CeleryExecutor — выполняет tasks на удалённых workers через очередь (distributed)

Запуск Airflow локально

# Инициализация
airflow db init

# Создание пользователя
airflow users create -u admin -p admin -r Admin

# Запуск Scheduler
airflow scheduler

# Запуск Webserver (в другом терминале)
airflow webserver -p 8080

# Webserver доступен на http://localhost:8080

# Заборт DAG
airflow dags list

# Запуск DAG вручную
airflow dags trigger -e 2024-03-20 daily_etl_pipeline

# Откат DAG на дату (Backfill)
airflow dags backfill daily_etl_pipeline -s 2024-01-01 -e 2024-03-20

Альтернативы Apache Airflow

1. Prefect

from prefect import flow, task

@task
def extract():
    return "data"

@task
def transform(data):
    return data.upper()

@flow
def my_pipeline():
    data = extract()
    result = transform(data)
    return result

my_pipeline()

Преимущества:

  • Более современная архитектура
  • Лучше обработка ошибок
  • Проще использовать
  • Хороший облачный сервис (Prefect Cloud)

2. Dagster

from dagster import job, op

@op
def extract_op():
    return "data"

@op
def transform_op(data):
    return data.upper()

@job
def my_job():
    transform_op(extract_op())

my_job.execute_in_process()

Преимущества:

  • Типизация и валидация
  • Лучше для data engineering
  • Хорошая обработка ошибок
  • Разработан Elementl

3. Apache Spark (для больших данных)

Если рабочий поток — это в основном обработка больших данных:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ETL").getOrCreate()
df = spark.read.parquet("data.parquet")
df = df.filter(df.salary > 50000)
df.write.parquet("output.parquet")

4. Kubernetes CronJob

Для простых tasks:

apiVersion: batch/v1
kind: CronJob
metadata:
  name: daily-etl
spec:
  schedule: "0 2 * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: etl-job
            image: my-etl:latest
            command: ["python", "etl_script.py"]
          restartPolicy: OnFailure

5. dbt (Data Build Tool)

Для трансформации данных в SQL:

# dbt_project.yml
name: 'my_analytics'
version: '1.0.0'

# Выполнение
dbt run      # запустить все модели
dbt test     # запустить тесты
dbt docs generate  # создать документацию

Сравнительная таблица

ХарактеристикаAirflowPrefectDagsterdbt
Кривая обученияСредняяНизкаяСредняяНизкая
МасштабируемостьХорошоОтличнойОтличнойХорошо
ТипизацияНетНетДаНет
Cloud сервисНетДа (Prefect Cloud)НетДа (dbt Cloud)
SQL трансформацияНетНетНетДа (специализирован)
СообществоОгромноеСреднееРастётРастёт

Best Practices для Airflow

  1. Используй Jinja2 для динамических DAG
  2. Разделяй логику в отдельные функции
  3. Используй XCom для передачи данных
  4. Добавляй мониторинг и alerts
  5. Используй KubernetesExecutor для масштабирования
  6. Версионируй DAG в Git
  7. Пиши тесты для DAG

Заключение

Apache Airflow — это индустриальный стандарт для оркестрации рабочих потоков:

  • Подходит для сложных, многошаговых pipelines
  • Хорошо масштабируется на большие объёмы
  • Имеет богатую экосистему провайдеров
  • Альтернативы (Prefect, Dagster) часто лучше для новых проектов

Выбор инструмента зависит от:

  • Сложности workflow
  • Требований к масштабируемости
  • Знакомства команды
  • Бюджета
Как работает Apache Airflow? Какие альтернативы существуют? | PrepBro