← Назад к вопросам
Python: Реализовать k-means clustering
2.0 Middle🔥 131 комментариев
#Python#Машинное обучение
Условие
Реализуйте алгоритм k-means кластеризации с нуля на Python (без использования sklearn).
Требования:
- Инициализация центроидов
- Итеративное обновление кластеров
- Критерий останова
- Тестирование на синтетических данных
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Python: Реализовать k-means clustering
Полная реализация k-means
import numpy as np
from typing import Tuple, List
import matplotlib.pyplot as plt
class KMeans:
"""K-means кластеризация с нуля"""
def __init__(self, k: int = 3, max_iterations: int = 100,
random_state: int = 42, tolerance: float = 1e-4):
"""
Args:
k: количество кластеров
max_iterations: максимум итераций
random_state: seed для воспроизводимости
tolerance: порог для остановки (изменение центроидов)
"""
self.k = k
self.max_iterations = max_iterations
self.random_state = random_state
self.tolerance = tolerance
self.centroids = None
self.labels = None
self.inertia_history = []
def _initialize_centroids(self, X: np.ndarray) -> np.ndarray:
"""
Инициализация центроидов случайным выбором из точек данных
(K-means++ может быть еще лучше, но это простая версия)
"""
np.random.seed(self.random_state)
random_indices = np.random.choice(X.shape[0], self.k, replace=False)
return X[random_indices].copy()
def _euclidean_distance(self, x: np.ndarray,
centroid: np.ndarray) -> float:
"""Евклидово расстояние между точкой и центроидом"""
return np.sqrt(np.sum((x - centroid) ** 2))
def _assign_clusters(self, X: np.ndarray) -> np.ndarray:
"""Присвоение каждой точки ближайшему кластеру"""
distances = np.zeros((X.shape[0], self.k))
for i in range(self.k):
for j in range(X.shape[0]):
distances[j, i] = self._euclidean_distance(X[j],
self.centroids[i])
return np.argmin(distances, axis=1)
def _update_centroids(self, X: np.ndarray,
labels: np.ndarray) -> Tuple[np.ndarray, float]:
"""
Обновление центроидов как среднее арифметическое точек в кластере
Возвращает новые центроиды и максимальное изменение
"""
new_centroids = np.zeros((self.k, X.shape[1]))
max_shift = 0.0
for i in range(self.k):
# Все точки, принадлежащие кластеру i
cluster_points = X[labels == i]
if len(cluster_points) > 0:
new_centroid = cluster_points.mean(axis=0)
else:
# Если кластер пустой - переинициализировать случайно
new_centroid = X[np.random.choice(X.shape[0])]
# Вычислить сдвиг центроида
shift = self._euclidean_distance(self.centroids[i], new_centroid)
max_shift = max(max_shift, shift)
new_centroids[i] = new_centroid
return new_centroids, max_shift
def _calculate_inertia(self, X: np.ndarray,
labels: np.ndarray) -> float:
"""
Inertia = сумма квадратов расстояний до ближайшего центроида
(используется для оценки качества кластеризации)
"""
inertia = 0.0
for i in range(self.k):
cluster_points = X[labels == i]
if len(cluster_points) > 0:
distances = np.sum((cluster_points - self.centroids[i]) ** 2)
inertia += distances
return inertia
def fit(self, X: np.ndarray) -> 'KMeans':
"""
Обучение модели на данных
Args:
X: матрица признаков (n_samples, n_features)
"""
# 1. Инициализация центроидов
self.centroids = self._initialize_centroids(X)
# 2. Итеративное обновление
for iteration in range(self.max_iterations):
# Шаг 1: присвоение точек кластерам
self.labels = self._assign_clusters(X)
# Шаг 2: обновление центроидов
new_centroids, max_shift = self._update_centroids(X, self.labels)
# Запись inertia для отслеживания сходимости
inertia = self._calculate_inertia(X, self.labels)
self.inertia_history.append(inertia)
# Проверка критерия остановки
if max_shift < self.tolerance:
print(f"Сходимость достигнута на итерации {iteration + 1}")
break
self.centroids = new_centroids
if (iteration + 1) % 10 == 0:
print(f"Итерация {iteration + 1}: Inertia = {inertia:.4f}, "
f"max_shift = {max_shift:.6f}")
return self
def predict(self, X: np.ndarray) -> np.ndarray:
"""Предсказание кластеров для новых данных"""
return self._assign_clusters(X)
def fit_predict(self, X: np.ndarray) -> np.ndarray:
"""Обучение и предсказание в одном вызове"""
self.fit(X)
return self.labels
# Тестирование на синтетических данных
def test_kmeans():
"""Генерирование синтетических данных и тестирование"""
# 1. Генерация синтетических данных (3 кластера)
np.random.seed(42)
cluster1 = np.random.randn(100, 2) + [2, 2]
cluster2 = np.random.randn(100, 2) + [-2, -2]
cluster3 = np.random.randn(100, 2) + [2, -2]
X = np.vstack([cluster1, cluster2, cluster3])
# Истинные метки (для проверки)
y_true = np.array([0]*100 + [1]*100 + [2]*100)
# 2. Обучение k-means
print("=== K-Means Clustering ===")
kmeans = KMeans(k=3, max_iterations=100, tolerance=1e-4)
labels = kmeans.fit_predict(X)
# 3. Оценка качества
print(f"\nЦентроиды:\n{kmeans.centroids}")
print(f"\nFinal Inertia: {kmeans.inertia_history[-1]:.4f}")
# Silhouette Score для оценки качества
silhouette = calculate_silhouette_score(X, labels)
print(f"Silhouette Score: {silhouette:.4f}")
# 4. Визуализация
visualize_clusters(X, labels, kmeans.centroids)
def calculate_silhouette_score(X: np.ndarray, labels: np.ndarray) -> float:
"""
Silhouette Score = средняя мера, как хорошо точка подходит своему кластеру
Значения от -1 до 1 (выше - лучше)
"""
n_samples = X.shape[0]
silhouette_scores = []
for i in range(n_samples):
# a_i: среднее расстояние до других точек в том же кластере
same_cluster_points = X[labels == labels[i]]
if len(same_cluster_points) > 1:
a_i = np.mean(np.linalg.norm(same_cluster_points - X[i], axis=1))
else:
a_i = 0
# b_i: минимальное среднее расстояние до точек в других кластерах
b_i = np.inf
for k in np.unique(labels):
if k != labels[i]:
other_cluster_points = X[labels == k]
dist_to_other = np.mean(np.linalg.norm(
other_cluster_points - X[i], axis=1))
b_i = min(b_i, dist_to_other)
# Silhouette coefficient
if max(a_i, b_i) > 0:
s_i = (b_i - a_i) / max(a_i, b_i)
else:
s_i = 0
silhouette_scores.append(s_i)
return np.mean(silhouette_scores)
def visualize_clusters(X: np.ndarray, labels: np.ndarray,
centroids: np.ndarray):
"""Визуализация кластеров"""
plt.figure(figsize=(10, 8))
# Раскрашивание точек по кластерам
colors = ['red', 'green', 'blue', 'yellow', 'purple']
for i in range(len(np.unique(labels))):
cluster_points = X[labels == i]
plt.scatter(cluster_points[:, 0], cluster_points[:, 1],
c=colors[i % len(colors)], label=f'Cluster {i}',
alpha=0.6, s=50)
# Отображение центроидов
plt.scatter(centroids[:, 0], centroids[:, 1], c='black',
marker='*', s=500, edgecolors='white', linewidths=2,
label='Centroids')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.title('K-Means Clustering Result')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
# Альтернативная инициализация: K-means++
def kmeans_plusplus_init(X: np.ndarray, k: int,
random_state: int = 42) -> np.ndarray:
"""
K-means++ инициализация для лучшей сходимости
Первый центроид выбирается случайно,
последующие - с вероятностью пропорциональной расстоянию
"""
np.random.seed(random_state)
n_samples = X.shape[0]
centroids = []
# 1. Первый центроид - случайная точка
first_idx = np.random.choice(n_samples)
centroids.append(X[first_idx].copy())
# 2. Остальные k-1 центроидов
for _ in range(1, k):
# Расстояния от каждой точки до ближайшего центроида
distances = np.array([
min(np.linalg.norm(x - c) for c in centroids)
for x in X
])
# Вероятность выбора пропорциональна квадрату расстояния
probabilities = distances ** 2
probabilities /= probabilities.sum()
# Выбор нового центроида с этой вероятностью
next_idx = np.random.choice(n_samples, p=probabilities)
centroids.append(X[next_idx].copy())
return np.array(centroids)
if __name__ == "__main__":
test_kmeans()
Основные компоненты:
1. Инициализация (init):
- Случайный выбор k точек из данных
- K-means++ для лучшей сходимости (опционально)
2. Итеративное обновление:
- Assign: присвоение каждой точки ближайшему центроиду (Euclidean distance)
- Update: пересчет центроидов как среднее кластера
- Повтор до сходимости
3. Критерий останова:
- Максимальное изменение центроидов < tolerance
- Или максимум итераций достигнут
4. Качество:
- Inertia (сумма квадратов внутри-кластерных расстояний) - должна убывать
- Silhouette Score - мера качества кластеризации (-1 до 1)
- Визуализация
Сложность:
- Время: O(n * k * d * i) где n=samples, k=clusters, d=features, i=iterations
- Память: O(n * d)
Улучшения:
- K-means++ инициализация (быстрее сходится)
- Mini-batch K-means для больших данных
- Использование KD-tree для быстрого поиска расстояний
- Параллелизация вычисления расстояний