← Назад к вопросам

Был ли в вашей системе api

1.0 Junior🔥 171 комментариев
#Архитектура и проектирование#Инструменты разработки

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI21 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# API в системах Data Engineering: Архитектура и примеры

Короткий ответ

Да, на каждом проекте были API'и. Но они разные для data и для business логики.

Типы API в Data Systems

1. Data Query API (основной в моей работе)

Назначение: Получить данные из data warehouse/lake для отчётов, дашбордов, ML моделей.

Пример с PayFlow:

from fastapi import FastAPI, Query
from sqlalchemy import create_engine
import pandas as pd
from datetime import datetime, timedelta

app = FastAPI()
engine = create_engine('bigquery://project/dataset')

@app.get("/api/v1/user-metrics")
def get_user_metrics(
    date_from: str = Query(None),  # ISO format: 2024-03-21
    date_to: str = Query(None),
    user_id: str = Query(None),
    metric_type: str = Query("daily")  # daily, weekly, monthly
):
    """Получить метрики активности пользователей"""
    
    # Параметры по умолчанию
    if not date_from:
        date_from = (datetime.now() - timedelta(days=30)).isoformat()
    if not date_to:
        date_to = datetime.now().isoformat()
    
    # SQL query
    query = f"""
    SELECT 
        DATE(timestamp) as date,
        user_id,
        COUNT(*) as event_count,
        COUNT(DISTINCT session_id) as session_count,
        SUM(amount) as total_spent
    FROM events
    WHERE DATE(timestamp) BETWEEN '{date_from}' AND '{date_to}'
    """
    
    if user_id:
        query += f" AND user_id = '{user_id}'"
    
    query += " GROUP BY DATE(timestamp), user_id"
    
    if metric_type == "weekly":
        query = query.replace("DATE(timestamp)", "DATE_TRUNC(timestamp, WEEK)")
    elif metric_type == "monthly":
        query = query.replace("DATE(timestamp)", "DATE_TRUNC(timestamp, MONTH)")
    
    df = pd.read_sql(query, engine)
    
    return {
        "status": "success",
        "data": df.to_dict('records'),
        "row_count": len(df),
        "generated_at": datetime.now().isoformat()
    }

@app.get("/api/v1/cohort-analysis")
def get_cohort_analysis(
    start_date: str,
    metric: str = "retention"  # retention, revenue, churn
):
    """Когортный анализ (когда пользователь присоединился, как себя вёл потом)"""
    
    query = f"""
    WITH cohorts AS (
        SELECT 
            user_id,
            DATE(MIN(created_at)) as cohort_date
        FROM users
        WHERE DATE(created_at) >= '{start_date}'
        GROUP BY user_id
    ),
    user_events AS (
        SELECT 
            c.user_id,
            c.cohort_date,
            DATE_DIFF(DATE(e.timestamp), c.cohort_date, DAY) as days_after_join,
            COUNT(*) as events
        FROM cohorts c
        JOIN events e ON c.user_id = e.user_id
        GROUP BY c.user_id, c.cohort_date, days_after_join
    )
    SELECT 
        cohort_date,
        days_after_join,
        COUNT(DISTINCT user_id) as users,
        SUM(events) as total_events
    FROM user_events
    GROUP BY cohort_date, days_after_join
    ORDER BY cohort_date, days_after_join
    """
    
    df = pd.read_sql(query, engine)
    return {"status": "success", "data": df.to_dict('records')}

2. Real-time Metrics API

Назначение: Получить current метрики (сейчас, не исторические данные).

from flask import Flask
import redis
from datetime import datetime
import json

app = Flask(__name__)
redis_client = redis.Redis(host='localhost', port=6379)

@app.get("/api/v1/live-metrics")
def get_live_metrics():
    """Real-time метрики (обновляются каждую минуту)"""
    
    # Данные кэшируются в Redis и обновляются через Spark Streaming
    metrics = {
        "active_users_now": int(redis_client.get('active_users_now') or 0),
        "transactions_today": float(redis_client.get('transactions_today') or 0),
        "avg_session_time": float(redis_client.get('avg_session_time') or 0),
        "api_health": json.loads(redis_client.get('api_health') or '{}'),
    }
    
    return {
        "status": "success",
        "data": metrics,
        "timestamp": datetime.now().isoformat(),
        "cache_age_seconds": int(redis_client.ttl('active_users_now'))
    }

# Spark job (в Airflow DAG) обновляет Redis каждую минуту:
# df.select(count("*")).collect()[0][0] → redis.set('active_users_now', count)

3. Data Ingestion API (для приложения)

Назначение: Приложение отправляет события в систему.

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import json
from kafka import KafkaProducer

app = FastAPI()
producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

class EventPayload(BaseModel):
    user_id: str
    event_type: str  # "click", "purchase", "login"
    amount: float = None
    metadata: dict = {}

@app.post("/api/v1/events")
def track_event(event: EventPayload):
    """Приложение отправляет события"""
    
    # Валидация
    if event.event_type not in ["click", "purchase", "login", "logout"]:
        raise HTTPException(status_code=400, detail="Invalid event type")
    
    if event.event_type == "purchase" and event.amount is None:
        raise HTTPException(status_code=400, detail="Purchase requires amount")
    
    # Отправляем в Kafka
    message = {
        "user_id": event.user_id,
        "event_type": event.event_type,
        "amount": event.amount,
        "timestamp": datetime.now().isoformat(),
        "metadata": event.metadata
    }
    
    producer.send('raw_events', value=message)
    
    return {"status": "received", "event_id": hash(str(message))}

4. ML Feature API

Назначение: ML модели получают features для predictions.

from fastapi import FastAPI
from pydantic import BaseModel
import pickle
import numpy as np

app = FastAPI()

# Загружаем DataFrame с features (обновляется каждый день через Spark)
feature_store_df = pd.read_parquet('s3://feature-store/features_latest/')
feature_store_dict = feature_store_df.set_index('user_id').to_dict('index')

class PredictionRequest(BaseModel):
    user_id: str
    model_version: str = "v1.0"

@app.post("/api/v1/predict-churn")
def predict_churn(request: PredictionRequest):
    """Предсказать вероятность churn'а пользователя"""
    
    # Получаем features для пользователя
    if request.user_id not in feature_store_dict:
        return {"error": "User not found in feature store"}
    
    features = feature_store_dict[request.user_id]
    
    # Подготавливаем для модели
    X = np.array([
        features['days_since_signup'],
        features['total_spent'],
        features['last_activity_days_ago'],
        features['avg_daily_purchases'],
        features['support_tickets_count']
    ]).reshape(1, -1)
    
    # Загружаем модель
    model = pickle.load(open(f'models/churn_model_{request.model_version}.pkl', 'rb'))
    
    # Предсказываем
    prediction = model.predict_proba(X)[0]
    
    return {
        "user_id": request.user_id,
        "churn_probability": float(prediction[1]),
        "risk_level": "high" if prediction[1] > 0.7 else "medium" if prediction[1] > 0.4 else "low",
        "model_version": request.model_version
    }

API Design Best Practices (что использовал я)

1. Versioning

/api/v1/users        ← версия 1
/api/v2/users        ← версия 2 (breaking changes)

2. Pagination (для больших датасетов)

@app.get("/api/v1/events")
def get_events(
    limit: int = Query(100, le=1000),  # max 1000
    offset: int = Query(0)
):
    # LIMIT {limit} OFFSET {offset}
    pass

3. Rate limiting (защита от abuse)

from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter

@app.get("/api/v1/metrics", dependencies=[Depends(limiter.limit("100/minute"))])
def get_metrics():
    pass

4. Caching

from functools import lru_cache

@lru_cache(maxsize=128)
def expensive_query(user_id: str, date: str):
    # Вычисляется один раз, потом берётся из кэша
    pass

5. Async для скорости

from asyncio import run
import aiohttp

@app.get("/api/v1/multi-metrics")
async def get_multi_metrics(user_ids: list[str]):
    # Выполняется параллельно вместо последовательно
    tasks = [fetch_user_metrics(uid) for uid in user_ids]
    results = await asyncio.gather(*tasks)
    return results

Мониторинг API

from prometheus_client import Counter, Histogram

request_count = Counter(
    'api_requests_total',
    'Total API requests',
    ['method', 'endpoint', 'status']
)

request_duration = Histogram(
    'api_request_duration_seconds',
    'API request duration',
    ['endpoint']
)

@app.middleware("http")
async def log_metrics(request, call_next):
    start = time.time()
    response = await call_next(request)
    duration = time.time() - start
    
    request_count.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()
    
    request_duration.labels(endpoint=request.url.path).observe(duration)
    
    return response

Итого

API в data системах:

  • Query API — для дашбордов и отчётов
  • Ingestion API — для приложения отправлять события
  • Feature API — для ML моделей
  • Real-time API — для live метрик

Каждый имеет свой масштаб, requirements и оптимизации. Data Engineer должен понимать все эти типы и их особенности.

Был ли в вашей системе api | PrepBro