← Назад к вопросам
Приведи пример постановки задачи
1.0 Junior🔥 161 комментариев
#Архитектура и проектирование#Опыт и soft skills
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Пример постановки задачи для Data Engineer
Вот реальный пример постановки задачи, которую мог получить на интервью или в компании.
Контекст компании
Компания MegaShop — крупный e-commerce маркетплейс с 500K+ продавцов. Есть проблема: руководство не может быстро отслеживать финансовые показатели по каждому регионе и категории продуктов.
Задача: Построить Data Pipeline для финансовой аналитики
1. Требования (что нужно сделать)
Бизнес-требования:
- Отслеживать выручку (revenue) и комиссию платформы по дням
- Видеть топ-10 категорий продуктов по продажам
- Анализировать прибыль по географическим регионам
- Сравнивать год к году (YoY) показатели
- Финансисты должны получать отчёт каждое утро (08:00)
Технические требования:
- Pipeline должен обновляться ежедневно
- Задержка данных не более 2-3 часов от момента транзакции
- Может работать на существующей инфраструктуре (Hadoop, PostgreSQL)
- Результаты должны быть доступны в BI-инструменте (Tableau)
- Ретроспективные данные за последние 3 года
2. Исходные данные (что есть)
Источники данных:
-- 1. Таблица заказов (operational OLTP система)
ORDERS (
order_id UUID,
seller_id UUID,
buyer_id UUID,
order_date TIMESTAMP,
order_total DECIMAL(10,2),
payment_status VARCHAR(20), -- 'paid', 'pending', 'refunded'
delivery_status VARCHAR(20) -- 'created', 'shipped', 'delivered'
);
-- Обновляется в реальном времени, 10 млн записей/день
-- 2. Таблица товаров
PRODUCTS (
product_id UUID,
product_name VARCHAR(255),
category_id UUID,
seller_id UUID,
price DECIMAL(10,2)
);
-- 2M записей, обновляется редко
-- 3. Таблица категорий
CATEGORIES (
category_id UUID,
category_name VARCHAR(100),
region_id UUID
);
-- 4. Таблица регионов
REGIONS (
region_id UUID,
region_name VARCHAR(50),
country VARCHAR(50)
);
Размеры данных:
- Orders: 10 млн новых строк в день
- Исторические данные: 3 года = 10.95 млрд строк
- Размер на диске: ~500 GB
3. Целевое состояние (что должно быть на выходе)
Аналитическая таблица (результат):
DAILY_FINANCIAL_SUMMARY (
summary_date DATE,
region_id UUID,
region_name VARCHAR(50),
category_id UUID,
category_name VARCHAR(100),
total_orders INT,
total_revenue DECIMAL(12,2), -- сумма order_total для оплаченных
platform_commission DECIMAL(12,2), -- 8% от revenue
refunded_amount DECIMAL(12,2), -- потери от возвратов
net_revenue DECIMAL(12,2), -- revenue - refunded
net_profit DECIMAL(12,2), -- revenue - refunded - commission
yoy_growth_pct DECIMAL(5,2) -- год к году рост %
);
Дополнительные отчёты:
TOP_PRODUCTS_BY_SALES (
date DATE,
rank INT,
product_id UUID,
product_name VARCHAR(255),
category_name VARCHAR(100),
units_sold INT,
total_revenue DECIMAL(12,2)
);
4. Технический дизайн решения
Архитектура pipeline:
PostgreSQL (Orders, Products)
↓ (Extract)
Apache Spark (PySpark ETL)
↓ (Transform)
HDFS / Delta Lake (Staging)
↓ (Load)
PostgreSQL DW / Greenplum (Analytics)
↓
Tableau BI (Визуализация)
Шаг 1: Extract (получить свежие данные)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum, count, when, year, to_date
from datetime import datetime, timedelta
spark = SparkSession.builder.appName("FinancialETL").getOrCreate()
# Прочитать заказы за последние 3 дня (инкрементально)
yesterday = (datetime.now() - timedelta(days=1)).date()
three_days_ago = yesterday - timedelta(days=2)
df_orders = spark.read.jdbc(
url="jdbc:postgresql://pg-server/shop_db",
table="orders",
predicates=[f"order_date >= '{three_days_ago}'"], # Incremental load
properties={"user": "etl_user", "password": "***"}
)
df_products = spark.read.jdbc(
url="jdbc:postgresql://pg-server/shop_db",
table="products",
properties={"user": "etl_user", "password": "***"}
)
df_categories = spark.read.jdbc(
url="jdbc:postgresql://pg-server/shop_db",
table="categories",
properties={"user": "etl_user", "password": "***"}
)
df_regions = spark.read.jdbc(
url="jdbc:postgresql://pg-server/shop_db",
table="regions",
properties={"user": "etl_user", "password": "***"}
)
Шаг 2: Transform (трансформировать данные)
# Очистка и обогащение
df_orders_clean = df_orders \
.filter(col("payment_status") == "paid") \
.filter(col("order_total") > 0) \
.withColumn("order_date", to_date(col("order_date"))) \
.join(df_products.select("product_id", "seller_id"), "seller_id", "left")
# Присоединить информацию о категориях и регионах
df_enriched = df_orders_clean \
.join(df_products.select("product_id", "category_id"), "product_id", "left") \
.join(df_categories.select("category_id", "category_name", "region_id"), "category_id", "left") \
.join(df_regions.select("region_id", "region_name"), "region_id", "left")
# Вычислить финансовые метрики
df_with_metrics = df_enriched \
.withColumn("platform_commission", col("order_total") * 0.08) \
.withColumn("net_revenue", col("order_total")) \
.withColumn("net_profit", col("order_total") * 0.92)
# Агрегировать по дням, регионам, категориям
df_daily_summary = df_with_metrics.groupBy(
col("order_date").alias("summary_date"),
"region_id",
"region_name",
"category_id",
"category_name"
).agg(
count("*").alias("total_orders"),
spark_sum("order_total").alias("total_revenue"),
spark_sum("platform_commission").alias("platform_commission"),
spark_sum("net_profit").alias("net_profit")
)
Шаг 3: Вычислить YoY рост
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, year, month, day
# Добавить год
df_with_year = df_daily_summary.withColumn(
"year", year(col("summary_date"))
).withColumn(
"month_day", concat(month(col("summary_date")), day(col("summary_date")))
)
# Оконная функция для сравнения годов
window_spec = Window.partitionBy(
"region_id", "category_id", "month_day"
).orderBy("year")
df_with_yoy = df_with_year.withColumn(
"prev_year_revenue", lag("total_revenue", 1).over(window_spec)
).withColumn(
"yoy_growth_pct",
((col("total_revenue") - col("prev_year_revenue")) / col("prev_year_revenue") * 100)
)
Шаг 4: Load (сохранить результаты)
# Сохраняем в Delta Lake для ACID гарантий
df_with_yoy.write.format("delta") \
.mode("overwrite") \
.option("mergeSchema", "true") \
.partitionBy("summary_date") \
.save("s3://data-warehouse/daily_financial_summary")
# Также загружаем в PostgreSQL для BI
df_with_yoy.coalesce(1).write.jdbc(
url="jdbc:postgresql://pg-dw/analytics",
table="daily_financial_summary",
mode="overwrite",
properties={"user": "analytics", "password": "***"}
)
5. Расписание (когда запускать)
# Airflow DAG (выполняется ежедневно в 07:00)
from airflow import DAG
from airflow.operators.spark_submit_operator import SparkSubmitOperator
from datetime import datetime, timedelta
default_args = {
"owner": "data-engineering",
"retries": 3,
"retry_delay": timedelta(minutes=15),
}
dag = DAG(
"financial_etl_daily",
default_args=default_args,
schedule_interval="0 7 * * *", # 07:00 каждый день
start_date=datetime(2024, 1, 1),
)
run_etl = SparkSubmitOperator(
task_id="financial_etl",
application="/opt/jobs/financial_etl.py",
conf={"spark.executor.memory": "8g"},
dag=dag,
)
6. Критерии успеха
- ✅ Pipeline обновляется ежедневно, готов к 08:00
- ✅ Данные консистентны между HDFS, Postgres и Tableau
- ✅ Ретроспективные данные за 3 года загружены
- ✅ Возможность пересчитать данные за любой период
- ✅ Документация процесса обновления
- ✅ Алерты при ошибке pipeline
Вывод
Это типичная задача для Data Engineer:
- Понять требования бизнеса и технические ограничения
- Спроектировать архитектуру (sources → ETL → warehouse → BI)
- Реализовать pipeline (Extract → Transform → Load)
- Автоматизировать расписание выполнения
- Обеспечить надёжность (retry, monitoring, alerting)
Ключевые навыки: SQL, Spark, Python, знание архитектуры DW, bash/scheduling.