← Назад к вопросам

Из чего состоит Airflow

1.7 Middle🔥 201 комментариев
#Django#Асинхронность и многопоточность#Базы данных (NoSQL)

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Apache Airflow: Архитектура и компоненты

Apache Airflow — это платформа для оркестрации workflow-ов, написанная на Python. Она позволяет определять, планировать и мониторить сложные data pipelines.

Основные компоненты Airflow

1. Scheduler (Планировщик)

Отвечает за:

  • Мониторинг DAG-ов (Directed Acyclic Graphs)
  • Создание task instances согласно расписанию
  • Определение готовности задач к выполнению
  • Отправка готовых задач в очередь (queue)
# Scheduler запускает каждые N секунд
# и проверяет, какие задачи нужно запустить
while True:
    for dag in dags:
        for task in dag.tasks:
            if task.should_run(now):
                queue.put(task)  # Отправляет в очередь
    sleep(5)  # Проверяет каждые 5 секунд

2. Executor (Исполнитель)

Запускает task instances. Есть несколько типов:

  • SequentialExecutor — запускает одну задачу за раз (для разработки)
  • LocalExecutor — запускает параллельно в одной машине
  • CeleryExecutor — запускает на распределённых worker'ах
  • KubernetesExecutor — запускает в K8s подах
class Executor:
    def execute_task(self, task):
        # Запустить task
        result = task.run()
        return result

# LocalExecutor
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=4) as executor:
    executor.submit(task.run)

3. MetaDB (Metadata Database)

Сохраняет:

  • Историю выполнения задач
  • Состояние DAG-ов и task instances
  • Переменные и connection-ы
  • Логи и результаты
# Пример в PostgreSQL
CREATE TABLE task_instance (
    dag_id VARCHAR(250),
    task_id VARCHAR(250),
    execution_date TIMESTAMP,
    state VARCHAR(20),  -- queued, running, success, failed
    start_date TIMESTAMP,
    end_date TIMESTAMP,
    duration FLOAT,
    PRIMARY KEY (dag_id, task_id, execution_date)
);

4. Web UI (User Interface)

Веб-интерфейс для:

  • Просмотра DAG-ов
  • Мониторинга выполнения
  • Управления переменными и connection-ами
  • Проверки логов
  • Ручного запуска задач
http://localhost:8080
├── DAGs (список всех pipeline-ов)
├── Graph View (визуализация зависимостей)
├── Tree View (история выполнения)
├── Log (логи каждой задачи)
├── Admin (переменные, connection-ы, пользователи)
└── Security (роли, permissions)

Как работает Airflow

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# 1. Определяю DAG
dag = DAG(
    'my_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',  # Каждый день
    catchup=False
)

# 2. Определяю задачи
def fetch_data():
    print("Fetching data...")
    return "data"

def process_data(task_instance):
    data = task_instance.xcom_pull(task_ids='fetch')
    print(f"Processing: {data}")

def save_results():
    print("Saving results...")

# 3. Создаю task-и
fetch_task = PythonOperator(
    task_id='fetch',
    python_callable=fetch_data,
    dag=dag
)

process_task = PythonOperator(
    task_id='process',
    python_callable=process_data,
    dag=dag
)

save_task = PythonOperator(
    task_id='save',
    python_callable=save_results,
    dag=dag
)

# 4. Определяю зависимости
fetch_task >> process_task >> save_task

# Scheduler запустит это в таком порядке:
# 1. fetch_task (параллельно может быть несколько)
# 2. process_task (когда fetch завершена)
# 3. save_task (когда process завершена)

Процесс выполнения

1. DAG Parser (парсит python файлы с DAG-ами)
   ↓
2. Scheduler (проверяет каждые N секунд)
   └─→ Нужно ли запустить какую-то задачу?
   └─→ Да? Создаёт task_instance и кладёт в очередь
   ↓
3. Executor (читает из очереди)
   └─→ Запускает task в worker-е (локально или удалённо)
   ↓
4. Task выполняется
   └─→ Сохраняет результат в MetaDB
   └─→ Сохраняет логи
   ↓
5. Web UI отображает результаты

Архитектура с Celery

┌─────────────────┐
│   Scheduler     │ Создаёт task instances
└────────┬────────┘
         │ Отправляет задачи
         ↓
┌─────────────────────────┐
│   Message Broker        │ (Redis, RabbitMQ)
│   (очередь задач)      │
└────────┬────────────────┘
         │
    ┌────┼────┬─────────┐
    ↓    ↓    ↓         ↓
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐
│Worker│ │Worker│ │Worker│ │Worker│ Выполняют задачи
└──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘
   │        │       │        │
   └────┬───┴───┬───┴────┬───┘
        ↓       ↓        ↓
   ┌──────────────────────────┐
   │     Metadata DB          │ Сохраняет результаты
   │   (PostgreSQL)           │
   └──────────────────────────┘

Типы операторов

from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.email import EmailOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

# Python
PythonOperator(
    task_id='python_task',
    python_callable=my_function
)

# Bash
BashOperator(
    task_id='bash_task',
    bash_command='echo "Hello World"'
)

# SQL
PostgresOperator(
    task_id='postgres_task',
    sql='SELECT * FROM users;',
    postgres_conn_id='postgres_default'
)

# HTTP
SimpleHttpOperator(
    task_id='http_task',
    http_conn_id='http_default',
    endpoint='/api/users',
    method='GET'
)

# Email
EmailOperator(
    task_id='email_task',
    to='user@example.com',
    subject='Pipeline completed',
    html_content='Success!'
)

XCom (Cross Communication)

Передача данных между task-ами:

@task
def task1():
    return {'value': 42}

@task
def task2(value):
    print(f"Received: {value}")

# Airflow автоматически передаёт результат task1 в task2
task1() >> task2()

# Или явно с xcom_pull
def my_task(task_instance):
    value = task_instance.xcom_pull(task_ids='other_task')
    return value

Реальный пример: ETL pipeline

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data_team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email': ['airflow@example.com'],
    'email_on_failure': True,
}

dag = DAG(
    'etl_pipeline',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
)

def extract_data():
    """Extract из внешнего API"""
    import requests
    response = requests.get('https://api.example.com/data')
    return response.json()

def transform_data(task_instance):
    """Transform данные"""
    data = task_instance.xcom_pull(task_ids='extract')
    # Очищаем, валидируем, трансформируем
    return [{'id': d['id'], 'name': d['name'].upper()} for d in data]

extract = PythonOperator(
    task_id='extract',
    python_callable=extract_data,
    dag=dag
)

transform = PythonOperator(
    task_id='transform',
    python_callable=transform_data,
    dag=dag
)

load = PostgresOperator(
    task_id='load',
    sql="INSERT INTO public.data (id, name) VALUES (%(id)s, %(name)s)",
    postgres_conn_id='postgres_default',
    dag=dag
)

notify = EmailOperator(
    task_id='notify',
    to='admin@example.com',
    subject='ETL pipeline completed',
    html_content='Pipeline finished successfully!',
    dag=dag
)

extract >> transform >> load >> notify

Мониторинг и алерты

from airflow.exceptions import AirflowException
from airflow.models import Variable

dag = DAG(
    'monitored_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@hourly',
    # Если задача не завершится за 30 минут — прерыва
    dagrun_timeout=timedelta(minutes=30),
    # Максимум 3 одновременных запусков
    max_active_runs=3,
    # Ждать 5 дней перед очисткой старых runs
    sla=timedelta(days=5),
)

def check_quality(task_instance):
    """Проверка качества данных"""
    data = task_instance.xcom_pull(task_ids='transform')
    if len(data) == 0:
        raise AirflowException("No data to process!")
    if len(data) < 1000:  # Warning
        Variable.set("last_warning", "Low data volume")

Резюме: Компоненты Airflow

Core Components:

  1. Scheduler — планирует и триггерит задачи
  2. Executor — запускает задачи (локально или распределённо)
  3. MetaDB — сохраняет состояние и историю
  4. Web UI — интерфейс управления
  5. DAG — граф задач (определяешь ты)

Поддерживающие компоненты:

  • Message Broker (Redis/RabbitMQ) — очередь задач
  • Workers — машины, которые выполняют задачи
  • Logging — система логирования
  • Plugins — расширения (custom операторы)

Ключевые концепции:

  • DAG = набор задач с зависимостями
  • Task = единица работы
  • Operator = тип задачи (Python, Bash, SQL и т.д.)
  • XCom = передача данных между задачами
  • SLA = Service Level Agreement (максимальное время выполнения)

Airflow — это мощный инструмент для создания production-ready data pipeline-ов на Python.