← Назад к вопросам

Что такое Apache Airflow?

2.0 Middle🔥 131 комментариев
#DevOps и инфраструктура#Архитектура и паттерны

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Apache Airflow — Оркестрирование рабочих процессов

Apache Airflow — это платформа с открытым исходным кодом для программного определения, планирования и мониторинга рабочих процессов (workflows). Написана на Python, очень популярна в Data Engineering и MLOps.

Что решает Airflow

Представь, что у тебя есть цепочка задач:

  1. Загрузить данные из S3
  2. Обработать данные (очистка, трансформация)
  3. Загрузить результат в Postgres
  4. Отправить отчёт на email

Без Airflow:

  • Пишешь Python скрипт
  • Запускаешь через cron каждый день
  • Если шаг 2 падает, шаги 3-4 тоже упадут
  • Нет логирования, нет переиспользования кода

С Airflow:

  • Описываешь workflow как граф задач (DAG)
  • Airflow автоматически управляет порядком выполнения
  • Retry при ошибках
  • Логирование, мониторинг, alerting
  • Легко масштабировать

Основные концепции

DAG (Directed Acyclic Graph)

Визуальное представление workflow как направленного ациклического графа.

Operators

Оператор — это шаблон для выполнения задачи:

  • PythonOperator — выполняет Python функцию
  • BashOperator — выполняет bash команду
  • SQLOperator — выполняет SQL запрос
  • EmailOperator — отправляет email

Пример кода

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract_data():
    print('Загружаем данные из S3')
    
def process_data():
    print('Обрабатываем данные')
    
def load_data():
    print('Загружаем в Postgres')

with DAG(
    dag_id='data_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False
) as dag:
    
    extract_task = PythonOperator(
        task_id='extract',
        python_callable=extract_data
    )
    
    process_task = PythonOperator(
        task_id='process',
        python_callable=process_data
    )
    
    load_task = PythonOperator(
        task_id='load',
        python_callable=load_data
    )
    
    extract_task >> process_task >> load_task

Task Dependencies

Зависимости между задачами:

task1 >> task2 >> task3
[task1, task2, task3] >> task4

Как работает Airflow

  1. Scheduler — следит за расписанием и запускает DAG'и
  2. Executor — выполняет задачи
  3. Webserver — веб-интерфейс для мониторинга
  4. Metadata Database — хранит состояние задач

Обработка ошибок

from airflow.exceptions import AirflowException

def process_with_retry():
    try:
        result = some_api_call()
    except Exception as e:
        raise AirflowException(f'API failed: {e}')

task = PythonOperator(
    task_id='process',
    python_callable=process_with_retry,
    retries=3,
    retry_delay=timedelta(minutes=5),
    on_failure_callback=send_alert
)

Когда использовать Airflow

Используй, если:

  • Есть сложные зависимости между задачами
  • Нужна отказоустойчивость и мониторинг
  • Работаешь в Data Engineering или MLOps
  • Множество запланированных задач

Не используй, если:

  • Одна простая задача
  • Нужна real-time обработка

На интервью

Ответь кратко: 'Я использовал Airflow для ETL pipeline: загрузка данных из API → трансформация → загрузка в БД. Использовал retry и email notifications для alerting.'

Что такое Apache Airflow? | PrepBro