← Назад к вопросам

Какие инструменты ETL использовал (Apache Airflow, Apache Spark)?

1.2 Junior🔥 211 комментариев
#Apache Airflow и оркестрация#Apache Spark#ETL и качество данных

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

ETL инструменты: Apache Airflow и Apache Spark

Это два ключевых инструмента в современной data engineering архитектуре. Они часто используются вместе: Airflow для оркестрации, Spark для обработки данных.

Apache Airflow

Что это? Airflow — это платформа для оркестрации (scheduling, monitoring) data pipelines. Определяешь DAG'и (Directed Acyclic Graphs), Airflow их выполняет.

Основные компоненты:

  • Scheduler — запускает DAG'и по расписанию
  • Executor — выполняет задачи (Local, Celery, Kubernetes)
  • Web UI — мониторинг и управление
  • MetaDatabase — хранит состояние DAG'ов и tasks

Когда использовать:

  • Scheduling тяжёлых jobs (daily, hourly)
  • Зависимости между tasks
  • Мониторинг и retry logic
  • Complex data pipelines с условиями

Пример простого DAG:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

def extract_data():
    print("Extracting data...")

def transform_data():
    print("Transforming data...")

with DAG(
    'simple_etl',
    start_date=datetime(2024, 1, 1),
    schedule_interval='0 2 * * *'  # Каждый день в 02:00
) as dag:
    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data
    )
    
    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data
    )
    
    load = BashOperator(
        task_id='load',
        bash_command='python /scripts/load.py'
    )
    
    extract >> transform >> load

Преимущества:

  • Powerful scheduling и retry механизм
  • Отличный мониторинг в Web UI
  • Большая экосистема операторов (Postgres, Spark, S3, etc.)
  • Backfill и re-runs
  • Dynamic DAG'и

Недостатки:

  • Только оркестрация, не обработка данных
  • Требует отдельное вычислительное ядро (Spark, Python scripts, etc.)
  • Может быть heavy для простых задач

Apache Spark

Что это? Spark — это distributed processing engine для обработки больших данных. Может работать на Hadoop, Kubernetes, облаках.

Режимы работы:

  • Batch processing — обработка всех данных сразу
  • Streaming — real-time обработка потоков
  • SQL — SQL запросы (SparkSQL)
  • Machine Learning — MLlib для ML

Когда использовать:

  • Обработка больших объёмов данных (Gb+)
  • Transformations (map, filter, join, aggregate)
  • Distributed processing
  • SQL queries на больших датасетах

Пример трансформации в Spark:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, sum as spark_sum

spark = SparkSession.builder.appName("ETL").getOrCreate()

# Читаем данные
df = spark.read.parquet("/data/orders.parquet")

# Трансформируем
transformed = df \
    .filter(col("order_amount") > 100) \
    .withColumn(
        "order_category",
        when(col("order_amount") > 1000, "premium")
        .when(col("order_amount") > 500, "standard")
        .otherwise("regular")
    ) \
    .groupBy("order_category") \
    .agg(
        spark_sum("order_amount").alias("total_amount"),
        count("*").alias("order_count")
    )

# Пишем результат
transformed.write.mode("overwrite").parquet("/output/results.parquet")

Преимущества:

  • Distributed processing (масштабируемость)
  • In-memory caching (быстро)
  • Поддержка множества источников данных
  • Catalyst optimizer для оптимизации запросов
  • Lazy evaluation

Недостатки:

  • Требует кластер для эффективности
  • Curve learning (непросто на старте)
  • Overhead для маленьких объёмов данных
  • Требует управление памятью

Использование Airflow + Spark вместе

Типичная архитектура:

Airflow (оркестрация) → SparkSubmitOperator → Spark Cluster (обработка)

Пример DAG с Spark job'ом:

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime

with DAG(
    'spark_etl_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily'
) as dag:
    # Spark job
    spark_job = SparkSubmitOperator(
        task_id='run_spark_etl',
        application='/path/to/etl_script.py',
        conf={
            'spark.executor.memory': '4g',
            'spark.executor.cores': 4
        },
        spark_binary='/usr/bin/spark-submit'
    )
    
    spark_job

ETL скрипт (etl_script.py):

from pyspark.sql import SparkSession
import sys

spark = SparkSession.builder.appName("ETL").getOrCreate()

# Extract
df = spark.read.parquet(sys.argv[1] if len(sys.argv) > 1 else "/data/input")

# Transform
df_transformed = df \
    .filter(df.date >= "2024-01-01") \
    .groupBy("user_id") \
    .agg({"amount": "sum"})

# Load
df_transformed.write.mode("overwrite").parquet("/data/output")

print("ETL completed successfully")

Сравнение с другими ETL инструментами

ИнструментОркестрацияОбработкаПростотаCloud Native
AirflowОтличнаяНетСредняяХорошо
SparkНетОтличнаяСложнаяХорошо
dbtХорошаяSQL onlyПростаяОтличная
LuigiХорошаяНетПростаяПлохо
PrefectОтличнаяНетХорошоОтличная
TalendОбеОбеСложнаяХорошо
Apache NifiХорошаяБазоваяСредняяХорошо

Когда выбрать что

Используй Airflow, если:

  • Сложная логика scheduling'а
  • Нужен powerful retry mechanism
  • Требуется мониторинг и alerting
  • Dynamic DAG'и

Используй Spark, если:

  • Большие объёмы данных (100+ Gb)
  • Нужна distributed обработка
  • Complex трансформации
  • SQL queries на big data

Используй оба вместе, если:

  • Production data pipeline
  • Нужна и оркестрация, и обработка
  • Enterprise требования

Alternative подходы

dbt (Data Build Tool)

# dbt.yml
models:
  - name: user_metrics
    description: User aggregated metrics
    materialized: table
    sql:
      SELECT
        user_id,
        COUNT(*) as orders,
        SUM(amount) as total_spent
      FROM orders
      GROUP BY user_id

Prefect (современная альтернатива Airflow)

from prefect import flow, task

@task
def extract():
    return pd.read_csv('/data/input.csv')

@task
def transform(data):
    return data[data['amount'] > 100]

@task
def load(data):
    data.to_parquet('/data/output.parquet')

@flow
def etl_pipeline():
    data = extract()
    transformed = transform(data)
    load(transformed)

if __name__ == "__main__":
    etl_pipeline()

Best Practices

  1. Разделяй оркестрацию и обработку

    • Airflow для scheduling
    • Spark для трансформаций
  2. Используй idempotent операции

    • Переписывай полностью, не append'ь
    • Позволяет re-runs без побочных эффектов
  3. Мониторь все вещи

    • Airflow logs
    • Spark metrics
    • Data quality checks
  4. Версионируй код

    • Docker образы для Spark jobs
    • Git для DAG'ов
  5. Обработай ошибки

    • Retry logic
    • Dead letter queues
    • Alerts на failures

Выводы

Airflow и Spark — complementary инструменты. Airflow управляет когда и как запускать jobs, Spark их обрабатывает. Для production data pipeline'ов эта комбинация — практически стандарт в industry. Выбор других инструментов зависит от конкретных требований и масштаба проекта.

Какие инструменты ETL использовал (Apache Airflow, Apache Spark)? | PrepBro