← Назад к вопросам

ML System Design: Система обнаружения мошенничества

3.0 Senior🔥 191 комментариев
#MLOps и инфраструктура#Машинное обучение

Условие

Спроектируйте систему обнаружения мошеннических транзакций в реальном времени.

Опишите:

  1. Какие данные и признаки использовать
  2. Как справиться с редкостью мошенничества (imbalanced data)
  3. Какую модель выбрать
  4. Как организовать real-time inference
  5. Как измерить качество и business impact

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

ML System Design: Система обнаружения мошенничества

1. Признаки и данные

Основные источники данных:

  • Транзакционные данные:

    • amount (сумма)
    • merchant_category (категория магазина)
    • merchant_id
    • timestamp
    • location (география)
    • device_id
    • payment_method (карта, кошелёк, и т.д.)
  • Историческая информация пользователя:

    • Сумма транзакций в день/неделю/месяц
    • Количество транзакций в день/неделю/месяц
    • Средняя сумма транзакции
    • Стандартное отклонение сумм (волатильность)
    • Количество дней с активностью
    • Количество уникальных категорий магазинов
    • Количество уникальных географических точек
  • Гео-данные:

    • Расстояние между последовательными транзакциями
    • Время между транзакциями (должно быть достаточно для физического перемещения)
    • Скорость перемещения между точками (если >900 км/ч → мошенничество)
  • Поведенческие аномалии:

    • Z-score суммы (сколько σ от среднего)
    • Новая категория магазина для этого пользователя
    • Новое устройство
    • Новое географическое место
    • Первая транзакция после регистрации
    • Дневное время (обычно мошенничество ночью)
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

def engineer_features(transaction_df, user_history_df):
    """Feature engineering для детекции мошенничества"""
    
    features = pd.DataFrame()
    features['user_id'] = transaction_df['user_id']
    features['amount'] = transaction_df['amount']
    
    # 1. Базовые признаки
    features['amount_log'] = np.log1p(transaction_df['amount'])
    features['hour'] = pd.to_datetime(transaction_df['timestamp']).dt.hour
    features['day_of_week'] = pd.to_datetime(transaction_df['timestamp']).dt.dayofweek
    features['is_night'] = ((features['hour'] >= 22) | (features['hour'] <= 6)).astype(int)
    
    # 2. Z-score суммы (аномальность)
    for user_id, user_group in transaction_df.groupby('user_id'):
        user_mean = user_history_df.loc[user_id, 'avg_amount']
        user_std = user_history_df.loc[user_id, 'std_amount']
        features.loc[features['user_id'] == user_id, 'amount_zscore'] = \
            (transaction_df.loc[features['user_id'] == user_id, 'amount'] - user_mean) / (user_std + 1e-5)
    
    # 3. Историческая активность
    features['daily_txn_count'] = transaction_df.groupby('user_id').size()
    features['daily_txn_amount_sum'] = transaction_df.groupby('user_id')['amount'].sum()
    features['avg_amount'] = transaction_df.groupby('user_id')['amount'].mean()
    
    # 4. Гео-аномалии
    features['is_new_location'] = transaction_df.groupby('user_id')['location'].transform(
        lambda x: x.apply(lambda loc: loc not in user_history_df.loc[x.name, 'known_locations'])
    )
    
    # 5. Устройство
    features['is_new_device'] = transaction_df['device_id'].apply(
        lambda x: x not in user_history_df.get(x, {}).get('known_devices', [])
    )
    
    # 6. Скорость перемещения
    features['impossible_travel'] = 0  # Если расстояние > 900 км за < 1 час
    
    return features

2. Работа с дисбалансом данных (Imbalanced Data)

Проблема: мошенничество составляет 0.1-1% от всех транзакций.

Подход 1: Переэмпиризация (SMOTE)

from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline

smote = SMOTE(sampling_strategy=0.3, random_state=42)
X_resampled, y_resampled = smote.fit_resample(X_train, y_train)

Подход 2: Взвешивание классов

from sklearn.utils.class_weight import compute_class_weight

class_weights = compute_class_weight('balanced', 
                                     classes=np.unique(y_train), 
                                     y=y_train)
weight_dict = {0: class_weights[0], 1: class_weights[1]}

# В XGBoost
model = XGBClassifier(scale_pos_weight=class_weights[1]/class_weights[0])

Подход 3: Threshold optimization

# По умолчанию XGBoost предсказывает класс если P(fraud) > 0.5
# Но для мошенничества часто нужен более низкий порог

from sklearn.metrics import f1_score
import numpy as np

thresholds = np.arange(0.01, 1, 0.01)
best_threshold = 0.5
best_f1 = 0

for thresh in thresholds:
    y_pred = (model.predict_proba(X_val)[:, 1] >= thresh).astype(int)
    f1 = f1_score(y_val, y_pred)
    if f1 > best_f1:
        best_f1 = f1
        best_threshold = thresh

Подход 4: Cost-sensitive learning

# Штрафуем false negatives (пропущенные мошенничества) в 100 раз больше
# чем false positives (ложные срабатывания)

class CostSensitiveModel:
    def fit(self, X, y, cost_matrix=None):
        # cost_matrix[0, 1] = стоимость false positive
        # cost_matrix[1, 0] = стоимость false negative
        if cost_matrix is None:
            cost_matrix = [[0, 1], [100, 0]]  # FN в 100 раз дороже FP
        self.model.fit(X, y, sample_weight=self._compute_weights(y, cost_matrix))

3. Выбор модели

Требования:

  • Быстрый inference (< 50ms на 1 транзакцию)
  • Вероятностные предсказания (нужны уверенности)
  • Объяснимость (почему заблокирована транзакция?)
  • Обновление модели (новые мошенничества появляются постоянно)

Вариант A: XGBoost (Gradient Boosting)

from xgboost import XGBClassifier

model = XGBClassifier(
    max_depth=7,
    learning_rate=0.1,
    n_estimators=100,
    scale_pos_weight=99,  # Дисбаланс 1:99
    objective='binary:logistic',
    eval_metric='auc',
    random_state=42
)

model.fit(X_train, y_train, 
         eval_set=[(X_val, y_val)],
         early_stopping_rounds=10)

# Feature importance
feature_importance = model.get_booster().get_score()

Плюсы:

  • Быстро обучается и делает inference
  • Хорошо работает с дисбалансом
  • Объяснимость через SHAP
  • Может обновляться (online learning)

Вариант B: Логистическая регрессия (baseline)

from sklearn.linear_model import LogisticRegression

model = LogisticRegression(
    class_weight='balanced',
    max_iter=1000,
    solver='lbfgs'
)

Плюсы:

  • Очень простая, быстрая
  • Максимальная объяснимость (коэффициенты)
  • Легко интерпретировать

Минусы:

  • Может быть менее точной на сложных паттернах

Вариант C: Neural Network (если много данных > 1M транзакций)

from tensorflow.keras import Sequential, layers

model = Sequential([
    layers.Dense(128, activation='relu', input_dim=50),
    layers.BatchNormalization(),
    layers.Dropout(0.3),
    layers.Dense(64, activation='relu'),
    layers.Dropout(0.3),
    layers.Dense(32, activation='relu'),
    layers.Dense(1, activation='sigmoid')
])

model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['auc']
)

Рекомендация: XGBoost — оптимален по скорости/качеству/простоте.

4. Real-Time Inference архитектура

User Transaction → API Gateway → Feature Store → ML Model → Decision → Action
                                                    ↓
                                              Redis Cache (для истории)
                                              Database (логирование)

Архитектура:

import redis
from fastapi import FastAPI
import numpy as np
import joblib

app = FastAPI()
redis_client = redis.Redis(host='localhost', port=6379)
model = joblib.load('fraud_model.pkl')
feature_scaler = joblib.load('scaler.pkl')

# Cache для истории пользователя
CACHE_KEY_PREFIX = "user:"
CACHE_TTL = 3600

@app.post("/predict")
async def predict_fraud(transaction: dict):
    """Real-time предсказание мошенничества"""
    
    user_id = transaction['user_id']
    
    # 1. Получаем историю пользователя из кеша (O(1) операция)
    user_history = redis_client.get(f"{CACHE_KEY_PREFIX}{user_id}")
    if not user_history:
        # Если нет в кеше — берём из БД
        user_history = fetch_from_database(user_id)
        redis_client.setex(f"{CACHE_KEY_PREFIX}{user_id}", CACHE_TTL, user_history)
    
    # 2. Проектируем признаки
    features = engineer_single_transaction(transaction, user_history)
    features_scaled = feature_scaler.transform([features])
    
    # 3. Предсказание
    fraud_probability = model.predict_proba(features_scaled)[0][1]
    
    # 4. Принимаем решение
    THRESHOLD = 0.7  # Порог блокировки
    is_fraud = fraud_probability > THRESHOLD
    
    # 5. Логируем для аудита
    log_prediction(user_id, transaction, fraud_probability, is_fraud)
    
    return {
        "user_id": user_id,
        "transaction_id": transaction['id'],
        "fraud_score": fraud_probability,
        "is_fraud": is_fraud,
        "action": "BLOCK" if is_fraud else "APPROVE"
    }

def engineer_single_transaction(transaction, user_history):
    """Проектирование признаков для одной транзакции"""
    features = []
    features.append(np.log1p(transaction['amount']))
    features.append(transaction['hour'])
    features.append((transaction['amount'] - user_history['avg_amount']) / (user_history['std_amount'] + 1e-5))
    # ... остальные признаки
    return features

Оптимизация latency:

# 1. Используем модель меньше размером (distillation)
# 2. Feature caching в Redis
# 3. Batch inference для некритичных операций
# 4. Quantization модели (int8 вместо float32)

import onnx
import onnxruntime

# Конвертируем XGBoost в ONNX для быстрого inference
onnx_model = convert(model, 'xgboost', initial_types=[('float_input', FloatTensorType([None, 50]))])
sess = onnxruntime.InferenceSession('model.onnx')
predictions = sess.run(None, {'float_input': features})

5. Метрики и оценка качества

Классические метрики:

from sklearn.metrics import (
    confusion_matrix, precision_score, recall_score, 
    f1_score, roc_auc_score, auc, roc_curve
)

y_pred = model.predict(X_test)
y_pred_proba = model.predict_proba(X_test)[:, 1]

# Матрица ошибок
tn, fp, fn, tp = confusion_matrix(y_test, y_pred).ravel()

print(f"TP (правильно поймали мошенничество): {tp}")
print(f"FP (ложный алерт): {fp}")
print(f"FN (пропустили мошенничество): {fn}")
print(f"TN (правильно пропустили): {tn}")

# Metrics
precision = tp / (tp + fp)  # Из блокированных — сколько реально мошенников
recall = tp / (tp + fn)     # Из всех мошенников — сколько поймали
f1 = 2 * (precision * recall) / (precision + recall)

print(f"Precision: {precision:.3f}")  # Важна для пользовательского опыта
print(f"Recall: {recall:.3f}")         # Важна для безопасности
print(f"F1 Score: {f1:.3f}")

# AUC-ROC
auc_roc = roc_auc_score(y_test, y_pred_proba)
print(f"AUC-ROC: {auc_roc:.3f}")

Business метрики:

# Стоимость одного мошенничества
FRAUD_LOSS_PER_TRANSACTION = 100  # USD

# Стоимость ложного алерта (забаненный легальный пользователь может уйти)
FALSE_POSITIVE_COST = 50  # USD (потеря пользователя)

total_loss_without_model = len(fraud_transactions) * FRAUD_LOSS_PER_TRANSACTION

total_loss_with_model = (
    fn * FRAUD_LOSS_PER_TRANSACTION +  # Пропущенные мошенничества
    fp * FALSE_POSITIVE_COST           # Ложные блокировки
)

saved_money = total_loss_without_model - total_loss_with_model
roi = saved_money / model_deployment_cost

Мониторинг качества:

# Performance degrade detection
# Переобучение модели если:
# 1. AUC упал на 5% за неделю
# 2. Recall упал на 10% (пропускаем больше мошенничеств)

class ModelMonitor:
    def __init__(self, baseline_auc=0.95):
        self.baseline_auc = baseline_auc
    
    def check_model_health(self, y_true, y_pred_proba):
        current_auc = roc_auc_score(y_true, y_pred_proba)
        
        if current_auc < self.baseline_auc * 0.95:
            # Запускаем переобучение
            trigger_retraining()
        
        return current_auc

6. Continuous Learning и Feedback Loop

# Каждый день переобучаем модель на новых мошенничествах
def daily_model_retraining():
    # 1. Получаем события за последние 24 часа
    recent_transactions = fetch_recent_transactions(hours=24)
    
    # 2. Берём те, что выявлены как мошенничество
    fraud_labels = fetch_verified_fraud_labels()
    
    # 3. Переобучаем
    X_new = engineer_features(recent_transactions)
    model.fit(X_new, fraud_labels)
    
    # 4. Оцениваем на тестовом наборе
    auc_new = evaluate_model()
    
    if auc_new > current_auc:
        deploy_new_model()
    else:
        log_warning("New model performs worse")

Итоговая архитектура

  1. Данные: трансакционные + историческая информация + гео-данные
  2. Признаки: 50-100 features, основаны на anomaly detection
  3. Дисбаланс: SMOTE + class_weight + threshold optimization
  4. Модель: XGBoost (best speed/quality tradeoff)
  5. Inference: Redis cache + FastAPI + <50ms latency
  6. Метрики: Precision/Recall/AUC-ROC + Business ROI
  7. Мониторинг: Daily retraining, drift detection, feedback loop

Production Checklist:

  • Logging всех предсказаний для audit trail
  • A/B тестирование новых версий модели
  • Explainability (SHAP) для каждого решения
  • Fallback стратегия (manual review для high-uncertainty)
  • Regular penetration testing (мошенники адаптируются)
ML System Design: Система обнаружения мошенничества | PrepBro