← Назад к вопросам

Приведи пример постановки задачи

1.0 Junior🔥 161 комментариев
#Архитектура и проектирование#Опыт и soft skills

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Пример постановки задачи для Data Engineer

Вот реальный пример постановки задачи, которую мог получить на интервью или в компании.

Контекст компании

Компания MegaShop — крупный e-commerce маркетплейс с 500K+ продавцов. Есть проблема: руководство не может быстро отслеживать финансовые показатели по каждому регионе и категории продуктов.

Задача: Построить Data Pipeline для финансовой аналитики

1. Требования (что нужно сделать)

Бизнес-требования:

  • Отслеживать выручку (revenue) и комиссию платформы по дням
  • Видеть топ-10 категорий продуктов по продажам
  • Анализировать прибыль по географическим регионам
  • Сравнивать год к году (YoY) показатели
  • Финансисты должны получать отчёт каждое утро (08:00)

Технические требования:

  • Pipeline должен обновляться ежедневно
  • Задержка данных не более 2-3 часов от момента транзакции
  • Может работать на существующей инфраструктуре (Hadoop, PostgreSQL)
  • Результаты должны быть доступны в BI-инструменте (Tableau)
  • Ретроспективные данные за последние 3 года

2. Исходные данные (что есть)

Источники данных:

-- 1. Таблица заказов (operational OLTP система)
ORDERS (
    order_id UUID,
    seller_id UUID,
    buyer_id UUID,
    order_date TIMESTAMP,
    order_total DECIMAL(10,2),
    payment_status VARCHAR(20),  -- 'paid', 'pending', 'refunded'
    delivery_status VARCHAR(20)   -- 'created', 'shipped', 'delivered'
);
-- Обновляется в реальном времени, 10 млн записей/день

-- 2. Таблица товаров
PRODUCTS (
    product_id UUID,
    product_name VARCHAR(255),
    category_id UUID,
    seller_id UUID,
    price DECIMAL(10,2)
);
-- 2M записей, обновляется редко

-- 3. Таблица категорий
CATEGORIES (
    category_id UUID,
    category_name VARCHAR(100),
    region_id UUID
);

-- 4. Таблица регионов
REGIONS (
    region_id UUID,
    region_name VARCHAR(50),
    country VARCHAR(50)
);

Размеры данных:

  • Orders: 10 млн новых строк в день
  • Исторические данные: 3 года = 10.95 млрд строк
  • Размер на диске: ~500 GB

3. Целевое состояние (что должно быть на выходе)

Аналитическая таблица (результат):

DAILY_FINANCIAL_SUMMARY (
    summary_date DATE,
    region_id UUID,
    region_name VARCHAR(50),
    category_id UUID,
    category_name VARCHAR(100),
    total_orders INT,
    total_revenue DECIMAL(12,2),      -- сумма order_total для оплаченных
    platform_commission DECIMAL(12,2), -- 8% от revenue
    refunded_amount DECIMAL(12,2),     -- потери от возвратов
    net_revenue DECIMAL(12,2),         -- revenue - refunded
    net_profit DECIMAL(12,2),          -- revenue - refunded - commission
    yoy_growth_pct DECIMAL(5,2)        -- год к году рост %
);

Дополнительные отчёты:

TOP_PRODUCTS_BY_SALES (
    date DATE,
    rank INT,
    product_id UUID,
    product_name VARCHAR(255),
    category_name VARCHAR(100),
    units_sold INT,
    total_revenue DECIMAL(12,2)
);

4. Технический дизайн решения

Архитектура pipeline:

PostgreSQL (Orders, Products)
    ↓ (Extract)
Apache Spark (PySpark ETL)
    ↓ (Transform)
HDFS / Delta Lake (Staging)
    ↓ (Load)
PostgreSQL DW / Greenplum (Analytics)
    ↓
Tableau BI (Визуализация)

Шаг 1: Extract (получить свежие данные)

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum, count, when, year, to_date
from datetime import datetime, timedelta

spark = SparkSession.builder.appName("FinancialETL").getOrCreate()

# Прочитать заказы за последние 3 дня (инкрементально)
yesterday = (datetime.now() - timedelta(days=1)).date()
three_days_ago = yesterday - timedelta(days=2)

df_orders = spark.read.jdbc(
    url="jdbc:postgresql://pg-server/shop_db",
    table="orders",
    predicates=[f"order_date >= '{three_days_ago}'"],  # Incremental load
    properties={"user": "etl_user", "password": "***"}
)

df_products = spark.read.jdbc(
    url="jdbc:postgresql://pg-server/shop_db",
    table="products",
    properties={"user": "etl_user", "password": "***"}
)

df_categories = spark.read.jdbc(
    url="jdbc:postgresql://pg-server/shop_db",
    table="categories",
    properties={"user": "etl_user", "password": "***"}
)

df_regions = spark.read.jdbc(
    url="jdbc:postgresql://pg-server/shop_db",
    table="regions",
    properties={"user": "etl_user", "password": "***"}
)

Шаг 2: Transform (трансформировать данные)

# Очистка и обогащение
df_orders_clean = df_orders \
    .filter(col("payment_status") == "paid") \
    .filter(col("order_total") > 0) \
    .withColumn("order_date", to_date(col("order_date"))) \
    .join(df_products.select("product_id", "seller_id"), "seller_id", "left")

# Присоединить информацию о категориях и регионах
df_enriched = df_orders_clean \
    .join(df_products.select("product_id", "category_id"), "product_id", "left") \
    .join(df_categories.select("category_id", "category_name", "region_id"), "category_id", "left") \
    .join(df_regions.select("region_id", "region_name"), "region_id", "left")

# Вычислить финансовые метрики
df_with_metrics = df_enriched \
    .withColumn("platform_commission", col("order_total") * 0.08) \
    .withColumn("net_revenue", col("order_total")) \
    .withColumn("net_profit", col("order_total") * 0.92)

# Агрегировать по дням, регионам, категориям
df_daily_summary = df_with_metrics.groupBy(
    col("order_date").alias("summary_date"),
    "region_id",
    "region_name",
    "category_id",
    "category_name"
).agg(
    count("*").alias("total_orders"),
    spark_sum("order_total").alias("total_revenue"),
    spark_sum("platform_commission").alias("platform_commission"),
    spark_sum("net_profit").alias("net_profit")
)

Шаг 3: Вычислить YoY рост

from pyspark.sql.window import Window
from pyspark.sql.functions import lag, year, month, day

# Добавить год
df_with_year = df_daily_summary.withColumn(
    "year", year(col("summary_date"))
).withColumn(
    "month_day", concat(month(col("summary_date")), day(col("summary_date")))
)

# Оконная функция для сравнения годов
window_spec = Window.partitionBy(
    "region_id", "category_id", "month_day"
).orderBy("year")

df_with_yoy = df_with_year.withColumn(
    "prev_year_revenue", lag("total_revenue", 1).over(window_spec)
).withColumn(
    "yoy_growth_pct", 
    ((col("total_revenue") - col("prev_year_revenue")) / col("prev_year_revenue") * 100)
)

Шаг 4: Load (сохранить результаты)

# Сохраняем в Delta Lake для ACID гарантий
df_with_yoy.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .partitionBy("summary_date") \
    .save("s3://data-warehouse/daily_financial_summary")

# Также загружаем в PostgreSQL для BI
df_with_yoy.coalesce(1).write.jdbc(
    url="jdbc:postgresql://pg-dw/analytics",
    table="daily_financial_summary",
    mode="overwrite",
    properties={"user": "analytics", "password": "***"}
)

5. Расписание (когда запускать)

# Airflow DAG (выполняется ежедневно в 07:00)
from airflow import DAG
from airflow.operators.spark_submit_operator import SparkSubmitOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "data-engineering",
    "retries": 3,
    "retry_delay": timedelta(minutes=15),
}

dag = DAG(
    "financial_etl_daily",
    default_args=default_args,
    schedule_interval="0 7 * * *",  # 07:00 каждый день
    start_date=datetime(2024, 1, 1),
)

run_etl = SparkSubmitOperator(
    task_id="financial_etl",
    application="/opt/jobs/financial_etl.py",
    conf={"spark.executor.memory": "8g"},
    dag=dag,
)

6. Критерии успеха

  • ✅ Pipeline обновляется ежедневно, готов к 08:00
  • ✅ Данные консистентны между HDFS, Postgres и Tableau
  • ✅ Ретроспективные данные за 3 года загружены
  • ✅ Возможность пересчитать данные за любой период
  • ✅ Документация процесса обновления
  • ✅ Алерты при ошибке pipeline

Вывод

Это типичная задача для Data Engineer:

  1. Понять требования бизнеса и технические ограничения
  2. Спроектировать архитектуру (sources → ETL → warehouse → BI)
  3. Реализовать pipeline (Extract → Transform → Load)
  4. Автоматизировать расписание выполнения
  5. Обеспечить надёжность (retry, monitoring, alerting)

Ключевые навыки: SQL, Spark, Python, знание архитектуры DW, bash/scheduling.