Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Connections в Apache Airflow
Connections в Airflow — это централизованное хранилище учётных данных и параметров подключения к внешним системам (базы данных, API, облачные сервисы, серверы и т.д.). Вместо того чтобы хардкодить учётные данные в DAG'ах, Airflow предоставляет безопасный способ хранения и управления этой информацией.
Основная структура Connection
Каждое connection состоит из:
- Connection ID — уникальный идентификатор для ссылки из DAG
- Connection Type — тип подключения (postgres, mysql, http, s3, gcp, spark и т.д.)
- Host — адрес сервера
- Port — порт
- Login — имя пользователя
- Password — пароль (зашифрованный)
- Schema — имя базы данных
- Extra — дополнительные параметры в JSON формате
Способы создания Connections
1. Через веб-интерфейс Airflow:
В UI Airflow отрите меню Admin → Connections и нажмите "Create a new record". Заполните поля для вашей базы данных или сервиса.
2. Через Airflow CLI:
# Создание connection
airflow connections add \
--conn-id my-postgres \
--conn-type postgres \
--conn-host localhost \
--conn-login myuser \
--conn-password mypassword \
--conn-schema mydb \
--conn-port 5432
# Просмотр всех connections
airflow connections list
# Удаление connection
airflow connections delete my-postgres
3. Через переменные окружения:
Airflow автоматически создаёт connection из переменных окружения формата AIRFLOW_CONN_*:
export AIRFLOW_CONN_MY_POSTGRES="postgresql://myuser:mypassword@localhost:5432/mydb"
4. Программно в Python:
from airflow import settings
from airflow.models import Connection
from sqlalchemy.orm import Session
# Создание connection
conn = Connection(
conn_id='my-postgres',
conn_type='postgres',
host='localhost',
login='myuser',
password='mypassword',
schema='mydb',
port=5432
)
# Сохранение в БД
session = Session()
session.add(conn)
session.commit()
Использование Connections в DAG
С PostgresOperator:
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime
dag = DAG('postgres_example', start_date=datetime(2024, 1, 1))
task = PostgresOperator(
task_id='run_query',
postgres_conn_id='my-postgres',
sql='SELECT * FROM users;',
dag=dag
)
С PythonOperator и hooks:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
def fetch_data_from_postgres(**context):
# Получаем hook используя connection_id
hook = PostgresHook(postgres_conn_id='my-postgres')
# Получаем подключение
conn = hook.get_conn()
cursor = conn.cursor()
# Выполняем запрос
cursor.execute('SELECT COUNT(*) FROM users;')
result = cursor.fetchone()
print(f"Total users: {result[0]}")
cursor.close()
conn.close()
dag = DAG('postgres_hook_example', start_date=datetime(2024, 1, 1))
task = PythonOperator(
task_id='fetch_data',
python_callable=fetch_data_from_postgres,
dag=dag
)
Пример с HTTP API
from airflow.operators.python import PythonOperator
from airflow.providers.http.hooks.http import HttpHook
def call_api(**context):
# HTTP connection с базовой аутентификацией
hook = HttpHook(http_conn_id='my-api', method='GET')
# Выполняем запрос
response = hook.run(endpoint='/api/users')
print(f"Response: {response}")
task = PythonOperator(
task_id='call_api',
python_callable=call_api
)
Настройка HTTP connection с Extra параметрами:
{
"auth_type": "basic",
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer token123"
}
}
Пример с AWS S3
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
from airflow import DAG
dag = DAG('s3_upload', start_date=datetime(2024, 1, 1))
task = LocalFilesystemToS3Operator(
task_id='upload_file',
filename='/tmp/myfile.csv',
dest_s3_key='uploads/myfile.csv',
dest_s3_bucket='my-bucket',
aws_conn_id='aws-s3', # AWS connection
dag=dag
)
Работа с Extra параметрами
Для сложных конфигураций используется поле Extra в JSON формате:
conn = Connection(
conn_id='spark-cluster',
conn_type='spark',
host='spark-master',
port=7077,
extra=json.dumps({
'deploy_mode': 'cluster',
'driver_memory': '4g',
'executor_cores': '2',
'executor_memory': '8g'
})
)
from airflow.models import Connection
import json
def get_spark_config():
hook = hook_class(conn_id='spark-cluster')
conn = hook.get_connection('spark-cluster')
extra = json.loads(conn.extra)
print(extra['driver_memory']) # "4g"
Безопасность
- Шифрование — пароли хранятся в зашифрованном виде в БД Airflow
- Управление доступом — только администраторы могут управлять connections через UI
- Не логируйте credentials — Airflow автоматически скрывает пароли в логах
- Используйте secrets backend — для хранения в внешних системах (Vault, AWS Secrets Manager):
# airflow.cfg
[secrets]
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
backend_kwargs = {"url": "http://vault:8200", "mount_point": "secret"}
Лучшие практики
- Используйте descriptive IDs —
my_postgres_prod, а неconn1 - Не хардкодьте credentials в коде
- Используйте Different connections для разных окружений
- Документируйте required connections в DAG
- Регулярно ротируйте пароли
Connections в Airflow — это правильный способ управления учётными данными в production окружении, обеспечивая безопасность и масштабируемость.