← Назад к вопросам
Как работал с airflow в kubernetes?
2.0 Middle🔥 141 комментариев
#Apache Airflow и оркестрация#Облачные платформы
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Apache Airflow в Kubernetes
Деплой Apache Airflow в Kubernetes даёт масштабируемость, надёжность и удобство управления. KubernetesExecutor — предпочтительный способ запуска Airflow в K8s.
Архитектура Airflow в Kubernetes
Компоненты:
- Scheduler: планирует DAGи, работает в подах
- Webserver: веб-интерфейс Airflow
- KubernetesExecutor: создаёт под для каждой задачи
- PostgreSQL/MySQL: метаданные и состояние
- Redis (опционально): для кэша и сообщений
# Базовая конфигурация Airflow в K8s
apiVersion: v1
kind: ConfigMap
metadata:
name: airflow-config
data:
AIRFLOW__CORE__EXECUTOR: KubernetesExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql://airflow:password@postgres:5432/airflow
AIRFLOW__KUBERNETES__NAMESPACE: airflow
AIRFLOW__KUBERNETES__DAGS_IN_IMAGE: "True"
Установка через Helm Chart
Официальный Helm chart для Airflow:
# Добавить repo
helm repo add apache-airflow https://airflow.apache.org
helm repo update
# Установить с кастомными значениями
helm install airflow apache-airflow/airflow \
--namespace airflow \
--create-namespace \
--values values.yaml
values.yaml конфиг:
# Репликация компонентов
replicas:
scheduler: 2 # HA для scheduler
webserver: 2 # Несколько вебсерверов
# KubernetesExecutor
executor: KubernetesExecutor
# Ресурсы
resources:
scheduler:
requests:
cpu: 1
memory: "2Gi"
webserver:
requests:
cpu: 0.5
memory: "1Gi"
# DAGs загрузка
dags:
gitSync:
enabled: true
repo: https://github.com/user/dags.git
branch: main
subPath: "airflow/dags"
syncWait: 60
# PostgreSQL для метаданных
postgresql:
enabled: true
auth:
username: airflow
password: airflow_password
database: airflow
KubernetesExecutor конфигурация
Как работает KubernetesExecutor:
- Scheduler получает задачу из DAG
- KubernetesExecutor создаёт K8s Pod для этой задачи
- Pod выполняется с заданными ресурсами и логирует
- После завершения под удаляется
- Результат сохраняется в БД
# airflow_settings.py - конфигурация KubernetesExecutor
from kubernetes.client import models as k8s
# Дефолтная конфигурация подов
default_pod_kwargs = {
"namespace": "airflow",
"image_pull_policy": "IfNotPresent",
}
# Лимиты ресурсов по умолчанию
default_task_resources = {
"request_cpu": "100m",
"request_memory": "256Mi",
"limit_cpu": "500m",
"limit_memory": "512Mi",
}
DAG с custom pod спецификацией
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s
with DAG(
dag_id="data_pipeline",
start_date=datetime(2024, 1, 1),
schedule_interval="0 2 * * *", # 2 AM каждый день
default_args={
"retries": 2,
"retry_delay": timedelta(minutes=5),
},
) as dag:
# Простая задача
spark_job = KubernetesPodOperator(
task_id="spark_job",
image="my-spark:latest",
cmds=["spark-submit"],
arguments=["--conf", "spark.executors=10", "/scripts/job.py"],
namespace="airflow",
in_cluster=True,
get_logs=True,
resources={
"request": {
"cpu": "1",
"memory": "2Gi",
},
"limit": {
"cpu": "2",
"memory": "4Gi",
}
},
# Переменные окружения
env_vars={
"ENVIRONMENT": "production",
"BATCH_SIZE": "1000",
},
# Volumes
volumes=[
k8s.V1Volume(
name="shared-data",
empty_dir=k8s.V1EmptyDirVolumeSource(),
)
],
volume_mounts=[
k8s.V1VolumeMount(
name="shared-data",
mount_path="/data",
)
],
# Nodeselector для специализированных узлов
node_selector={"compute": "high"},
)
# Python задача
python_task = KubernetesPodOperator(
task_id="process_data",
image="python:3.10",
cmds=["python"],
arguments=["-c", "print('Processing data')"],
namespace="airflow",
)
spark_job >> python_task
Масштабирование и HA
High Availability Scheduler:
# 2+ scheduler для HA
schedulers:
replicas: 2
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: component
operator: In
values:
- scheduler
topologyKey: kubernetes.io/hostname
Autoscaling для worker nodes:
# HorizontalPodAutoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: airflow-scheduler-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: airflow-scheduler
minReplicas: 2
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
Мониторинг в Kubernetes
# prometheus_metrics.py - сбор метрик
from prometheus_client import Counter, Histogram, start_http_server
import time
# Метрики
task_duration = Histogram(
'airflow_task_duration_seconds',
'Task duration',
['dag_id', 'task_id']
)
task_failures = Counter(
'airflow_task_failures_total',
'Task failures',
['dag_id', 'task_id']
)
def measure_task_execution(dag_id, task_id):
def decorator(func):
def wrapper(*args, **kwargs):
start = time.time()
try:
result = func(*args, **kwargs)
return result
except Exception as e:
task_failures.labels(dag_id, task_id).inc()
raise
finally:
duration = time.time() - start
task_duration.labels(dag_id, task_id).observe(duration)
return wrapper
return decorator
ServiceMonitor для Prometheus:
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: airflow
spec:
selector:
matchLabels:
app: airflow
endpoints:
- port: flower
interval: 30s
Логирование
# Логирование в S3 или GCS
AIRFLOW__LOGGING__REMOTE_LOGGING = True
AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID = "s3_logs"
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER = "s3://my-bucket/airflow-logs"
# Или с ELK Stack
AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID = "elasticsearch"
Лучшие практики
- Используй KubernetesExecutor для изоляции — каждая задача в отдельном поде
- Определи resources для каждого DAG — CPU/memory limits предотвращают крах
- Используй gitSync для DAGs — автоматическое обновление из Git
- Настрой HA для scheduler — несколько реплик для надёжности
- Мониторь метрики — Prometheus + Grafana
- Логируй в централизованное хранилище — S3, GCS или ELK
- Используй Secrets для credentials — K8s Secrets, не env переменные
- Регулярно чистись базу — удаляй старые логи и метаданные
# Очистка старых логов
airflow db clean --skip-archive --dry-run
airflow db clean --skip-archive
# Переиндексация БД
airflow db upgrade
Kubernetes + Airflow дают мощную платформу для управления сложными data pipeline'ами в scale.