← Назад к вопросам

Как работал с airflow в kubernetes?

2.0 Middle🔥 141 комментариев
#Apache Airflow и оркестрация#Облачные платформы

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Apache Airflow в Kubernetes

Деплой Apache Airflow в Kubernetes даёт масштабируемость, надёжность и удобство управления. KubernetesExecutor — предпочтительный способ запуска Airflow в K8s.

Архитектура Airflow в Kubernetes

Компоненты:

  • Scheduler: планирует DAGи, работает в подах
  • Webserver: веб-интерфейс Airflow
  • KubernetesExecutor: создаёт под для каждой задачи
  • PostgreSQL/MySQL: метаданные и состояние
  • Redis (опционально): для кэша и сообщений
# Базовая конфигурация Airflow в K8s
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-config
data:
  AIRFLOW__CORE__EXECUTOR: KubernetesExecutor
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql://airflow:password@postgres:5432/airflow
  AIRFLOW__KUBERNETES__NAMESPACE: airflow
  AIRFLOW__KUBERNETES__DAGS_IN_IMAGE: "True"

Установка через Helm Chart

Официальный Helm chart для Airflow:

# Добавить repo
helm repo add apache-airflow https://airflow.apache.org
helm repo update

# Установить с кастомными значениями
helm install airflow apache-airflow/airflow \
  --namespace airflow \
  --create-namespace \
  --values values.yaml

values.yaml конфиг:

# Репликация компонентов
replicas:
  scheduler: 2      # HA для scheduler
  webserver: 2      # Несколько вебсерверов

# KubernetesExecutor
executor: KubernetesExecutor

# Ресурсы
resources:
  scheduler:
    requests:
      cpu: 1
      memory: "2Gi"
  webserver:
    requests:
      cpu: 0.5
      memory: "1Gi"

# DAGs загрузка
dags:
  gitSync:
    enabled: true
    repo: https://github.com/user/dags.git
    branch: main
    subPath: "airflow/dags"
    syncWait: 60

# PostgreSQL для метаданных
postgresql:
  enabled: true
  auth:
    username: airflow
    password: airflow_password
    database: airflow

KubernetesExecutor конфигурация

Как работает KubernetesExecutor:

  1. Scheduler получает задачу из DAG
  2. KubernetesExecutor создаёт K8s Pod для этой задачи
  3. Pod выполняется с заданными ресурсами и логирует
  4. После завершения под удаляется
  5. Результат сохраняется в БД
# airflow_settings.py - конфигурация KubernetesExecutor
from kubernetes.client import models as k8s

# Дефолтная конфигурация подов
default_pod_kwargs = {
    "namespace": "airflow",
    "image_pull_policy": "IfNotPresent",
}

# Лимиты ресурсов по умолчанию
default_task_resources = {
    "request_cpu": "100m",
    "request_memory": "256Mi",
    "limit_cpu": "500m",
    "limit_memory": "512Mi",
}

DAG с custom pod спецификацией

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

with DAG(
    dag_id="data_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule_interval="0 2 * * *",  # 2 AM каждый день
    default_args={
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
    },
) as dag:
    
    # Простая задача
    spark_job = KubernetesPodOperator(
        task_id="spark_job",
        image="my-spark:latest",
        cmds=["spark-submit"],
        arguments=["--conf", "spark.executors=10", "/scripts/job.py"],
        namespace="airflow",
        in_cluster=True,
        get_logs=True,
        resources={
            "request": {
                "cpu": "1",
                "memory": "2Gi",
            },
            "limit": {
                "cpu": "2",
                "memory": "4Gi",
            }
        },
        # Переменные окружения
        env_vars={
            "ENVIRONMENT": "production",
            "BATCH_SIZE": "1000",
        },
        # Volumes
        volumes=[
            k8s.V1Volume(
                name="shared-data",
                empty_dir=k8s.V1EmptyDirVolumeSource(),
            )
        ],
        volume_mounts=[
            k8s.V1VolumeMount(
                name="shared-data",
                mount_path="/data",
            )
        ],
        # Nodeselector для специализированных узлов
        node_selector={"compute": "high"},
    )
    
    # Python задача
    python_task = KubernetesPodOperator(
        task_id="process_data",
        image="python:3.10",
        cmds=["python"],
        arguments=["-c", "print('Processing data')"],
        namespace="airflow",
    )
    
    spark_job >> python_task

Масштабирование и HA

High Availability Scheduler:

# 2+ scheduler для HA
schedulers:
  replicas: 2
  affinity:
    podAntiAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        - labelSelector:
            matchExpressions:
              - key: component
                operator: In
                values:
                  - scheduler
          topologyKey: kubernetes.io/hostname

Autoscaling для worker nodes:

# HorizontalPodAutoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: airflow-scheduler-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: airflow-scheduler
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70

Мониторинг в Kubernetes

# prometheus_metrics.py - сбор метрик
from prometheus_client import Counter, Histogram, start_http_server
import time

# Метрики
task_duration = Histogram(
    'airflow_task_duration_seconds',
    'Task duration',
    ['dag_id', 'task_id']
)

task_failures = Counter(
    'airflow_task_failures_total',
    'Task failures',
    ['dag_id', 'task_id']
)

def measure_task_execution(dag_id, task_id):
    def decorator(func):
        def wrapper(*args, **kwargs):
            start = time.time()
            try:
                result = func(*args, **kwargs)
                return result
            except Exception as e:
                task_failures.labels(dag_id, task_id).inc()
                raise
            finally:
                duration = time.time() - start
                task_duration.labels(dag_id, task_id).observe(duration)
        return wrapper
    return decorator

ServiceMonitor для Prometheus:

apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: airflow
spec:
  selector:
    matchLabels:
      app: airflow
  endpoints:
    - port: flower
      interval: 30s

Логирование

# Логирование в S3 или GCS
AIRFLOW__LOGGING__REMOTE_LOGGING = True
AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID = "s3_logs"
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER = "s3://my-bucket/airflow-logs"

# Или с ELK Stack
AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID = "elasticsearch"

Лучшие практики

  1. Используй KubernetesExecutor для изоляции — каждая задача в отдельном поде
  2. Определи resources для каждого DAG — CPU/memory limits предотвращают крах
  3. Используй gitSync для DAGs — автоматическое обновление из Git
  4. Настрой HA для scheduler — несколько реплик для надёжности
  5. Мониторь метрики — Prometheus + Grafana
  6. Логируй в централизованное хранилище — S3, GCS или ELK
  7. Используй Secrets для credentials — K8s Secrets, не env переменные
  8. Регулярно чистись базу — удаляй старые логи и метаданные
# Очистка старых логов
airflow db clean --skip-archive --dry-run
airflow db clean --skip-archive

# Переиндексация БД
airflow db upgrade

Kubernetes + Airflow дают мощную платформу для управления сложными data pipeline'ами в scale.

Как работал с airflow в kubernetes? | PrepBro