← Назад к вопросам
Что делал для интеграции ML-моделей в инфраструктуру?
2.0 Middle🔥 201 комментариев
#DevOps и инфраструктура
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Интеграция ML-моделей в инфраструктуру
Интеграция машинного обучения в production инфраструктуру — это сложная задача, которая требует системного подхода. Расскажу о моём опыте на конкретных примерах.
1. Архитектурное решение: где запускать модель
Это критический выбор, потому что от него зависит всё.
Вариант A: Синхронный inference (вызов при запросе)
# ✅ Когда используем: нужен результат СЕЙЧАС
# ✅ Примеры: фильтр спама, recommendation на странице, fraud detection
# Архитектура
user_request → FastAPI endpoint → ML inference → response
from fastapi import FastAPI
from transformers import pipeline
app = FastAPI()
# Модель загружается один раз при старте
spam_classifier = pipeline(
"text-classification",
model="distilbert-base-uncased"
)
@app.post("/classify")
def classify_text(text: str):
# Inference происходит в процессе обработки запроса
result = spam_classifier(text)
return {
"label": result[0]["label"],
"score": result[0]["score"]
}
# Минусы: медленно (100-500ms на один inference)
# Плюсы: результат сразу
Вариант B: Асинхронный inference (очередь задач)
# ✅ Когда используем: результат можно получить позже
# ✅ Примеры: генерация отчёта, обработка батча, аналитика
# Архитектура
user_request → Queue → Worker → ML inference → Result storage
↑ ↓
└──── User polls for result ──────┘
from celery import Celery
from transformers import pipeline
app = Celery('ml_tasks')
model = pipeline("text-classification") # Загружается в worker'е
@app.task
def classify_text_async(text: str, task_id: str):
# Это выполняется в background worker'е
result = model(text)
# Сохраняем результат
cache.set(f"result:{task_id}", result, timeout=3600)
return result
# Из API
@app.post("/classify-async")
def classify_async(text: str):
task = classify_text_async.delay(text)
return {"task_id": task.id, "status_url": f"/status/{task.id}"}
@app.get("/status/{task_id}")
def get_status(task_id: str):
result = cache.get(f"result:{task_id}")
if result:
return {"status": "done", "result": result}
return {"status": "processing"}
# Плюсы: scalable, не блокирует основной процесс
# Минусы: пользователь ждёт результата
Вариант C: Batch inference (обработка в батчах)
# ✅ Когда используем: высокая throughput требуется
# ✅ Примеры: обработка исторических данных, night jobs
# Архитектура
Data source → Batch processor → Split into batches → Workers → Storage
from torch.utils.data import DataLoader, TensorDataset
import torch
def batch_inference(texts: List[str], batch_size: int = 32):
model = load_model()
# Создаём батчи
loader = DataLoader(
TensorDataset(torch.tensor(texts)),
batch_size=batch_size
)
results = []
with torch.no_grad():
for batch in loader:
# Inference на целом батче — быстрее!
output = model(batch)
results.extend(output)
return results
# Это 10-100x быстрее, чем inference по одному
# Но требует больше памяти
2. Управление моделями в production
Model Registry
# ✅ Где хранить модели? MLflow — стандартный choice
from mlflow.pyfunc import load_model
import mlflow
# Сохранить модель
mlflow.pytorch.log_model(model, "sentiment_model")
# Позже загрузить с версионированием
model = mlflow.pyfunc.load_model("models:/sentiment_model/production")
# MLflow позволяет:
# - Версионировать модели (model1, model2, model3)
# - Трекировать metrics (accuracy, F1, latency)
# - A/B тестировать разные версии
# - Управлять lifecycle (staging, production, archived)
Вычисление требуемых ресурсов
# Нужно понимать, сколько памяти и CPU требуется
model_resources = {
"BERT base": {
"memory": "350MB",
"inference_time": "50-100ms (single)",
"throughput": "10-20 samples/sec (single GPU)",
"hardware": "1x GPU (T4) или CPU"
},
"GPT-2 medium": {
"memory": "350MB",
"inference_time": "100-200ms",
"throughput": "5-10 samples/sec",
"hardware": "1x GPU"
},
"GPT-3 (via API)": {
"memory": "0 (external)",
"inference_time": "500ms-5sec",
"throughput": "Limited by API",
"cost": "$0.002 per 1K tokens"
}
}
# Для production нужно рассчитать
required_resources = {
"RPS": 1000, # requests per second
"avg_latency_requirement": "200ms", # SLA
"inference_time_per_sample": 100, # ms
"required_throughput": 1000 * (200 / 100), # 2000 samples/sec
"required_gpus": (2000 / 10), # 200 GPUs! (если 10 samples/sec на GPU)
"or_use_batch": "Батчирование снижает latency и требует меньше GPU"
}
3. Кеширование результатов ML
# ❌ Наивно: каждый раз вычисляем
@app.post("/predict")
def predict(user_id: int):
features = extract_features(user_id) # Дорого
prediction = model.predict(features) # Дорого
return {"prediction": prediction}
# ✅ Правильно: кешируем результаты
from functools import lru_cache
import hashlib
@app.post("/predict")
def predict(user_id: int):
cache_key = f"prediction:{user_id}"
# Проверяем кеш
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
# Если нет в кеше
features = extract_features(user_id)
prediction = model.predict(features)
# Сохраняем в кеш на 1 час
redis_client.setex(
cache_key,
3600,
json.dumps({"prediction": prediction})
)
return {"prediction": prediction}
# Это сокращает нагрузку на модель в 10-100x раз!
4. Мониторинг ML моделей
# ✅ Мониторинг production моделей
from prometheus_client import Counter, Histogram, Gauge
import time
# Метрики
inference_time = Histogram(
'ml_inference_duration_seconds',
'Time spent on ML inference'
)
inference_count = Counter(
'ml_inference_total',
'Total ML inferences',
['model', 'status'] # status: success, error, timeout
)
model_accuracy = Gauge(
'ml_model_accuracy',
'Model accuracy on validation set'
)
@app.post("/predict")
def predict(data):
start = time.time()
try:
result = model.predict(data)
duration = time.time() - start
inference_time.observe(duration)
inference_count.labels(
model="sentiment_v3",
status="success"
).inc()
# Алерт если медленно
if duration > 1.0:
logger.warning(f"Slow inference: {duration}s")
return {"prediction": result}
except Exception as e:
inference_count.labels(
model="sentiment_v3",
status="error"
).inc()
raise
# Важно мониторить:
# - Latency
# - Throughput
# - Error rate
# - Model drift (метрики меняются со временем)
5. Handling Model Drift
# ⚠️ Проблема: модель была обучена на старых данных
# Со временем реальные данные меняются, и модель деградирует
# ✅ Решение: мониторить accuracy на реальных данных
def monitor_model_drift(predictions, actual_labels):
# Периодически проверяем accuracy на реальных данных
accuracy = calculate_accuracy(predictions, actual_labels)
baseline_accuracy = 0.95 # Было при обучении
current_accuracy = accuracy
if current_accuracy < baseline_accuracy * 0.9:
# Accuracy упала на 10% — ALARM!
alert("Model drift detected!")
# Нужно переобучить модель
return {
"baseline": baseline_accuracy,
"current": current_accuracy,
"drift": baseline_accuracy - current_accuracy
}
# Процесс переобучения
def retrain_model():
# 1. Собрать новые данные за последний месяц
# 2. Переобучить модель
# 3. Валидировать на holdout set
# 4. A/B тестировать новую версию
# 5. Постепенно трафик к новой версии
# 6. Отпустить старую версию
pass
6. A/B тестирование моделей
# ✅ Как выпустить новую версию модели без риска
@app.post("/predict")
def predict(user_id: int):
# 10% трафика на новую версию
use_new_model = random.random() < 0.1
if use_new_model:
# Новая, более мощная модель
prediction = model_v3.predict(features)
model_version = "v3"
else:
# Старая, проверенная модель
prediction = model_v2.predict(features)
model_version = "v2"
# Логируем, какую модель использовали
log_prediction(
user_id=user_id,
prediction=prediction,
model_version=model_version
)
return {"prediction": prediction}
# Анализируем результаты
# v3: accuracy 0.96, latency 150ms, cost $0.003
# v2: accuracy 0.94, latency 100ms, cost $0.002
# Вердикт: v3 лучше, переходим на 100%
7. Пример полной pipeline
# Production-ready ML pipeline
from fastapi import FastAPI
from celery import Celery
import mlflow
from prometheus_client import Counter, Histogram
app = FastAPI()
celery_app = Celery('ml_app')
# Загружаем модель при старте
model = mlflow.pyfunc.load_model(
"models:/recommendation_model/production"
)
metrics = {
"inference_time": Histogram('inference_duration_ms'),
"inference_count": Counter('inference_total'),
}
@app.post("/recommend")
async def get_recommendations(user_id: int, limit: int = 10):
# Проверяем кеш
cache_key = f"recommendations:{user_id}:{limit}"
cached = await redis.get(cache_key)
if cached:
return json.loads(cached)
# Дорогая операция — в background
task = process_recommendation.delay(user_id, limit)
return {
"task_id": task.id,
"status_url": f"/recommendation-status/{task.id}"
}
@celery_app.task
def process_recommendation(user_id: int, limit: int):
import time
start = time.time()
try:
# Извлекаем features
features = fetch_user_features(user_id)
# Prediction
scores = model.predict(features)
recommendations = get_top_items(scores, limit)
duration = (time.time() - start) * 1000
metrics["inference_time"].observe(duration)
metrics["inference_count"].labels(status="success").inc()
# Кешируем результат
cache_key = f"recommendations:{user_id}:{limit}"
redis.setex(cache_key, 3600, json.dumps(recommendations))
return recommendations
except Exception as e:
metrics["inference_count"].labels(status="error").inc()
logger.exception(f"Recommendation failed for user {user_id}")
raise
Ключевые уроки
- Начните с простого — синхронный inference, потом оптимизируйте
- Меряйте всё — latency, accuracy, cost
- Кешируйте результаты — даже если модель медленная
- Мониторьте drift — модель деградирует со временем
- A/B тестируйте — новые версии постепенно
- Используйте батчирование — для throughput
- Версионируйте модели — всегда знайте, что в production
ML в production — это не только про model accuracy, но про инженерию!