ML System Design: Система обнаружения мошенничества
Условие
Спроектируйте систему обнаружения мошеннических транзакций в реальном времени.
Опишите:
- Какие данные и признаки использовать
- Как справиться с редкостью мошенничества (imbalanced data)
- Какую модель выбрать
- Как организовать real-time inference
- Как измерить качество и business impact
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
ML System Design: Система обнаружения мошенничества
1. Признаки и данные
Основные источники данных:
-
Транзакционные данные:
- amount (сумма)
- merchant_category (категория магазина)
- merchant_id
- timestamp
- location (география)
- device_id
- payment_method (карта, кошелёк, и т.д.)
-
Историческая информация пользователя:
- Сумма транзакций в день/неделю/месяц
- Количество транзакций в день/неделю/месяц
- Средняя сумма транзакции
- Стандартное отклонение сумм (волатильность)
- Количество дней с активностью
- Количество уникальных категорий магазинов
- Количество уникальных географических точек
-
Гео-данные:
- Расстояние между последовательными транзакциями
- Время между транзакциями (должно быть достаточно для физического перемещения)
- Скорость перемещения между точками (если >900 км/ч → мошенничество)
-
Поведенческие аномалии:
- Z-score суммы (сколько σ от среднего)
- Новая категория магазина для этого пользователя
- Новое устройство
- Новое географическое место
- Первая транзакция после регистрации
- Дневное время (обычно мошенничество ночью)
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
def engineer_features(transaction_df, user_history_df):
"""Feature engineering для детекции мошенничества"""
features = pd.DataFrame()
features['user_id'] = transaction_df['user_id']
features['amount'] = transaction_df['amount']
# 1. Базовые признаки
features['amount_log'] = np.log1p(transaction_df['amount'])
features['hour'] = pd.to_datetime(transaction_df['timestamp']).dt.hour
features['day_of_week'] = pd.to_datetime(transaction_df['timestamp']).dt.dayofweek
features['is_night'] = ((features['hour'] >= 22) | (features['hour'] <= 6)).astype(int)
# 2. Z-score суммы (аномальность)
for user_id, user_group in transaction_df.groupby('user_id'):
user_mean = user_history_df.loc[user_id, 'avg_amount']
user_std = user_history_df.loc[user_id, 'std_amount']
features.loc[features['user_id'] == user_id, 'amount_zscore'] = \
(transaction_df.loc[features['user_id'] == user_id, 'amount'] - user_mean) / (user_std + 1e-5)
# 3. Историческая активность
features['daily_txn_count'] = transaction_df.groupby('user_id').size()
features['daily_txn_amount_sum'] = transaction_df.groupby('user_id')['amount'].sum()
features['avg_amount'] = transaction_df.groupby('user_id')['amount'].mean()
# 4. Гео-аномалии
features['is_new_location'] = transaction_df.groupby('user_id')['location'].transform(
lambda x: x.apply(lambda loc: loc not in user_history_df.loc[x.name, 'known_locations'])
)
# 5. Устройство
features['is_new_device'] = transaction_df['device_id'].apply(
lambda x: x not in user_history_df.get(x, {}).get('known_devices', [])
)
# 6. Скорость перемещения
features['impossible_travel'] = 0 # Если расстояние > 900 км за < 1 час
return features
2. Работа с дисбалансом данных (Imbalanced Data)
Проблема: мошенничество составляет 0.1-1% от всех транзакций.
Подход 1: Переэмпиризация (SMOTE)
from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline
smote = SMOTE(sampling_strategy=0.3, random_state=42)
X_resampled, y_resampled = smote.fit_resample(X_train, y_train)
Подход 2: Взвешивание классов
from sklearn.utils.class_weight import compute_class_weight
class_weights = compute_class_weight('balanced',
classes=np.unique(y_train),
y=y_train)
weight_dict = {0: class_weights[0], 1: class_weights[1]}
# В XGBoost
model = XGBClassifier(scale_pos_weight=class_weights[1]/class_weights[0])
Подход 3: Threshold optimization
# По умолчанию XGBoost предсказывает класс если P(fraud) > 0.5
# Но для мошенничества часто нужен более низкий порог
from sklearn.metrics import f1_score
import numpy as np
thresholds = np.arange(0.01, 1, 0.01)
best_threshold = 0.5
best_f1 = 0
for thresh in thresholds:
y_pred = (model.predict_proba(X_val)[:, 1] >= thresh).astype(int)
f1 = f1_score(y_val, y_pred)
if f1 > best_f1:
best_f1 = f1
best_threshold = thresh
Подход 4: Cost-sensitive learning
# Штрафуем false negatives (пропущенные мошенничества) в 100 раз больше
# чем false positives (ложные срабатывания)
class CostSensitiveModel:
def fit(self, X, y, cost_matrix=None):
# cost_matrix[0, 1] = стоимость false positive
# cost_matrix[1, 0] = стоимость false negative
if cost_matrix is None:
cost_matrix = [[0, 1], [100, 0]] # FN в 100 раз дороже FP
self.model.fit(X, y, sample_weight=self._compute_weights(y, cost_matrix))
3. Выбор модели
Требования:
- Быстрый inference (< 50ms на 1 транзакцию)
- Вероятностные предсказания (нужны уверенности)
- Объяснимость (почему заблокирована транзакция?)
- Обновление модели (новые мошенничества появляются постоянно)
Вариант A: XGBoost (Gradient Boosting)
from xgboost import XGBClassifier
model = XGBClassifier(
max_depth=7,
learning_rate=0.1,
n_estimators=100,
scale_pos_weight=99, # Дисбаланс 1:99
objective='binary:logistic',
eval_metric='auc',
random_state=42
)
model.fit(X_train, y_train,
eval_set=[(X_val, y_val)],
early_stopping_rounds=10)
# Feature importance
feature_importance = model.get_booster().get_score()
Плюсы:
- Быстро обучается и делает inference
- Хорошо работает с дисбалансом
- Объяснимость через SHAP
- Может обновляться (online learning)
Вариант B: Логистическая регрессия (baseline)
from sklearn.linear_model import LogisticRegression
model = LogisticRegression(
class_weight='balanced',
max_iter=1000,
solver='lbfgs'
)
Плюсы:
- Очень простая, быстрая
- Максимальная объяснимость (коэффициенты)
- Легко интерпретировать
Минусы:
- Может быть менее точной на сложных паттернах
Вариант C: Neural Network (если много данных > 1M транзакций)
from tensorflow.keras import Sequential, layers
model = Sequential([
layers.Dense(128, activation='relu', input_dim=50),
layers.BatchNormalization(),
layers.Dropout(0.3),
layers.Dense(64, activation='relu'),
layers.Dropout(0.3),
layers.Dense(32, activation='relu'),
layers.Dense(1, activation='sigmoid')
])
model.compile(
optimizer='adam',
loss='binary_crossentropy',
metrics=['auc']
)
Рекомендация: XGBoost — оптимален по скорости/качеству/простоте.
4. Real-Time Inference архитектура
User Transaction → API Gateway → Feature Store → ML Model → Decision → Action
↓
Redis Cache (для истории)
Database (логирование)
Архитектура:
import redis
from fastapi import FastAPI
import numpy as np
import joblib
app = FastAPI()
redis_client = redis.Redis(host='localhost', port=6379)
model = joblib.load('fraud_model.pkl')
feature_scaler = joblib.load('scaler.pkl')
# Cache для истории пользователя
CACHE_KEY_PREFIX = "user:"
CACHE_TTL = 3600
@app.post("/predict")
async def predict_fraud(transaction: dict):
"""Real-time предсказание мошенничества"""
user_id = transaction['user_id']
# 1. Получаем историю пользователя из кеша (O(1) операция)
user_history = redis_client.get(f"{CACHE_KEY_PREFIX}{user_id}")
if not user_history:
# Если нет в кеше — берём из БД
user_history = fetch_from_database(user_id)
redis_client.setex(f"{CACHE_KEY_PREFIX}{user_id}", CACHE_TTL, user_history)
# 2. Проектируем признаки
features = engineer_single_transaction(transaction, user_history)
features_scaled = feature_scaler.transform([features])
# 3. Предсказание
fraud_probability = model.predict_proba(features_scaled)[0][1]
# 4. Принимаем решение
THRESHOLD = 0.7 # Порог блокировки
is_fraud = fraud_probability > THRESHOLD
# 5. Логируем для аудита
log_prediction(user_id, transaction, fraud_probability, is_fraud)
return {
"user_id": user_id,
"transaction_id": transaction['id'],
"fraud_score": fraud_probability,
"is_fraud": is_fraud,
"action": "BLOCK" if is_fraud else "APPROVE"
}
def engineer_single_transaction(transaction, user_history):
"""Проектирование признаков для одной транзакции"""
features = []
features.append(np.log1p(transaction['amount']))
features.append(transaction['hour'])
features.append((transaction['amount'] - user_history['avg_amount']) / (user_history['std_amount'] + 1e-5))
# ... остальные признаки
return features
Оптимизация latency:
# 1. Используем модель меньше размером (distillation)
# 2. Feature caching в Redis
# 3. Batch inference для некритичных операций
# 4. Quantization модели (int8 вместо float32)
import onnx
import onnxruntime
# Конвертируем XGBoost в ONNX для быстрого inference
onnx_model = convert(model, 'xgboost', initial_types=[('float_input', FloatTensorType([None, 50]))])
sess = onnxruntime.InferenceSession('model.onnx')
predictions = sess.run(None, {'float_input': features})
5. Метрики и оценка качества
Классические метрики:
from sklearn.metrics import (
confusion_matrix, precision_score, recall_score,
f1_score, roc_auc_score, auc, roc_curve
)
y_pred = model.predict(X_test)
y_pred_proba = model.predict_proba(X_test)[:, 1]
# Матрица ошибок
tn, fp, fn, tp = confusion_matrix(y_test, y_pred).ravel()
print(f"TP (правильно поймали мошенничество): {tp}")
print(f"FP (ложный алерт): {fp}")
print(f"FN (пропустили мошенничество): {fn}")
print(f"TN (правильно пропустили): {tn}")
# Metrics
precision = tp / (tp + fp) # Из блокированных — сколько реально мошенников
recall = tp / (tp + fn) # Из всех мошенников — сколько поймали
f1 = 2 * (precision * recall) / (precision + recall)
print(f"Precision: {precision:.3f}") # Важна для пользовательского опыта
print(f"Recall: {recall:.3f}") # Важна для безопасности
print(f"F1 Score: {f1:.3f}")
# AUC-ROC
auc_roc = roc_auc_score(y_test, y_pred_proba)
print(f"AUC-ROC: {auc_roc:.3f}")
Business метрики:
# Стоимость одного мошенничества
FRAUD_LOSS_PER_TRANSACTION = 100 # USD
# Стоимость ложного алерта (забаненный легальный пользователь может уйти)
FALSE_POSITIVE_COST = 50 # USD (потеря пользователя)
total_loss_without_model = len(fraud_transactions) * FRAUD_LOSS_PER_TRANSACTION
total_loss_with_model = (
fn * FRAUD_LOSS_PER_TRANSACTION + # Пропущенные мошенничества
fp * FALSE_POSITIVE_COST # Ложные блокировки
)
saved_money = total_loss_without_model - total_loss_with_model
roi = saved_money / model_deployment_cost
Мониторинг качества:
# Performance degrade detection
# Переобучение модели если:
# 1. AUC упал на 5% за неделю
# 2. Recall упал на 10% (пропускаем больше мошенничеств)
class ModelMonitor:
def __init__(self, baseline_auc=0.95):
self.baseline_auc = baseline_auc
def check_model_health(self, y_true, y_pred_proba):
current_auc = roc_auc_score(y_true, y_pred_proba)
if current_auc < self.baseline_auc * 0.95:
# Запускаем переобучение
trigger_retraining()
return current_auc
6. Continuous Learning и Feedback Loop
# Каждый день переобучаем модель на новых мошенничествах
def daily_model_retraining():
# 1. Получаем события за последние 24 часа
recent_transactions = fetch_recent_transactions(hours=24)
# 2. Берём те, что выявлены как мошенничество
fraud_labels = fetch_verified_fraud_labels()
# 3. Переобучаем
X_new = engineer_features(recent_transactions)
model.fit(X_new, fraud_labels)
# 4. Оцениваем на тестовом наборе
auc_new = evaluate_model()
if auc_new > current_auc:
deploy_new_model()
else:
log_warning("New model performs worse")
Итоговая архитектура
- Данные: трансакционные + историческая информация + гео-данные
- Признаки: 50-100 features, основаны на anomaly detection
- Дисбаланс: SMOTE + class_weight + threshold optimization
- Модель: XGBoost (best speed/quality tradeoff)
- Inference: Redis cache + FastAPI + <50ms latency
- Метрики: Precision/Recall/AUC-ROC + Business ROI
- Мониторинг: Daily retraining, drift detection, feedback loop
Production Checklist:
- Logging всех предсказаний для audit trail
- A/B тестирование новых версий модели
- Explainability (SHAP) для каждого решения
- Fallback стратегия (manual review для high-uncertainty)
- Regular penetration testing (мошенники адаптируются)