← Назад к вопросам
Что такое Apache Airflow?
2.0 Middle🔥 131 комментариев
#DevOps и инфраструктура#Архитектура и паттерны
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Apache Airflow — Оркестрирование рабочих процессов
Apache Airflow — это платформа с открытым исходным кодом для программного определения, планирования и мониторинга рабочих процессов (workflows). Написана на Python, очень популярна в Data Engineering и MLOps.
Что решает Airflow
Представь, что у тебя есть цепочка задач:
- Загрузить данные из S3
- Обработать данные (очистка, трансформация)
- Загрузить результат в Postgres
- Отправить отчёт на email
Без Airflow:
- Пишешь Python скрипт
- Запускаешь через cron каждый день
- Если шаг 2 падает, шаги 3-4 тоже упадут
- Нет логирования, нет переиспользования кода
С Airflow:
- Описываешь workflow как граф задач (DAG)
- Airflow автоматически управляет порядком выполнения
- Retry при ошибках
- Логирование, мониторинг, alerting
- Легко масштабировать
Основные концепции
DAG (Directed Acyclic Graph)
Визуальное представление workflow как направленного ациклического графа.
Operators
Оператор — это шаблон для выполнения задачи:
- PythonOperator — выполняет Python функцию
- BashOperator — выполняет bash команду
- SQLOperator — выполняет SQL запрос
- EmailOperator — отправляет email
Пример кода
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract_data():
print('Загружаем данные из S3')
def process_data():
print('Обрабатываем данные')
def load_data():
print('Загружаем в Postgres')
with DAG(
dag_id='data_pipeline',
start_date=datetime(2024, 1, 1),
schedule_interval='@daily',
catchup=False
) as dag:
extract_task = PythonOperator(
task_id='extract',
python_callable=extract_data
)
process_task = PythonOperator(
task_id='process',
python_callable=process_data
)
load_task = PythonOperator(
task_id='load',
python_callable=load_data
)
extract_task >> process_task >> load_task
Task Dependencies
Зависимости между задачами:
task1 >> task2 >> task3
[task1, task2, task3] >> task4
Как работает Airflow
- Scheduler — следит за расписанием и запускает DAG'и
- Executor — выполняет задачи
- Webserver — веб-интерфейс для мониторинга
- Metadata Database — хранит состояние задач
Обработка ошибок
from airflow.exceptions import AirflowException
def process_with_retry():
try:
result = some_api_call()
except Exception as e:
raise AirflowException(f'API failed: {e}')
task = PythonOperator(
task_id='process',
python_callable=process_with_retry,
retries=3,
retry_delay=timedelta(minutes=5),
on_failure_callback=send_alert
)
Когда использовать Airflow
✅ Используй, если:
- Есть сложные зависимости между задачами
- Нужна отказоустойчивость и мониторинг
- Работаешь в Data Engineering или MLOps
- Множество запланированных задач
❌ Не используй, если:
- Одна простая задача
- Нужна real-time обработка
На интервью
Ответь кратко: 'Я использовал Airflow для ETL pipeline: загрузка данных из API → трансформация → загрузка в БД. Использовал retry и email notifications для alerting.'