← Назад к вопросам

Python: Реализовать k-means clustering

2.0 Middle🔥 131 комментариев
#Python#Машинное обучение

Условие

Реализуйте алгоритм k-means кластеризации с нуля на Python (без использования sklearn).

Требования:

  1. Инициализация центроидов
  2. Итеративное обновление кластеров
  3. Критерий останова
  4. Тестирование на синтетических данных

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Python: Реализовать k-means clustering

Полная реализация k-means

import numpy as np
from typing import Tuple, List
import matplotlib.pyplot as plt


class KMeans:
    """K-means кластеризация с нуля"""
    
    def __init__(self, k: int = 3, max_iterations: int = 100, 
                 random_state: int = 42, tolerance: float = 1e-4):
        """
        Args:
            k: количество кластеров
            max_iterations: максимум итераций
            random_state: seed для воспроизводимости
            tolerance: порог для остановки (изменение центроидов)
        """
        self.k = k
        self.max_iterations = max_iterations
        self.random_state = random_state
        self.tolerance = tolerance
        self.centroids = None
        self.labels = None
        self.inertia_history = []
        
    def _initialize_centroids(self, X: np.ndarray) -> np.ndarray:
        """
        Инициализация центроидов случайным выбором из точек данных
        (K-means++ может быть еще лучше, но это простая версия)
        """
        np.random.seed(self.random_state)
        random_indices = np.random.choice(X.shape[0], self.k, replace=False)
        return X[random_indices].copy()
    
    def _euclidean_distance(self, x: np.ndarray, 
                           centroid: np.ndarray) -> float:
        """Евклидово расстояние между точкой и центроидом"""
        return np.sqrt(np.sum((x - centroid) ** 2))
    
    def _assign_clusters(self, X: np.ndarray) -> np.ndarray:
        """Присвоение каждой точки ближайшему кластеру"""
        distances = np.zeros((X.shape[0], self.k))
        
        for i in range(self.k):
            for j in range(X.shape[0]):
                distances[j, i] = self._euclidean_distance(X[j], 
                                                           self.centroids[i])
        
        return np.argmin(distances, axis=1)
    
    def _update_centroids(self, X: np.ndarray, 
                         labels: np.ndarray) -> Tuple[np.ndarray, float]:
        """
        Обновление центроидов как среднее арифметическое точек в кластере
        Возвращает новые центроиды и максимальное изменение
        """
        new_centroids = np.zeros((self.k, X.shape[1]))
        max_shift = 0.0
        
        for i in range(self.k):
            # Все точки, принадлежащие кластеру i
            cluster_points = X[labels == i]
            
            if len(cluster_points) > 0:
                new_centroid = cluster_points.mean(axis=0)
            else:
                # Если кластер пустой - переинициализировать случайно
                new_centroid = X[np.random.choice(X.shape[0])]
            
            # Вычислить сдвиг центроида
            shift = self._euclidean_distance(self.centroids[i], new_centroid)
            max_shift = max(max_shift, shift)
            
            new_centroids[i] = new_centroid
        
        return new_centroids, max_shift
    
    def _calculate_inertia(self, X: np.ndarray, 
                          labels: np.ndarray) -> float:
        """
        Inertia = сумма квадратов расстояний до ближайшего центроида
        (используется для оценки качества кластеризации)
        """
        inertia = 0.0
        for i in range(self.k):
            cluster_points = X[labels == i]
            if len(cluster_points) > 0:
                distances = np.sum((cluster_points - self.centroids[i]) ** 2)
                inertia += distances
        return inertia
    
    def fit(self, X: np.ndarray) -> 'KMeans':
        """
        Обучение модели на данных
        
        Args:
            X: матрица признаков (n_samples, n_features)
        """
        # 1. Инициализация центроидов
        self.centroids = self._initialize_centroids(X)
        
        # 2. Итеративное обновление
        for iteration in range(self.max_iterations):
            # Шаг 1: присвоение точек кластерам
            self.labels = self._assign_clusters(X)
            
            # Шаг 2: обновление центроидов
            new_centroids, max_shift = self._update_centroids(X, self.labels)
            
            # Запись inertia для отслеживания сходимости
            inertia = self._calculate_inertia(X, self.labels)
            self.inertia_history.append(inertia)
            
            # Проверка критерия остановки
            if max_shift < self.tolerance:
                print(f"Сходимость достигнута на итерации {iteration + 1}")
                break
            
            self.centroids = new_centroids
            
            if (iteration + 1) % 10 == 0:
                print(f"Итерация {iteration + 1}: Inertia = {inertia:.4f}, "
                      f"max_shift = {max_shift:.6f}")
        
        return self
    
    def predict(self, X: np.ndarray) -> np.ndarray:
        """Предсказание кластеров для новых данных"""
        return self._assign_clusters(X)
    
    def fit_predict(self, X: np.ndarray) -> np.ndarray:
        """Обучение и предсказание в одном вызове"""
        self.fit(X)
        return self.labels


# Тестирование на синтетических данных
def test_kmeans():
    """Генерирование синтетических данных и тестирование"""
    
    # 1. Генерация синтетических данных (3 кластера)
    np.random.seed(42)
    
    cluster1 = np.random.randn(100, 2) + [2, 2]
    cluster2 = np.random.randn(100, 2) + [-2, -2]
    cluster3 = np.random.randn(100, 2) + [2, -2]
    
    X = np.vstack([cluster1, cluster2, cluster3])
    
    # Истинные метки (для проверки)
    y_true = np.array([0]*100 + [1]*100 + [2]*100)
    
    # 2. Обучение k-means
    print("=== K-Means Clustering ===")
    kmeans = KMeans(k=3, max_iterations=100, tolerance=1e-4)
    labels = kmeans.fit_predict(X)
    
    # 3. Оценка качества
    print(f"\nЦентроиды:\n{kmeans.centroids}")
    print(f"\nFinal Inertia: {kmeans.inertia_history[-1]:.4f}")
    
    # Silhouette Score для оценки качества
    silhouette = calculate_silhouette_score(X, labels)
    print(f"Silhouette Score: {silhouette:.4f}")
    
    # 4. Визуализация
    visualize_clusters(X, labels, kmeans.centroids)


def calculate_silhouette_score(X: np.ndarray, labels: np.ndarray) -> float:
    """
    Silhouette Score = средняя мера, как хорошо точка подходит своему кластеру
    Значения от -1 до 1 (выше - лучше)
    """
    n_samples = X.shape[0]
    silhouette_scores = []
    
    for i in range(n_samples):
        # a_i: среднее расстояние до других точек в том же кластере
        same_cluster_points = X[labels == labels[i]]
        if len(same_cluster_points) > 1:
            a_i = np.mean(np.linalg.norm(same_cluster_points - X[i], axis=1))
        else:
            a_i = 0
        
        # b_i: минимальное среднее расстояние до точек в других кластерах
        b_i = np.inf
        for k in np.unique(labels):
            if k != labels[i]:
                other_cluster_points = X[labels == k]
                dist_to_other = np.mean(np.linalg.norm(
                    other_cluster_points - X[i], axis=1))
                b_i = min(b_i, dist_to_other)
        
        # Silhouette coefficient
        if max(a_i, b_i) > 0:
            s_i = (b_i - a_i) / max(a_i, b_i)
        else:
            s_i = 0
        
        silhouette_scores.append(s_i)
    
    return np.mean(silhouette_scores)


def visualize_clusters(X: np.ndarray, labels: np.ndarray, 
                      centroids: np.ndarray):
    """Визуализация кластеров"""
    plt.figure(figsize=(10, 8))
    
    # Раскрашивание точек по кластерам
    colors = ['red', 'green', 'blue', 'yellow', 'purple']
    for i in range(len(np.unique(labels))):
        cluster_points = X[labels == i]
        plt.scatter(cluster_points[:, 0], cluster_points[:, 1], 
                   c=colors[i % len(colors)], label=f'Cluster {i}', 
                   alpha=0.6, s=50)
    
    # Отображение центроидов
    plt.scatter(centroids[:, 0], centroids[:, 1], c='black', 
               marker='*', s=500, edgecolors='white', linewidths=2,
               label='Centroids')
    
    plt.xlabel('Feature 1')
    plt.ylabel('Feature 2')
    plt.title('K-Means Clustering Result')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()


# Альтернативная инициализация: K-means++
def kmeans_plusplus_init(X: np.ndarray, k: int, 
                        random_state: int = 42) -> np.ndarray:
    """
    K-means++ инициализация для лучшей сходимости
    Первый центроид выбирается случайно,
    последующие - с вероятностью пропорциональной расстоянию
    """
    np.random.seed(random_state)
    n_samples = X.shape[0]
    centroids = []
    
    # 1. Первый центроид - случайная точка
    first_idx = np.random.choice(n_samples)
    centroids.append(X[first_idx].copy())
    
    # 2. Остальные k-1 центроидов
    for _ in range(1, k):
        # Расстояния от каждой точки до ближайшего центроида
        distances = np.array([
            min(np.linalg.norm(x - c) for c in centroids)
            for x in X
        ])
        
        # Вероятность выбора пропорциональна квадрату расстояния
        probabilities = distances ** 2
        probabilities /= probabilities.sum()
        
        # Выбор нового центроида с этой вероятностью
        next_idx = np.random.choice(n_samples, p=probabilities)
        centroids.append(X[next_idx].copy())
    
    return np.array(centroids)


if __name__ == "__main__":
    test_kmeans()

Основные компоненты:

1. Инициализация (init):

  • Случайный выбор k точек из данных
  • K-means++ для лучшей сходимости (опционально)

2. Итеративное обновление:

  • Assign: присвоение каждой точки ближайшему центроиду (Euclidean distance)
  • Update: пересчет центроидов как среднее кластера
  • Повтор до сходимости

3. Критерий останова:

  • Максимальное изменение центроидов < tolerance
  • Или максимум итераций достигнут

4. Качество:

  • Inertia (сумма квадратов внутри-кластерных расстояний) - должна убывать
  • Silhouette Score - мера качества кластеризации (-1 до 1)
  • Визуализация

Сложность:

  • Время: O(n * k * d * i) где n=samples, k=clusters, d=features, i=iterations
  • Память: O(n * d)

Улучшения:

  1. K-means++ инициализация (быстрее сходится)
  2. Mini-batch K-means для больших данных
  3. Использование KD-tree для быстрого поиска расстояний
  4. Параллелизация вычисления расстояний
Python: Реализовать k-means clustering | PrepBro