← Назад к вопросам
Был ли в вашей системе api
1.0 Junior🔥 171 комментариев
#Архитектура и проектирование#Инструменты разработки
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI21 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
# API в системах Data Engineering: Архитектура и примеры
Короткий ответ
Да, на каждом проекте были API'и. Но они разные для data и для business логики.
Типы API в Data Systems
1. Data Query API (основной в моей работе)
Назначение: Получить данные из data warehouse/lake для отчётов, дашбордов, ML моделей.
Пример с PayFlow:
from fastapi import FastAPI, Query
from sqlalchemy import create_engine
import pandas as pd
from datetime import datetime, timedelta
app = FastAPI()
engine = create_engine('bigquery://project/dataset')
@app.get("/api/v1/user-metrics")
def get_user_metrics(
date_from: str = Query(None), # ISO format: 2024-03-21
date_to: str = Query(None),
user_id: str = Query(None),
metric_type: str = Query("daily") # daily, weekly, monthly
):
"""Получить метрики активности пользователей"""
# Параметры по умолчанию
if not date_from:
date_from = (datetime.now() - timedelta(days=30)).isoformat()
if not date_to:
date_to = datetime.now().isoformat()
# SQL query
query = f"""
SELECT
DATE(timestamp) as date,
user_id,
COUNT(*) as event_count,
COUNT(DISTINCT session_id) as session_count,
SUM(amount) as total_spent
FROM events
WHERE DATE(timestamp) BETWEEN '{date_from}' AND '{date_to}'
"""
if user_id:
query += f" AND user_id = '{user_id}'"
query += " GROUP BY DATE(timestamp), user_id"
if metric_type == "weekly":
query = query.replace("DATE(timestamp)", "DATE_TRUNC(timestamp, WEEK)")
elif metric_type == "monthly":
query = query.replace("DATE(timestamp)", "DATE_TRUNC(timestamp, MONTH)")
df = pd.read_sql(query, engine)
return {
"status": "success",
"data": df.to_dict('records'),
"row_count": len(df),
"generated_at": datetime.now().isoformat()
}
@app.get("/api/v1/cohort-analysis")
def get_cohort_analysis(
start_date: str,
metric: str = "retention" # retention, revenue, churn
):
"""Когортный анализ (когда пользователь присоединился, как себя вёл потом)"""
query = f"""
WITH cohorts AS (
SELECT
user_id,
DATE(MIN(created_at)) as cohort_date
FROM users
WHERE DATE(created_at) >= '{start_date}'
GROUP BY user_id
),
user_events AS (
SELECT
c.user_id,
c.cohort_date,
DATE_DIFF(DATE(e.timestamp), c.cohort_date, DAY) as days_after_join,
COUNT(*) as events
FROM cohorts c
JOIN events e ON c.user_id = e.user_id
GROUP BY c.user_id, c.cohort_date, days_after_join
)
SELECT
cohort_date,
days_after_join,
COUNT(DISTINCT user_id) as users,
SUM(events) as total_events
FROM user_events
GROUP BY cohort_date, days_after_join
ORDER BY cohort_date, days_after_join
"""
df = pd.read_sql(query, engine)
return {"status": "success", "data": df.to_dict('records')}
2. Real-time Metrics API
Назначение: Получить current метрики (сейчас, не исторические данные).
from flask import Flask
import redis
from datetime import datetime
import json
app = Flask(__name__)
redis_client = redis.Redis(host='localhost', port=6379)
@app.get("/api/v1/live-metrics")
def get_live_metrics():
"""Real-time метрики (обновляются каждую минуту)"""
# Данные кэшируются в Redis и обновляются через Spark Streaming
metrics = {
"active_users_now": int(redis_client.get('active_users_now') or 0),
"transactions_today": float(redis_client.get('transactions_today') or 0),
"avg_session_time": float(redis_client.get('avg_session_time') or 0),
"api_health": json.loads(redis_client.get('api_health') or '{}'),
}
return {
"status": "success",
"data": metrics,
"timestamp": datetime.now().isoformat(),
"cache_age_seconds": int(redis_client.ttl('active_users_now'))
}
# Spark job (в Airflow DAG) обновляет Redis каждую минуту:
# df.select(count("*")).collect()[0][0] → redis.set('active_users_now', count)
3. Data Ingestion API (для приложения)
Назначение: Приложение отправляет события в систему.
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import json
from kafka import KafkaProducer
app = FastAPI()
producer = KafkaProducer(
bootstrap_servers=['kafka:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
class EventPayload(BaseModel):
user_id: str
event_type: str # "click", "purchase", "login"
amount: float = None
metadata: dict = {}
@app.post("/api/v1/events")
def track_event(event: EventPayload):
"""Приложение отправляет события"""
# Валидация
if event.event_type not in ["click", "purchase", "login", "logout"]:
raise HTTPException(status_code=400, detail="Invalid event type")
if event.event_type == "purchase" and event.amount is None:
raise HTTPException(status_code=400, detail="Purchase requires amount")
# Отправляем в Kafka
message = {
"user_id": event.user_id,
"event_type": event.event_type,
"amount": event.amount,
"timestamp": datetime.now().isoformat(),
"metadata": event.metadata
}
producer.send('raw_events', value=message)
return {"status": "received", "event_id": hash(str(message))}
4. ML Feature API
Назначение: ML модели получают features для predictions.
from fastapi import FastAPI
from pydantic import BaseModel
import pickle
import numpy as np
app = FastAPI()
# Загружаем DataFrame с features (обновляется каждый день через Spark)
feature_store_df = pd.read_parquet('s3://feature-store/features_latest/')
feature_store_dict = feature_store_df.set_index('user_id').to_dict('index')
class PredictionRequest(BaseModel):
user_id: str
model_version: str = "v1.0"
@app.post("/api/v1/predict-churn")
def predict_churn(request: PredictionRequest):
"""Предсказать вероятность churn'а пользователя"""
# Получаем features для пользователя
if request.user_id not in feature_store_dict:
return {"error": "User not found in feature store"}
features = feature_store_dict[request.user_id]
# Подготавливаем для модели
X = np.array([
features['days_since_signup'],
features['total_spent'],
features['last_activity_days_ago'],
features['avg_daily_purchases'],
features['support_tickets_count']
]).reshape(1, -1)
# Загружаем модель
model = pickle.load(open(f'models/churn_model_{request.model_version}.pkl', 'rb'))
# Предсказываем
prediction = model.predict_proba(X)[0]
return {
"user_id": request.user_id,
"churn_probability": float(prediction[1]),
"risk_level": "high" if prediction[1] > 0.7 else "medium" if prediction[1] > 0.4 else "low",
"model_version": request.model_version
}
API Design Best Practices (что использовал я)
1. Versioning
/api/v1/users ← версия 1
/api/v2/users ← версия 2 (breaking changes)
2. Pagination (для больших датасетов)
@app.get("/api/v1/events")
def get_events(
limit: int = Query(100, le=1000), # max 1000
offset: int = Query(0)
):
# LIMIT {limit} OFFSET {offset}
pass
3. Rate limiting (защита от abuse)
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
@app.get("/api/v1/metrics", dependencies=[Depends(limiter.limit("100/minute"))])
def get_metrics():
pass
4. Caching
from functools import lru_cache
@lru_cache(maxsize=128)
def expensive_query(user_id: str, date: str):
# Вычисляется один раз, потом берётся из кэша
pass
5. Async для скорости
from asyncio import run
import aiohttp
@app.get("/api/v1/multi-metrics")
async def get_multi_metrics(user_ids: list[str]):
# Выполняется параллельно вместо последовательно
tasks = [fetch_user_metrics(uid) for uid in user_ids]
results = await asyncio.gather(*tasks)
return results
Мониторинг API
from prometheus_client import Counter, Histogram
request_count = Counter(
'api_requests_total',
'Total API requests',
['method', 'endpoint', 'status']
)
request_duration = Histogram(
'api_request_duration_seconds',
'API request duration',
['endpoint']
)
@app.middleware("http")
async def log_metrics(request, call_next):
start = time.time()
response = await call_next(request)
duration = time.time() - start
request_count.labels(
method=request.method,
endpoint=request.url.path,
status=response.status_code
).inc()
request_duration.labels(endpoint=request.url.path).observe(duration)
return response
Итого
API в data системах:
- Query API — для дашбордов и отчётов
- Ingestion API — для приложения отправлять события
- Feature API — для ML моделей
- Real-time API — для live метрик
Каждый имеет свой масштаб, requirements и оптимизации. Data Engineer должен понимать все эти типы и их особенности.