← Назад к вопросам
Из чего состоит Airflow
1.7 Middle🔥 201 комментариев
#Django#Асинхронность и многопоточность#Базы данных (NoSQL)
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Apache Airflow: Архитектура и компоненты
Apache Airflow — это платформа для оркестрации workflow-ов, написанная на Python. Она позволяет определять, планировать и мониторить сложные data pipelines.
Основные компоненты Airflow
1. Scheduler (Планировщик)
Отвечает за:
- Мониторинг DAG-ов (Directed Acyclic Graphs)
- Создание task instances согласно расписанию
- Определение готовности задач к выполнению
- Отправка готовых задач в очередь (queue)
# Scheduler запускает каждые N секунд
# и проверяет, какие задачи нужно запустить
while True:
for dag in dags:
for task in dag.tasks:
if task.should_run(now):
queue.put(task) # Отправляет в очередь
sleep(5) # Проверяет каждые 5 секунд
2. Executor (Исполнитель)
Запускает task instances. Есть несколько типов:
- SequentialExecutor — запускает одну задачу за раз (для разработки)
- LocalExecutor — запускает параллельно в одной машине
- CeleryExecutor — запускает на распределённых worker'ах
- KubernetesExecutor — запускает в K8s подах
class Executor:
def execute_task(self, task):
# Запустить task
result = task.run()
return result
# LocalExecutor
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=4) as executor:
executor.submit(task.run)
3. MetaDB (Metadata Database)
Сохраняет:
- Историю выполнения задач
- Состояние DAG-ов и task instances
- Переменные и connection-ы
- Логи и результаты
# Пример в PostgreSQL
CREATE TABLE task_instance (
dag_id VARCHAR(250),
task_id VARCHAR(250),
execution_date TIMESTAMP,
state VARCHAR(20), -- queued, running, success, failed
start_date TIMESTAMP,
end_date TIMESTAMP,
duration FLOAT,
PRIMARY KEY (dag_id, task_id, execution_date)
);
4. Web UI (User Interface)
Веб-интерфейс для:
- Просмотра DAG-ов
- Мониторинга выполнения
- Управления переменными и connection-ами
- Проверки логов
- Ручного запуска задач
http://localhost:8080
├── DAGs (список всех pipeline-ов)
├── Graph View (визуализация зависимостей)
├── Tree View (история выполнения)
├── Log (логи каждой задачи)
├── Admin (переменные, connection-ы, пользователи)
└── Security (роли, permissions)
Как работает Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# 1. Определяю DAG
dag = DAG(
'my_pipeline',
start_date=datetime(2024, 1, 1),
schedule_interval='@daily', # Каждый день
catchup=False
)
# 2. Определяю задачи
def fetch_data():
print("Fetching data...")
return "data"
def process_data(task_instance):
data = task_instance.xcom_pull(task_ids='fetch')
print(f"Processing: {data}")
def save_results():
print("Saving results...")
# 3. Создаю task-и
fetch_task = PythonOperator(
task_id='fetch',
python_callable=fetch_data,
dag=dag
)
process_task = PythonOperator(
task_id='process',
python_callable=process_data,
dag=dag
)
save_task = PythonOperator(
task_id='save',
python_callable=save_results,
dag=dag
)
# 4. Определяю зависимости
fetch_task >> process_task >> save_task
# Scheduler запустит это в таком порядке:
# 1. fetch_task (параллельно может быть несколько)
# 2. process_task (когда fetch завершена)
# 3. save_task (когда process завершена)
Процесс выполнения
1. DAG Parser (парсит python файлы с DAG-ами)
↓
2. Scheduler (проверяет каждые N секунд)
└─→ Нужно ли запустить какую-то задачу?
└─→ Да? Создаёт task_instance и кладёт в очередь
↓
3. Executor (читает из очереди)
└─→ Запускает task в worker-е (локально или удалённо)
↓
4. Task выполняется
└─→ Сохраняет результат в MetaDB
└─→ Сохраняет логи
↓
5. Web UI отображает результаты
Архитектура с Celery
┌─────────────────┐
│ Scheduler │ Создаёт task instances
└────────┬────────┘
│ Отправляет задачи
↓
┌─────────────────────────┐
│ Message Broker │ (Redis, RabbitMQ)
│ (очередь задач) │
└────────┬────────────────┘
│
┌────┼────┬─────────┐
↓ ↓ ↓ ↓
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐
│Worker│ │Worker│ │Worker│ │Worker│ Выполняют задачи
└──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘
│ │ │ │
└────┬───┴───┬───┴────┬───┘
↓ ↓ ↓
┌──────────────────────────┐
│ Metadata DB │ Сохраняет результаты
│ (PostgreSQL) │
└──────────────────────────┘
Типы операторов
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.email import EmailOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
# Python
PythonOperator(
task_id='python_task',
python_callable=my_function
)
# Bash
BashOperator(
task_id='bash_task',
bash_command='echo "Hello World"'
)
# SQL
PostgresOperator(
task_id='postgres_task',
sql='SELECT * FROM users;',
postgres_conn_id='postgres_default'
)
# HTTP
SimpleHttpOperator(
task_id='http_task',
http_conn_id='http_default',
endpoint='/api/users',
method='GET'
)
# Email
EmailOperator(
task_id='email_task',
to='user@example.com',
subject='Pipeline completed',
html_content='Success!'
)
XCom (Cross Communication)
Передача данных между task-ами:
@task
def task1():
return {'value': 42}
@task
def task2(value):
print(f"Received: {value}")
# Airflow автоматически передаёт результат task1 в task2
task1() >> task2()
# Или явно с xcom_pull
def my_task(task_instance):
value = task_instance.xcom_pull(task_ids='other_task')
return value
Реальный пример: ETL pipeline
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data_team',
'retries': 3,
'retry_delay': timedelta(minutes=5),
'email': ['airflow@example.com'],
'email_on_failure': True,
}
dag = DAG(
'etl_pipeline',
default_args=default_args,
schedule_interval='@daily',
start_date=datetime(2024, 1, 1),
catchup=False
)
def extract_data():
"""Extract из внешнего API"""
import requests
response = requests.get('https://api.example.com/data')
return response.json()
def transform_data(task_instance):
"""Transform данные"""
data = task_instance.xcom_pull(task_ids='extract')
# Очищаем, валидируем, трансформируем
return [{'id': d['id'], 'name': d['name'].upper()} for d in data]
extract = PythonOperator(
task_id='extract',
python_callable=extract_data,
dag=dag
)
transform = PythonOperator(
task_id='transform',
python_callable=transform_data,
dag=dag
)
load = PostgresOperator(
task_id='load',
sql="INSERT INTO public.data (id, name) VALUES (%(id)s, %(name)s)",
postgres_conn_id='postgres_default',
dag=dag
)
notify = EmailOperator(
task_id='notify',
to='admin@example.com',
subject='ETL pipeline completed',
html_content='Pipeline finished successfully!',
dag=dag
)
extract >> transform >> load >> notify
Мониторинг и алерты
from airflow.exceptions import AirflowException
from airflow.models import Variable
dag = DAG(
'monitored_pipeline',
start_date=datetime(2024, 1, 1),
schedule_interval='@hourly',
# Если задача не завершится за 30 минут — прерыва
dagrun_timeout=timedelta(minutes=30),
# Максимум 3 одновременных запусков
max_active_runs=3,
# Ждать 5 дней перед очисткой старых runs
sla=timedelta(days=5),
)
def check_quality(task_instance):
"""Проверка качества данных"""
data = task_instance.xcom_pull(task_ids='transform')
if len(data) == 0:
raise AirflowException("No data to process!")
if len(data) < 1000: # Warning
Variable.set("last_warning", "Low data volume")
Резюме: Компоненты Airflow
Core Components:
- Scheduler — планирует и триггерит задачи
- Executor — запускает задачи (локально или распределённо)
- MetaDB — сохраняет состояние и историю
- Web UI — интерфейс управления
- DAG — граф задач (определяешь ты)
Поддерживающие компоненты:
- Message Broker (Redis/RabbitMQ) — очередь задач
- Workers — машины, которые выполняют задачи
- Logging — система логирования
- Plugins — расширения (custom операторы)
Ключевые концепции:
- DAG = набор задач с зависимостями
- Task = единица работы
- Operator = тип задачи (Python, Bash, SQL и т.д.)
- XCom = передача данных между задачами
- SLA = Service Level Agreement (максимальное время выполнения)
Airflow — это мощный инструмент для создания production-ready data pipeline-ов на Python.