← Назад к вопросам

Что такое connections Airflow?

1.3 Junior🔥 111 комментариев
#Другое

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Connections в Apache Airflow

Connections в Airflow — это централизованное хранилище учётных данных и параметров подключения к внешним системам (базы данных, API, облачные сервисы, серверы и т.д.). Вместо того чтобы хардкодить учётные данные в DAG'ах, Airflow предоставляет безопасный способ хранения и управления этой информацией.

Основная структура Connection

Каждое connection состоит из:

  • Connection ID — уникальный идентификатор для ссылки из DAG
  • Connection Type — тип подключения (postgres, mysql, http, s3, gcp, spark и т.д.)
  • Host — адрес сервера
  • Port — порт
  • Login — имя пользователя
  • Password — пароль (зашифрованный)
  • Schema — имя базы данных
  • Extra — дополнительные параметры в JSON формате

Способы создания Connections

1. Через веб-интерфейс Airflow:

В UI Airflow отрите меню Admin → Connections и нажмите "Create a new record". Заполните поля для вашей базы данных или сервиса.

2. Через Airflow CLI:

# Создание connection
airflow connections add \
  --conn-id my-postgres \
  --conn-type postgres \
  --conn-host localhost \
  --conn-login myuser \
  --conn-password mypassword \
  --conn-schema mydb \
  --conn-port 5432

# Просмотр всех connections
airflow connections list

# Удаление connection
airflow connections delete my-postgres

3. Через переменные окружения:

Airflow автоматически создаёт connection из переменных окружения формата AIRFLOW_CONN_*:

export AIRFLOW_CONN_MY_POSTGRES="postgresql://myuser:mypassword@localhost:5432/mydb"

4. Программно в Python:

from airflow import settings
from airflow.models import Connection
from sqlalchemy.orm import Session

# Создание connection
conn = Connection(
    conn_id='my-postgres',
    conn_type='postgres',
    host='localhost',
    login='myuser',
    password='mypassword',
    schema='mydb',
    port=5432
)

# Сохранение в БД
session = Session()
session.add(conn)
session.commit()

Использование Connections в DAG

С PostgresOperator:

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime

dag = DAG('postgres_example', start_date=datetime(2024, 1, 1))

task = PostgresOperator(
    task_id='run_query',
    postgres_conn_id='my-postgres',
    sql='SELECT * FROM users;',
    dag=dag
)

С PythonOperator и hooks:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime

def fetch_data_from_postgres(**context):
    # Получаем hook используя connection_id
    hook = PostgresHook(postgres_conn_id='my-postgres')
    
    # Получаем подключение
    conn = hook.get_conn()
    cursor = conn.cursor()
    
    # Выполняем запрос
    cursor.execute('SELECT COUNT(*) FROM users;')
    result = cursor.fetchone()
    print(f"Total users: {result[0]}")
    
    cursor.close()
    conn.close()

dag = DAG('postgres_hook_example', start_date=datetime(2024, 1, 1))

task = PythonOperator(
    task_id='fetch_data',
    python_callable=fetch_data_from_postgres,
    dag=dag
)

Пример с HTTP API

from airflow.operators.python import PythonOperator
from airflow.providers.http.hooks.http import HttpHook

def call_api(**context):
    # HTTP connection с базовой аутентификацией
    hook = HttpHook(http_conn_id='my-api', method='GET')
    
    # Выполняем запрос
    response = hook.run(endpoint='/api/users')
    print(f"Response: {response}")

task = PythonOperator(
    task_id='call_api',
    python_callable=call_api
)

Настройка HTTP connection с Extra параметрами:

{
  "auth_type": "basic",
  "headers": {
    "Content-Type": "application/json",
    "Authorization": "Bearer token123"
  }
}

Пример с AWS S3

from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
from airflow import DAG

dag = DAG('s3_upload', start_date=datetime(2024, 1, 1))

task = LocalFilesystemToS3Operator(
    task_id='upload_file',
    filename='/tmp/myfile.csv',
    dest_s3_key='uploads/myfile.csv',
    dest_s3_bucket='my-bucket',
    aws_conn_id='aws-s3',  # AWS connection
    dag=dag
)

Работа с Extra параметрами

Для сложных конфигураций используется поле Extra в JSON формате:

conn = Connection(
    conn_id='spark-cluster',
    conn_type='spark',
    host='spark-master',
    port=7077,
    extra=json.dumps({
        'deploy_mode': 'cluster',
        'driver_memory': '4g',
        'executor_cores': '2',
        'executor_memory': '8g'
    })
)
from airflow.models import Connection
import json

def get_spark_config():
    hook = hook_class(conn_id='spark-cluster')
    conn = hook.get_connection('spark-cluster')
    extra = json.loads(conn.extra)
    print(extra['driver_memory'])  # "4g"

Безопасность

  1. Шифрование — пароли хранятся в зашифрованном виде в БД Airflow
  2. Управление доступом — только администраторы могут управлять connections через UI
  3. Не логируйте credentials — Airflow автоматически скрывает пароли в логах
  4. Используйте secrets backend — для хранения в внешних системах (Vault, AWS Secrets Manager):
# airflow.cfg
[secrets]
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
backend_kwargs = {"url": "http://vault:8200", "mount_point": "secret"}

Лучшие практики

  1. Используйте descriptive IDsmy_postgres_prod, а не conn1
  2. Не хардкодьте credentials в коде
  3. Используйте Different connections для разных окружений
  4. Документируйте required connections в DAG
  5. Регулярно ротируйте пароли

Connections в Airflow — это правильный способ управления учётными данными в production окружении, обеспечивая безопасность и масштабируемость.