← Назад к вопросам

Что делал для интеграции ML-моделей в инфраструктуру?

2.0 Middle🔥 201 комментариев
#DevOps и инфраструктура

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Интеграция ML-моделей в инфраструктуру

Интеграция машинного обучения в production инфраструктуру — это сложная задача, которая требует системного подхода. Расскажу о моём опыте на конкретных примерах.

1. Архитектурное решение: где запускать модель

Это критический выбор, потому что от него зависит всё.

Вариант A: Синхронный inference (вызов при запросе)

# ✅ Когда используем: нужен результат СЕЙЧАС
# ✅ Примеры: фильтр спама, recommendation на странице, fraud detection

# Архитектура
user_request → FastAPI endpoint → ML inference → response

from fastapi import FastAPI
from transformers import pipeline

app = FastAPI()

# Модель загружается один раз при старте
spam_classifier = pipeline(
    "text-classification",
    model="distilbert-base-uncased"
)

@app.post("/classify")
def classify_text(text: str):
    # Inference происходит в процессе обработки запроса
    result = spam_classifier(text)
    
    return {
        "label": result[0]["label"],
        "score": result[0]["score"]
    }

# Минусы: медленно (100-500ms на один inference)
# Плюсы: результат сразу

Вариант B: Асинхронный inference (очередь задач)

# ✅ Когда используем: результат можно получить позже
# ✅ Примеры: генерация отчёта, обработка батча, аналитика

# Архитектура
user_request → Queue → Worker → ML inference → Result storage
                ↑                                  ↓
                └──── User polls for result ──────┘

from celery import Celery
from transformers import pipeline

app = Celery('ml_tasks')
model = pipeline("text-classification")  # Загружается в worker'е

@app.task
def classify_text_async(text: str, task_id: str):
    # Это выполняется в background worker'е
    result = model(text)
    
    # Сохраняем результат
    cache.set(f"result:{task_id}", result, timeout=3600)
    return result

# Из API
@app.post("/classify-async")
def classify_async(text: str):
    task = classify_text_async.delay(text)
    return {"task_id": task.id, "status_url": f"/status/{task.id}"}

@app.get("/status/{task_id}")
def get_status(task_id: str):
    result = cache.get(f"result:{task_id}")
    if result:
        return {"status": "done", "result": result}
    return {"status": "processing"}

# Плюсы: scalable, не блокирует основной процесс
# Минусы: пользователь ждёт результата

Вариант C: Batch inference (обработка в батчах)

# ✅ Когда используем: высокая throughput требуется
# ✅ Примеры: обработка исторических данных, night jobs

# Архитектура
Data source → Batch processor → Split into batches → Workers → Storage

from torch.utils.data import DataLoader, TensorDataset
import torch

def batch_inference(texts: List[str], batch_size: int = 32):
    model = load_model()
    
    # Создаём батчи
    loader = DataLoader(
        TensorDataset(torch.tensor(texts)),
        batch_size=batch_size
    )
    
    results = []
    with torch.no_grad():
        for batch in loader:
            # Inference на целом батче — быстрее!
            output = model(batch)
            results.extend(output)
    
    return results

# Это 10-100x быстрее, чем inference по одному
# Но требует больше памяти

2. Управление моделями в production

Model Registry

# ✅ Где хранить модели? MLflow — стандартный choice
from mlflow.pyfunc import load_model
import mlflow

# Сохранить модель
mlflow.pytorch.log_model(model, "sentiment_model")

# Позже загрузить с версионированием
model = mlflow.pyfunc.load_model("models:/sentiment_model/production")

# MLflow позволяет:
# - Версионировать модели (model1, model2, model3)
# - Трекировать metrics (accuracy, F1, latency)
# - A/B тестировать разные версии
# - Управлять lifecycle (staging, production, archived)

Вычисление требуемых ресурсов

# Нужно понимать, сколько памяти и CPU требуется

model_resources = {
    "BERT base": {
        "memory": "350MB",
        "inference_time": "50-100ms (single)",
        "throughput": "10-20 samples/sec (single GPU)",
        "hardware": "1x GPU (T4) или CPU"
    },
    "GPT-2 medium": {
        "memory": "350MB",
        "inference_time": "100-200ms",
        "throughput": "5-10 samples/sec",
        "hardware": "1x GPU"
    },
    "GPT-3 (via API)": {
        "memory": "0 (external)",
        "inference_time": "500ms-5sec",
        "throughput": "Limited by API",
        "cost": "$0.002 per 1K tokens"
    }
}

# Для production нужно рассчитать
required_resources = {
    "RPS": 1000,  # requests per second
    "avg_latency_requirement": "200ms",  # SLA
    "inference_time_per_sample": 100,  # ms
    "required_throughput": 1000 * (200 / 100),  # 2000 samples/sec
    "required_gpus": (2000 / 10),  # 200 GPUs! (если 10 samples/sec на GPU)
    "or_use_batch": "Батчирование снижает latency и требует меньше GPU"
}

3. Кеширование результатов ML

# ❌ Наивно: каждый раз вычисляем
@app.post("/predict")
def predict(user_id: int):
    features = extract_features(user_id)  # Дорого
    prediction = model.predict(features)  # Дорого
    return {"prediction": prediction}

# ✅ Правильно: кешируем результаты
from functools import lru_cache
import hashlib

@app.post("/predict")
def predict(user_id: int):
    cache_key = f"prediction:{user_id}"
    
    # Проверяем кеш
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)
    
    # Если нет в кеше
    features = extract_features(user_id)
    prediction = model.predict(features)
    
    # Сохраняем в кеш на 1 час
    redis_client.setex(
        cache_key,
        3600,
        json.dumps({"prediction": prediction})
    )
    
    return {"prediction": prediction}

# Это сокращает нагрузку на модель в 10-100x раз!

4. Мониторинг ML моделей

# ✅ Мониторинг production моделей

from prometheus_client import Counter, Histogram, Gauge
import time

# Метрики
inference_time = Histogram(
    'ml_inference_duration_seconds',
    'Time spent on ML inference'
)
inference_count = Counter(
    'ml_inference_total',
    'Total ML inferences',
    ['model', 'status']  # status: success, error, timeout
)
model_accuracy = Gauge(
    'ml_model_accuracy',
    'Model accuracy on validation set'
)

@app.post("/predict")
def predict(data):
    start = time.time()
    try:
        result = model.predict(data)
        duration = time.time() - start
        
        inference_time.observe(duration)
        inference_count.labels(
            model="sentiment_v3",
            status="success"
        ).inc()
        
        # Алерт если медленно
        if duration > 1.0:
            logger.warning(f"Slow inference: {duration}s")
        
        return {"prediction": result}
    except Exception as e:
        inference_count.labels(
            model="sentiment_v3",
            status="error"
        ).inc()
        raise

# Важно мониторить:
# - Latency
# - Throughput
# - Error rate
# - Model drift (метрики меняются со временем)

5. Handling Model Drift

# ⚠️ Проблема: модель была обучена на старых данных
# Со временем реальные данные меняются, и модель деградирует

# ✅ Решение: мониторить accuracy на реальных данных

def monitor_model_drift(predictions, actual_labels):
    # Периодически проверяем accuracy на реальных данных
    accuracy = calculate_accuracy(predictions, actual_labels)
    
    baseline_accuracy = 0.95  # Было при обучении
    current_accuracy = accuracy
    
    if current_accuracy < baseline_accuracy * 0.9:
        # Accuracy упала на 10% — ALARM!
        alert("Model drift detected!")
        # Нужно переобучить модель
    
    return {
        "baseline": baseline_accuracy,
        "current": current_accuracy,
        "drift": baseline_accuracy - current_accuracy
    }

# Процесс переобучения
def retrain_model():
    # 1. Собрать новые данные за последний месяц
    # 2. Переобучить модель
    # 3. Валидировать на holdout set
    # 4. A/B тестировать новую версию
    # 5. Постепенно трафик к новой версии
    # 6. Отпустить старую версию
    pass

6. A/B тестирование моделей

# ✅ Как выпустить новую версию модели без риска

@app.post("/predict")
def predict(user_id: int):
    # 10% трафика на новую версию
    use_new_model = random.random() < 0.1
    
    if use_new_model:
        # Новая, более мощная модель
        prediction = model_v3.predict(features)
        model_version = "v3"
    else:
        # Старая, проверенная модель
        prediction = model_v2.predict(features)
        model_version = "v2"
    
    # Логируем, какую модель использовали
    log_prediction(
        user_id=user_id,
        prediction=prediction,
        model_version=model_version
    )
    
    return {"prediction": prediction}

# Анализируем результаты
# v3: accuracy 0.96, latency 150ms, cost $0.003
# v2: accuracy 0.94, latency 100ms, cost $0.002
# Вердикт: v3 лучше, переходим на 100%

7. Пример полной pipeline

# Production-ready ML pipeline

from fastapi import FastAPI
from celery import Celery
import mlflow
from prometheus_client import Counter, Histogram

app = FastAPI()
celery_app = Celery('ml_app')

# Загружаем модель при старте
model = mlflow.pyfunc.load_model(
    "models:/recommendation_model/production"
)

metrics = {
    "inference_time": Histogram('inference_duration_ms'),
    "inference_count": Counter('inference_total'),
}

@app.post("/recommend")
async def get_recommendations(user_id: int, limit: int = 10):
    # Проверяем кеш
    cache_key = f"recommendations:{user_id}:{limit}"
    cached = await redis.get(cache_key)
    if cached:
        return json.loads(cached)
    
    # Дорогая операция — в background
    task = process_recommendation.delay(user_id, limit)
    
    return {
        "task_id": task.id,
        "status_url": f"/recommendation-status/{task.id}"
    }

@celery_app.task
def process_recommendation(user_id: int, limit: int):
    import time
    start = time.time()
    
    try:
        # Извлекаем features
        features = fetch_user_features(user_id)
        
        # Prediction
        scores = model.predict(features)
        recommendations = get_top_items(scores, limit)
        
        duration = (time.time() - start) * 1000
        metrics["inference_time"].observe(duration)
        metrics["inference_count"].labels(status="success").inc()
        
        # Кешируем результат
        cache_key = f"recommendations:{user_id}:{limit}"
        redis.setex(cache_key, 3600, json.dumps(recommendations))
        
        return recommendations
    except Exception as e:
        metrics["inference_count"].labels(status="error").inc()
        logger.exception(f"Recommendation failed for user {user_id}")
        raise

Ключевые уроки

  1. Начните с простого — синхронный inference, потом оптимизируйте
  2. Меряйте всё — latency, accuracy, cost
  3. Кешируйте результаты — даже если модель медленная
  4. Мониторьте drift — модель деградирует со временем
  5. A/B тестируйте — новые версии постепенно
  6. Используйте батчирование — для throughput
  7. Версионируйте модели — всегда знайте, что в production

ML в production — это не только про model accuracy, но про инженерию!