← Назад к вопросам

Что такое pool Airflow?

1.8 Middle🔥 261 комментариев
#Тестирование

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Pool в Apache Airflow: Управление ресурсами и параллелизмом

Pool (пул) в Apache Airflow — это механизм для управления и ограничения параллельного выполнения задач на определённых ресурсах. Pool определяет максимальное количество одновременно работающих задач в группе, что позволяет избежать перегрузки ресурсов и контролировать использование внешних сервисов.

Концепция

Без Pool — проблема:

Когда запускаются 100 DAG'ов одновременно:
- Каждый DAG имеет 10 задач
- Всего 1000 задач хотят запуститься одновременно
- БД не выдерживает нагрузку → все падает 🔥

С Pool — решение:

default_pool: max_slots = 128
db_pool: max_slots = 10
api_pool: max_slots = 5

Каждой задаче присваиваем pool + pool_slots (сколько слотов занимает)
Airflow scheduler уважает лимиты и не допускает перегрузку

Как это работает

Airflow Scheduler (планировщик)
├── Смотрит на очередь задач
├── Проверяет, есть ли свободные слоты в pool'е
├── Если есть свободные слоты → отправляет задачу на исполнение
└── Если нет → задача ждет в очереди

Основные концепции:

1. max_slots — максимальное количество работающих задач одновременно
   default_pool: 128 слотов (может быть 128 одновременных задач)

2. pool_slots (или pool) — сколько слотов использует одна задача
   Обычно 1 слот на задачу, но может быть больше для тяжелых работ

3. queue — очередь, в которой задачи ждут свободных слотов

Создание и использование Pool'ов

Вариант 1: Через UI (web-интерфейс)

Admin → Pools → Create

Pool name: database_pool
Slots: 10
Description: "Лимит для подключений к БД (max 10 одновременно)"

Вариант 2: Через Python API

from airflow.models import Pool
from airflow.utils.db import create_default_conn

# Создать pool
db_pool = Pool(
    pool='database_pool',
    slots=10,
    description='Max 10 concurrent database connections'
)

# Или получить существующий
pool = Pool.get_pool('database_pool')
pool.slots = 15  # Изменить
session.add(pool)
session.commit()

Вариант 3: Через конфиг (airflow.cfg)

[core]
max_active_tasks_per_dag = 16  # По DAG'у
max_active_runs_per_dag = 16   # Одновременных запусков DAG'а

Использование Pool в DAG'е

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def connect_to_database():
    """Подключение к БД — дорогой ресурс"""
    print("Connecting to database...")
    return "Connected"

with DAG(
    dag_id='database_operations',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily'
) as dag:
    # Эта задача занимает 1 слот в database_pool
    task1 = PythonOperator(
        task_id='query_database_1',
        python_callable=connect_to_database,
        pool='database_pool',  # Присваиваем pool
        pool_slots=1           # Занимает 1 слот
    )
    
    task2 = PythonOperator(
        task_id='query_database_2',
        python_callable=connect_to_database,
        pool='database_pool',
        pool_slots=1
    )
    
    # Эта задача может быть тяжелой и занимает несколько слотов
    task3 = PythonOperator(
        task_id='heavy_computation',
        python_callable=lambda: print("Computing..."),
        pool='default_pool',
        pool_slots=4  # Занимает 4 слота!
    )
    
    task1 >> task2 >> task3

# В этом DAG'е:
# - task1 и task2 могут запуститься одновременно (если есть 2 слота в database_pool)
# - task3 займет 4 слота из 128 в default_pool

Пример с реальной БД

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from datetime import datetime

# Создаем pool для PostgreSQL
# Admin -> Pools -> Create
# postgres_pool: 20 слотов (макс 20 одновременных подключений)

with DAG(
    dag_id='database_batch_processing',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@hourly'
) as dag:
    
    # 100 задач по обработке таблиц
    tasks = []
    for i in range(100):
        task = PostgresOperator(
            task_id=f'process_table_{i}',
            sql=f'SELECT COUNT(*) FROM table_{i}',
            postgres_conn_id='postgres_default',
            pool='postgres_pool',  # Все используют одни и тот же pool
            pool_slots=1
        )
        tasks.append(task)
    
    # Линейные зависимости
    for i in range(1, len(tasks)):
        tasks[i-1] >> tasks[i]

# Результат:
# Вместо 100 одновременных подключений (которые сломали бы БД),
# Airflow будет держать максимум 20 одновременно
# Остальные 80 задач будут ждать в очереди

Priority Pool Slots

Можно задать приоритет для задач

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG('priority_example', start_date=datetime(2024, 1, 1)) as dag:
    
    # Низкий приоритет (выполняется последним)
    low_priority = PythonOperator(
        task_id='low_priority_task',
        python_callable=lambda: print("Low priority"),
        pool='shared_pool',
        priority_weight=1  # Меньше = меньше приоритет
    )
    
    # Высокий приоритет (выполняется первым)
    high_priority = PythonOperator(
        task_id='high_priority_task',
        python_callable=lambda: print("High priority"),
        pool='shared_pool',
        priority_weight=100  # Больше = больше приоритет
    )

Мониторинг Pool'ов

# Получить информацию о pool'е
from airflow.models import Pool

pool = Pool.get_pool('database_pool')
print(f"Pool: {pool.pool}")
print(f"Max slots: {pool.slots}")
print(f"Used slots: {pool.used_slots}")  # Сколько занято прямо сейчас
print(f"Available slots: {pool.slots - pool.used_slots}")

# В UI можно видеть в Home -> Pools
# или через CLI
# airflow pools list
# airflow pools get database_pool

Pool vs Queue vs DAG Parallelism

Концепция          Что контролирует         Пример
─────────────────────────────────────────────────────
Pool              Слоты для задач         max 10 БД запросов
Queue             Какой worker берет      default, high_memory
                  задачу из очереди
max_active_tasks  Макс задач на worker    32 задачи на worker
per_dag                                     одновременно
max_active_runs   Макс запусков DAG'а    макс 2 запуска DAG'а
per_dag           одновременно             одновременно

Распространенные pool'ы

# default_pool — встроенный, всегда есть
# По умолчанию 128 слотов

# Хорошие примеры для создания:

# 1. БД-специфичные pool'ы
postgres_pool: 10     # Макс 10 подключений
mysql_pool: 15
mongo_pool: 20

# 2. API-специфичные pool'ы
api_external: 5       # Лимит 5 req/sec на внешний API
slack_notifications: 3 # Не спамить Slack

# 3. Ресурс-специфичные pool'ы
high_memory_pool: 4   # Тяжелые вычисления
short_running: 64     # Много быстрых задач

Pool Priority Queue

Есть расширенная функция Pool Priority Queue (PPQ):

# С PPQ в конфиге:
[core]
pool_priority_queue = True  # Включить приоритет

# Теперь scheduler учитывает priority_weight
with DAG('pq_example', start_date=datetime(2024, 1, 1)) as dag:
    important = PythonOperator(
        task_id='important',
        python_callable=lambda: None,
        pool='default_pool',
        priority_weight=1000  # Будет выполнена первой
    )
    
    regular = PythonOperator(
        task_id='regular',
        python_callable=lambda: None,
        pool='default_pool',
        priority_weight=10
    )

Лучшие практики

# 1. Создавай pool'ы для дорогих ресурсов
для БД, API, обучения моделей → отдельный pool

# 2. Установи реалистичные лимиты
db_connections = 10  # max_connections в БД / 2
api_requests = 100 / 60  # rate limit API

# 3. Мониторь использование
Регулярно проверяй Pool → Slots используется
Если всегда 100% → увеличь slots
Если редко >50% → уменьши slots

# 4. Документируй pool'ы
Каждому pool'у — описание в Description
В DAG коде — комментарий про pool

# 5. Тестируй с реальной нагрузкой
Запусти 10 DAG'ов × 10 задач одновременно
Проверь, что не ломается

Резюме

Pool в Airflow — это механизм ограничения одновременного выполнения задач по ресурсам. Каждая задача может быть назначена pool'у с определённым количеством слотов. Scheduler уважает эти лимиты и предотвращает перегрузку БД, API и других сервисов. Pool'ы критичны для стабильной работы production Airflow инстансов с большой нагрузкой.