← Назад к вопросам

Что такое Apache Airflow?

1.3 Junior🔥 131 комментариев
#Другое

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI30 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Apache Airflow: Оркестрация данных и workflow

Apache Airflow — это платформа с открытым исходным кодом для создания, планирования и мониторинга сложных рабочих процессов (DAG). Это инструмент для оркестрации задач в Data Science и Data Engineering.

Основные концепции

DAG (Directed Acyclic Graph) Это ориентированный ациклический граф задач. Каждая задача (Task) — это узел, стрелки показывают зависимости. Граф должен быть ациклическим (без циклических зависимостей).

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Определяем DAG
default_args = {
    'owner': 'data_team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2024, 1, 1),
}

dag = DAG(
    'ml_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Каждый день в 2:00 AM
    catchup=False,
)

# Определяем задачи
def fetch_data():
    print("Загружаю данные...")
    # Логика загрузки

def train_model():
    print("Обучаю модель...")
    # Логика обучения

def evaluate():
    print("Оцениваю модель...")
    # Логика оценки

# Создаём операторы
task_fetch = PythonOperator(
    task_id='fetch_data',
    python_callable=fetch_data,
    dag=dag,
)

task_train = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag,
)

task_evaluate = PythonOperator(
    task_id='evaluate',
    python_callable=evaluate,
    dag=dag,
)

# Определяем зависимости
task_fetch >> task_train >> task_evaluate

Архитектура DAG:

fetch_data
    |
    v
train_model
    |
    v
evaluate

Компоненты Airflow

1. Scheduler (Планировщик) Постоянно работающий процесс, который:

  • Проверяет DAG файлы на предмет изменений
  • Определяет, какие задачи нужно запустить
  • Создаёт instances задач на основе расписания
  • Выполняет задачи в нужное время

2. Executor (Исполнитель) Выполняет задачи:

  • LocalExecutor: выполняет все задачи локально (для разработки)
  • SequentialExecutor: одна задача за раз (очень медленно)
  • CeleryExecutor: распределённое выполнение через Celery (production)
  • KubernetesExecutor: запускает каждую задачу в отдельном Pod'е (cloud-native)

3. Web UI (Веб-интерфейс)

  • Визуализация DAG'ов
  • Мониторинг выполнения
  • Ручной запуск задач
  • Просмотр логов
  • Управление переменными и коннекшенами

4. Metastore (Хранилище метаданных) Обычно PostgreSQL или MySQL. Хранит:

  • Историю выполнений
  • Статусы задач
  • Переменные
  • Коннекшены к внешним системам

Типичный ML Pipeline в Airflow

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.models import Variable

dag = DAG('daily_ml_pipeline', schedule_interval='0 2 * * *')

# 1. Извлечение данных из БД
extract_task = PostgresOperator(
    task_id='extract_data',
    sql='SELECT * FROM raw_events WHERE date = {{ ds }}',
    postgres_conn_id='production_db',
    dag=dag,
)

# 2. Предварительная обработка
preprocess_task = BashOperator(
    task_id='preprocess',
    bash_command='python /dags/scripts/preprocess.py --date {{ ds }}',
    dag=dag,
)

# 3. Обучение модели
def train_ml_model(**context):
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    
    data = pd.read_csv(f"/data/processed_{context['ds']}.csv")
    model = RandomForestClassifier()
    model.fit(data.drop('target', axis=1), data['target'])
    
    # Сохраняем модель
    import joblib
    joblib.dump(model, f"/models/model_{context['ds']}.pkl")

train_task = PythonOperator(
    task_id='train_model',
    python_callable=train_ml_model,
    provide_context=True,
    dag=dag,
)

# 4. Оценка модели
def evaluate_model(**context):
    from sklearn.metrics import accuracy_score, precision_score
    # Логика оценки
    pass

evaluate_task = PythonOperator(
    task_id='evaluate_model',
    python_callable=evaluate_model,
    dag=dag,
)

# 5. Развёртывание в production
deploy_task = BashOperator(
    task_id='deploy_to_prod',
    bash_command='cp /models/model_{{ ds }}.pkl /prod/models/',
    dag=dag,
)

# Определяем workflow
extract_task >> preprocess_task >> train_task >> evaluate_task >> deploy_task

Преимущества Airflow

  • Визуализация DAG'ов — понимаешь сложные зависимости на один взгляд
  • Переиспользуемость — операторы и задачи легко переиспользуются
  • Мониторинг — видно статус каждой задачи в реальном времени
  • Управление зависимостями — гарантирует правильный порядок выполнения
  • Retry механика — автоматически повторяет упавшие задачи
  • Масштабируемость — от одной машины до кластера
  • Интеграция — готовые операторы для S3, BigQuery, PostgreSQL, Spark, Docker и т.д.

Недостатки

  • Сложная кривая обучения — нужно понимать концепции DAG, XCom, операторы
  • Overhead — требует отдельного сервера для Scheduler и Webserver
  • Зависимость от metastore — если BD упала, Airflow не работает
  • Не оптимален для streaming — лучше использовать Kafka, Flink или Spark Streaming

Альтернативы

  • Prefect — более современный, проще в использовании
  • Dagster — сильнее фокус на data assets и testing
  • dbt — для transformation, простже Airflow для ELT
  • Nextflow — специализирован на bioinformatics и scientific computing

Итог

Apache Airflow — это gold standard для ML pipeline оркестрации. Если у вас есть сложный workflow с множеством зависимостей, который нужно запускать по расписанию, мониторить и перезапускать — это ваш инструмент.

Что такое Apache Airflow? | PrepBro