import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import sys
from collections import Counter
import time
class Kmeans:
"""
Kmeans clustering
Parameters:
-----------
X: input data
K: number of clusters to generate
n_init: number of times the k-means algorithm is run with different centroid seeds
"""
def __init__(self, K, n_init):
self.K = K
self.n_init = n_init
def fit(self, X):
if isinstance(X, pd.DataFrame):
X = X.to_numpy()
elif isinstance(X, list):
X = np.array(X)
else:
pass
self.centroids, self.labels = self.kmeans(X, self.K, self.n_init)
def fit_predict(self, X):
self.centroids, self.labels = self.kmeans(X, self.K, self.n_init)
return self.labels
def predict(self, X):
dist = np.array([np.linalg.norm(X-centroid, axis=1) for centroid in self.centroids])
labels = np.argmin(dist, axis=0)
return labels
def kmeans(self, X, K, n_init):
"""
Kmeans clustering algorithm
Parameters:
-----------
X: input data
K: number of clusters to generate
n_init: number of times the k-means algorithm is run with different centroid seeds
Returs:
-------
centroids: the best centroids chosen output of n_init consecutive runs in terms of SSE (Sum of Squared Errors)
labels: the cluster index for each data point
"""
labels_list = []
centroids_list = []
sse_list = []
max_iter = 1000
for i in range(n_init):
labels = np.empty(len(X))
centroids = self.init_centroids(X, K)
for j in range(max_iter):
new_labels = self.reassign(K,X, centroids)
new_centroids = self.calc_centers(K,X,new_labels)
if np.all(new_labels == labels):
break
else:
labels = new_labels.astype(int)
centroids = new_centroids
labels_list.append(labels)
centroids_list.append(centroids)
sse = self.calc_SSE(X, centroids, labels)
sse_list.append(sse)
centroids = centroids_list[np.argmin(sse_list)]
labels = labels_list[np.argmin(sse_list)]
return centroids, labels
def init_centroids(self, X, K):
"""
Initializes the centroids based on the kmeans++ algorithm
Parameters:
-----------
X: input data
K: number of centroids to generate
Returs:
-------
centroids: a list of as many centroids as K
"""
m, n = X.shape
centroids = np.zeros((K, n))
centroids[0] = X[np.random.randint(X.shape[0]),:]
for c_i in range(K-1):
dist_mat = np.empty((len(X),len(centroids)))
for j in range(len(centroids)):
dist_mat[:,j] = np.sqrt(np.sum((X-centroids[j])**2,axis=1))
dist_min = np.min(dist_mat, axis=1)
next_centroid = X[np.argmax(dist_min)]
centroids[c_i+1]= next_centroid
return centroids
def calc_centers(self, K, X, labels):
"""
Calculate new centroids
Parameters:
-----------
K: number of centroids to generate
X: input data
labels: the cluster index for each data point
Returs:
-------
centroids: newly calculated centroids
"""
dim = np.shape(X)[1]
centroids = np.empty((K, dim))
for clidx in range(K):
labels = np.array(labels)
keylist = np.where(labels == clidx)
centroids[clidx] = X[keylist].mean(axis=0)
return centroids
def reassign(self, K, X, centroids):
"""
Reassigns new labels based on the new centroids
Parameters:
-----------
K: number of centroids to generate
X: input data
centroids: the cluster centroids
Returs:
-------
labels: newly assigned labels
"""
labels = np.empty(len(X))
scores = np.empty((len(X),len(centroids)))
for clidx in range(K):
scores[:,clidx] = np.sum((X-centroids[clidx])**2, axis=1)
labels = scores.argmin(axis=1)
return labels
def calc_SSE(self, X, centroids, labels):
"""
Calculate the SSE (Sum of Squared Errors)
Parameters:
-----------
K: number of centroids to generate
centroids: the cluster centroids
labels: the cluster index for each data point
Returs:
-------
sse: SSE (Sum of Squared Errors)
"""
sse = 0
X = np.array(X)
labels = np.array(labels).astype(int)
for i in range(len(centroids)):
X_tmp = X[np.where(labels==i)]
sse += np.sum(np.sum((X_tmp-centroids[i])**2,axis=1))
return sse
# creating data
mean_01 = np.array([0.0, 1.0])
cov_01 = np.array([[1, 0.3], [0.3, 1]])
dist_01 = np.random.multivariate_normal(mean_01, cov_01, 100)
mean_02 = np.array([7.0, 7.0])
cov_02 = np.array([[1.5, 0.3], [0.3, 1]])
dist_02 = np.random.multivariate_normal(mean_02, cov_02, 100)
mean_03 = np.array([7.5, -5.0])
cov_03 = np.array([[1.2, 0.5], [0.5, 1.3]])
dist_03 = np.random.multivariate_normal(mean_03, cov_01, 100)
mean_04 = np.array([1.0, -7.0])
cov_04 = np.array([[1.2, 0.5], [0.3, 1.3]])
dist_04 = np.random.multivariate_normal(mean_04, cov_01, 100)
data = np.vstack((dist_01, dist_02, dist_03, dist_04))
np.random.shuffle(data)
plt.figure(figsize=(6,4))
plt.scatter(data[:,0], data[:,1])
kmeans = Kmeans(K=4, n_init=10)
kmeans.fit(data)
centroids = kmeans.centroids
labels = kmeans.labels
plt.scatter(data[np.where(labels==0)][:,0],data[np.where(labels==0)][:,1], color = "lightblue", label="cluster 1")
plt.scatter(data[np.where(labels==1)][:,0],data[np.where(labels==1)][:,1], color = "khaki", label="cluster 2")
plt.scatter(data[np.where(labels==2)][:,0],data[np.where(labels==2)][:,1], color = "lightgreen", label="cluster 3")
plt.scatter(data[np.where(labels==3)][:,0],data[np.where(labels==3)][:,1], color = "lightpink",label="cluster 4")
plt.scatter(centroids[:, 0], centroids[:, 1], color = "red", label="final centroids")
plt.legend(bbox_to_anchor=(1, 1), fontsize=12)
plt.show()
def calc_SSE(x, centroids, labels):
sse = 0
x = np.array(x)
labels = np.array(labels).astype(int)
for i in range(len(centroids)):
x_tmp = x[np.where(labels==i)]
sse += np.sum(np.sum((x_tmp-centroids[i])**2,axis=1))
return sse
def calc_silhouette(x, labels):
labels = np.array(labels)
cluster_size = Counter(labels)
cluster_list = cluster_size.keys()
score_list = []
for i in range(len(x)):
main = x[i]
label = labels[i]
rest = np.concatenate((x[:i],x[i+1:]))
rest_labels = np.concatenate((labels[:i],labels[i+1:]))
score = 0
within_dist = 0
between_dist_list = []
for cluster in cluster_list:
neighbor_points = rest[np.where(rest_labels==cluster)]
if cluster == label:
# calculate within-cluster distance
size = cluster_size[cluster]
# calculate pairwise euclidian distance between the main point and its neighbors
within_dist = np.sum(np.sqrt(np.sum((neighbor_points-main)**2,axis=1)))
within_dist = within_dist/(size-1)
else:
# calculate between-cluster distance
tmp_btw_dist = 0
size = cluster_size[cluster]
# calculate pairwise euclidian distance between the main point and its neighbors
tmp_btw_dist = np.sum(np.sqrt(np.sum((neighbor_points-main)**2, axis=1)))
tmp_btw_dist = tmp_btw_dist/size
between_dist_list.append(tmp_btw_dist)
between_dist = min(between_dist_list)
score = (between_dist-within_dist)/max(between_dist, within_dist)
score_list.append(score)
silhouette_score = sum(score_list)/len(x)
return silhouette_score
# calculate SSE and Silhouette score for kmeans with k varying from 2 to 8
sse_list = []
silhouette_list= []
k_range = range(2,9)
for k in k_range:
kmeans = Kmeans(K=k, n_init=10)
labels = kmeans.fit_predict(data)
centroids = kmeans.centroids
sse_list.append(calc_SSE(data, centroids, labels))
silhouette_list.append(calc_silhouette(data, labels))
# plot SSE and Silhouette scores
fig, axes = plt.subplots(1,2, figsize=(14,4))
axes[0].plot(sse_list, marker="x")
axes[0].set_xticks(ticks=np.arange(0,len(k_range)),labels=k_range)
axes[0].set_title("Elbow Method for Optimal K")
axes[0].set_ylabel("Sum of Squared Error (SSE)")
axes[0].set_xlabel("The Value of K")
axes[1].plot(silhouette_list, marker="x")
axes[1].set_xticks(ticks=np.arange(0,len(k_range)),labels=k_range)
axes[1].set_title("Silhouette Analysis for Optimal K")
axes[1].set_ylabel("Silhouette Score")
axes[1].set_xlabel("The Value of K")
plt.show()
img_path = "Peko1_.png"
image_original_1 = plt.imread(img_path)
plt.axes(xticks=[], yticks=[])
plt.imshow(image_original_1)
plt.title('Original Image')
plt.show()
rows = image_original_1.shape[0]
cols = image_original_1.shape[1]
image_1 = image_original_1.reshape(rows*cols, 3)
compressed_images = []
k_list = [4, 8, 16, 32]
for k in k_list:
kmeans = Kmeans(K=k, n_init=10)
labels = kmeans.fit_predict(image_1)
centroids = kmeans.centroids
compressed_image = (np.clip(centroids[labels], 0, 1)).reshape(rows, cols, 3)
compressed_images.append(compressed_image)
fig, axes = plt.subplots(2,2, figsize=(16,12))
for i in range(4):
row = i//2
col = i%2
axes[row,col].imshow(compressed_images[i])
axes[row,col].set_title("k={}".format(k_list[i]), fontsize=20)
axes[row,col].set_xticks([])
axes[row,col].set_yticks([])
plt.show()