Clustering and k-Means
import sys, os, re
import time # For timing
import numpy as np
import matplotlib.pyplot as plt
import cv2
k-Means
def show_kmeans(points, K, figsize=(10, 10)):
criteria = (cv2.TERM_CRITERIA_MAX_ITER, 100, 1.0)
rets, labels, centers = cv2.kmeans(points, K, None, criteria, 1, cv2.KMEANS_RANDOM_CENTERS)
fig, ax = plt.subplots(figsize=figsize)
print(labels.shape)
colors = [
((0.6, 0.0, 0.0), (1.0, 0.0, 0.0)),
((0.0, 0.6, 0.0), (0.0, 1.0, 0.0)),
((0.0, 0.0, 0.6), (0.0, 0.0, 1.0)),
((0.6, 0.6, 0.0), (1.0, 1.0, 0.0)),
((0.0, 0.6, 0.6), (0.0, 1.0, 1.0)),
((0.6, 0.0, 0.6), (1.0, 0.0, 1.0)),
]
for cluster in range(K):
ax.scatter(points[labels[:,0]==cluster,0],
points[labels[:,0]==cluster,1], c=[colors[cluster][0]])
ax.scatter(centers[cluster,0], centers[cluster,1], c=[colors[cluster][1]], s=100, marker='x')
ax.scatter(centers[cluster,0], centers[cluster,1], s=150, marker='o', facecolors='none', edgecolors=[colors[cluster][1]])
2 Clean Clusters
np.random.seed(10)
n_points = 200
points_2_clean_centers = np.concatenate([
np.random.randn(n_points//2, 2)*0.4 - 1,
np.random.randn(n_points//2, 2)*0.4 + 1,
]).astype(np.float32)
K = 2
show_kmeans(points_2_clean_centers, K)
One Large Cluster
np.random.seed(10)
n_points = 800
circle = np.random.randn(n_points, 2)
circle = (circle/np.sqrt(np.sum(circle**2, 1)[:,None])) * np.sqrt(np.random.rand(n_points,1))
points_1_large = (circle @ np.diag([1, 2])).astype(np.float32)
cv2.setRNGSeed(10)
K = 4
show_kmeans(points_1_large, K, figsize=(10, 5))
cv2.setRNGSeed(51)
K = 4
show_kmeans(points_1_large, K, figsize=(10, 5))
5 Clusters, 3 Centers
np.random.seed(10)
n_points = 800
points_5_centers = np.concatenate([
np.random.randn(n_points//5, 2)*0.4 + (3, 2),
np.random.randn(n_points//5, 2)*0.4 + (1, -2),
np.random.randn(n_points//5, 2)*0.4 + (-2, -2),
np.random.randn(n_points//5, 2)*0.4 + (-3, 1),
np.random.randn(n_points//5, 2)*0.4 + (0, 4),
]).astype(np.float32)
cv2.setRNGSeed(10)
K = 3
show_kmeans(points_5_centers, K)
cv2.setRNGSeed(51)
K = 3
show_kmeans(points_5_centers, K)
Outliers
np.random.seed(10)
n_points = 200
points_2_centers_outlier = np.concatenate([
np.random.randn(n_points//2, 2)*0.4 - (1, 0),
np.random.randn(n_points//2, 2)*0.4 + (1, 0),
[(0, 5)],
]).astype(np.float32)
cv2.setRNGSeed(10)
K = 3
show_kmeans(points_2_centers_outlier, K)
cv2.setRNGSeed(183)
K = 2
show_kmeans(points_2_centers_outlier, K)
cv2.setRNGSeed(10)
K = 2
show_kmeans(points_2_centers_outlier, K)
Visual Bag of Words
########################################################################
#
# Original File: vbow_demo.py
# Author: Matt Zucker
# Adapted by Stephen Phillips
# Date: 2021, updated 2022
#
# Written for ENGR 27 - Computer Vision
#
########################################################################
# Constants
#
# Shows how to do Visual Bag of Words classification similar to
#
# Varma, Manik, and Andrew Zisserman. "A statistical approach to
# texture classification from single images." International journal
# of computer vision 62.1-2 (2005): 61-81.
#
# One important omission: Varma & Zisserman use responses to filter
# banks as the "words" whereas we are using individual subimages.
#
# The input images are letters rendered in three different fonts.
# size of input images
IMAGE_SIZE = 48
# size of extracted tiles
TILE_SIZE = 6
# amount to advance in x/y when sampling tiles
TILE_STRIDE = 1
# total count of pixels in tile
TILE_PIXELS = TILE_SIZE * TILE_SIZE
# bounds on tile means to get "interesting" tiles (i.e. not just solid
# white or black)
TILE_MEAN_MIN = 50
TILE_MEAN_MAX = 255-TILE_MEAN_MIN
# k for k-means
KMEANS_K = 30
# other params for cv2.kmeans
KMEANS_CRIT = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 10000, 1.0)
KMEANS_ATTEMPTS = 5
KMEANS_FLAGS = cv2.KMEANS_RANDOM_CENTERS
# should we merge tiles from different labels before clustering?
MERGE_BEFORE_CLUSTER = False
# try one of these methods for comparing histograms
HIST_METHOD = cv2.HISTCMP_HELLINGER
#HIST_METHOD = cv2.HISTCMP_CHISQR
#HIST_METHOD = cv2.HISTCMP_INTERSECT
#HIST_METHOD = cv2.HISTCMP_CORREL
# convert font name to label idx 0,1,2
LABEL_FROM_STR = dict(courier=0, helvetica=1, times=2)
# number of classes
NUM_CLASSES = len(LABEL_FROM_STR)
# we have k cluster centers per class
NUM_CENTERS = KMEANS_K * NUM_CLASSES
# hard code some directory names
TRAIN_DIR = '/work/train'
TEST_DIR = '/work/test'
# just for controlling visualization
PREVIEW_SCL = 4
PREVIEW_TILE_COLS = 10 # 39
PREVIEW_TILE_ROWS = 10 # 22
PREVIEW_TILE_GAP = 2
PREVIEW_HIST_COLS = 5
PREVIEW_HIST_ROWS = 12
PREVIEW_HIST_GAP = 8
WINDOW_NAME = 'Visual bag of words'
######################################################################
# a simple timer class
class Timer(object):
def __init__(self, status):
self.status = status
print(self.status + '...')
def __enter__(self):
self.start_time = time.perf_counter()
def __exit__(self, type, value, traceback):
elapsed = time.perf_counter() - self.start_time
print('finished {} in {:.3f} seconds\n'.format(
self.status, elapsed))
######################################################################
# for each point in query_points return the index of the closest
# point in train_points
def bruteforce_match(train_points, query_points):
m = len(train_points)
n = len(query_points)
k = train_points.shape[1]
assert query_points.shape[1] == train_points.shape[1]
train_points = train_points[:, None, :]
query_points = query_points[None, :, :]
diff = train_points - query_points
d = (diff**2).sum(axis=2)
idx = d.argmin(axis=0)
return idx
######################################################################
# extract subimage tiles from orignal image of given size, stride
def extract_tiles(image, size, stride=(1,1)):
tiles = []
h, w = image.shape[:2]
sx, sy = size
dx, dy = stride
if h < sy or w < sx:
return tiles
if dx <= 0: dx = 1
if dy <= 0: dy = 1
ox = (sx-dx)
oy = (sy-dy)
nx = (w - ox) // dx
ny = (h - oy) // dy
fx = nx*dx + ox
fy = ny*dy + oy
x0 = (w-fx)//2
y0 = (h-fy)//2
for ty in range(ny):
y = y0 + dy*ty
for tx in range(nx):
x = x0 + dx*tx
tiles.append( image[y:y+sy, x:x+sx].copy() )
return tiles
######################################################################
# for filtering directory listings
def is_image(filename):
_, ext = os.path.splitext(filename.lower())
return ext in ('.png', '.jpg')
######################################################################
# return sorted list of images in directory
def images_in_dir(dirname):
filenames = os.listdir(dirname)
filenames = [ os.path.join(dirname, n) for n in filenames ]
filenames = list(filter(is_image, filenames))
filenames.sort()
return filenames
######################################################################
# return the label 0,1,2 for the given filename
def label_for_file(filename):
filename = os.path.basename(filename)
lstr = filename[:filename.find('_')]
return LABEL_FROM_STR[lstr]
######################################################################
# convert tiles from image format (n, h, w) to vector format (n, h*w)
def tile_img_to_vec(tiles):
assert len(tiles.shape) == 3 and tiles.shape[1:] == (TILE_SIZE, TILE_SIZE)
return tiles.reshape(-1, TILE_PIXELS).astype(np.float32)
######################################################################
# convert tiles from vector format (n, h*w) to image format (n, h, w)
def tile_vec_to_img(tiles):
assert len(tiles.shape) == 2 and tiles.shape[1] == TILE_PIXELS
return np.clip(tiles, 0, 255).astype(np.uint8).reshape(-1, TILE_SIZE, TILE_SIZE)
######################################################################
# for filtering lists of tiles (see below)
def is_interesting_tile(tile):
m = tile.mean()
return m >= TILE_MEAN_MIN and m <= TILE_MEAN_MAX
######################################################################
# get label and extract tiles for a given image file
def tiles_label_for_file(filename):
label = label_for_file(filename)
image = cv2.imread(filename)
if len(image.shape) > 2:
image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
assert image.shape == (IMAGE_SIZE, IMAGE_SIZE)
tiles = extract_tiles(image,
(TILE_SIZE, TILE_SIZE),
(TILE_STRIDE, TILE_STRIDE))
tiles = list(filter(is_interesting_tile, tiles))
tiles = tile_img_to_vec(np.array(tiles))
return tiles, label
######################################################################
# get labels and tiles for a set of files.
# each tile will get its own corresponding label.
def tiles_labels_for_files(filenames):
all_tiles = []
all_labels = []
for filename in filenames:
tiles, label = tiles_label_for_file(filename)
all_tiles.append(tiles)
all_labels += [label for tile in tiles]
all_tiles = np.vstack(tuple(all_tiles))
all_labels = np.array(all_labels, dtype=np.uint8)
return all_tiles, all_labels
######################################################################
# assemble a mosaic image from regular subimages for visualization
def mosaic(tiles, nrows, ncols, gap, scl):
assert(len(tiles) == nrows * ncols)
th, tw = tiles[0].shape[:2]
h = nrows * (th + gap) + gap
w = ncols * (tw + gap) + gap
if len(tiles[0].shape) == 3:
shape = (h, w, 3)
else:
shape = (h, w)
preview = 127*np.ones(shape, dtype=np.uint8)
counter = 0
for row in range(nrows):
y0 = row*(th+gap)+gap
for col in range(ncols):
x0 = col*(tw+gap)+gap
tile = tiles[counter]
counter += 1
preview[y0:y0+th, x0:x0+tw] = tile
preview = cv2.resize(preview, (scl*w, scl*h),
interpolation=cv2.INTER_NEAREST)
return preview
######################################################################
# used for visualization
def preview_images(caption, images, ncols, nrows=0,
gap=0, scl=1, shuffle=False):
nimages = len(images)
assert nimages >= ncols
if nrows <= 0:
nrows = nimages//ncols
else:
nrows = min(nimages//ncols, nrows)
npreview = nrows * ncols
if shuffle:
idx = np.arange(nimages)
np.random.shuffle(idx)
images = images[idx[:npreview]]
else:
images = images[:npreview]
preview = mosaic(images, nrows, ncols, gap, scl)
h = 16
if len(preview.shape) == 3:
shape = (h, preview.shape[1], 3)
else:
shape = (h, preview.shape[1])
caption_image = 127*np.ones(shape, dtype=np.uint8)
preview = np.vstack((caption_image, preview))
cv2.putText(preview, caption, (5, 14),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255),
1, cv2.LINE_AA)
plt.figure(figsize=(20, 20))
if len(preview.shape) == 3:
plt.imshow(preview)
else:
plt.imshow(preview, cmap='gray')
######################################################################
# used for visualization
def preview_tiles(caption, tiles, ncols, nrows=0, shuffle=False):
images = tile_vec_to_img(tiles)
preview_images(caption, images, ncols, nrows,
PREVIEW_TILE_GAP, PREVIEW_SCL, shuffle)
######################################################################
# run k-means clustering on a set of tiles, either for each label
# or all merged together depending on MERGE_BEFORE_CLUSTER
def cluster_tiles(tiles, labels_per_tile):
k = KMEANS_K
if MERGE_BEFORE_CLUSTER:
labels_per_tile = np.zeros_like(labels_per_tile)
k *= NUM_CLASSES
lrange = range(labels_per_tile.max()+1)
all_centers = []
for label in lrange:
mask = (labels_per_tile == label)
label_tiles = tiles[mask]
nlabel = label_tiles.shape[0]
sz = label_tiles.shape[1:]
print('got {} tiles for label {}'.format(nlabel, label))
_, _, centers = cv2.kmeans(label_tiles, k, None,
KMEANS_CRIT, KMEANS_ATTEMPTS, KMEANS_FLAGS)
all_centers.append( centers )
all_centers = np.vstack(tuple(all_centers))
return all_centers
######################################################################
# given a set of cluster centers and a set of tiles, make a histogram
# tabulating frequency of each cluster center (i.e. value of bin i is
# proportional to the number of tiles for which cluster center i was
# closest)
def make_histogram(centers, tiles):
assert len(centers.shape) == 2 and centers.shape[1] == TILE_PIXELS
assert len(tiles.shape) == 2 and tiles.shape[1] == TILE_PIXELS
idx = bruteforce_match(centers, tiles)
histogram = np.bincount(idx, minlength=NUM_CENTERS).astype(np.float32)
return histogram / histogram.sum()
######################################################################
# extract a histogram given a filename
def histogram_for_file(centers, filename):
tiles, label = tiles_label_for_file(filename)
return make_histogram(centers, tiles)
######################################################################
# used only for visualization
def draw_histogram(height, histogram):
nhist = len(histogram)
s = 2
while (2*height > s*nhist-1): s += 1
himage = 220 * np.ones( (height, s*nhist-1, 3), dtype='uint8' )
hmax = np.max(histogram)
for c in range(nhist):
label = c // KMEANS_K
assert label < NUM_CLASSES
color = np.zeros(3, dtype=np.uint8)
if MERGE_BEFORE_CLUSTER:
color = (191, 0, 191)
else:
color[2-label] = 220
hc = histogram[c] / hmax
y = int(hc*height)
if (y):
startcol = s*c
endcol = s*(c+1)-1
color = np.array(color).reshape( (1,1,3) )
himage[0:y, startcol:endcol, :] = color
cv2.flip(himage, 0, himage)
return himage
######################################################################
# extract a set of histograms given filenames. optionally, also return
# display images for visualization
def histograms_for_files(centers, filenames, visualize=False):
histograms = np.array([ histogram_for_file(centers, f) for f in filenames ])
if not visualize:
return histograms
display_images = []
for filename, histogram in zip(filenames, histograms):
image = cv2.imread(filename)
himage = draw_histogram(image.shape[0], histogram)
vgap = 127*np.ones((image.shape[0], 2, 3), dtype=np.uint8)
display_images.append( np.hstack((image, vgap, himage)) )
display_images = np.array( display_images )
return histograms, display_images
######################################################################
# returns the index of the closest matching training histogram to the
# given test histogram.
def closest_histogram(train_histograms, test_histogram):
dists = np.zeros(len(train_histograms))
for idx, train_histogram in enumerate(train_histograms):
dists[idx] = cv2.compareHist(test_histogram, train_histogram,
HIST_METHOD)
if HIST_METHOD in [cv2.HISTCMP_HELLINGER, cv2.HISTCMP_CHISQR]:
return dists.argmin()
else:
return dists.argmax()
######################################################################
# validate a set of test images given learned cluster centers and
# labeled training set histograms.
def validate(centers,
train_filenames,
train_histograms,
test_filenames, visualize=False):
display_images = []
total_correct = 0
for test_filename in test_filenames:
true_label = label_for_file(test_filename)
test_histogram = histogram_for_file(centers, test_filename)
closest_idx = closest_histogram(train_histograms, test_histogram)
train_filename = train_filenames[closest_idx]
pred_label = label_for_file(train_filename)
train_histogram = train_histograms[closest_idx]
correct = (pred_label == true_label)
total_correct += correct
if not visualize:
continue
test_image = cv2.imread(test_filename)
train_image = cv2.imread(train_filename)
if correct:
cv2.line(train_image, (2, 7), (4, 10),
(0, 191, 0), 1, cv2.LINE_AA)
cv2.line(train_image, (4, 10), (10, 2),
(0, 191, 0), 1, cv2.LINE_AA)
else:
cv2.line(train_image, (2, 2), (10, 10),
(0, 0, 191), 1, cv2.LINE_AA)
cv2.line(train_image, (2, 10), (10, 2),
(0, 0, 191), 1, cv2.LINE_AA)
test_himage = draw_histogram(test_image.shape[0], test_histogram)
train_himage = draw_histogram(train_image.shape[0], train_histogram)
vgap = 127*np.ones((test_image.shape[0], 2, 3), dtype=np.uint8)
display1 = np.hstack((test_image, vgap, test_himage))
hgap = 127*np.ones((2, display1.shape[1], 3),dtype=np.uint8)
display2 = np.hstack((train_image, vgap, train_himage))
display_image = np.vstack((display1, hgap, display2))
display_images.append(display_image)
error_rate = 1.0 - float(total_correct)/len(test_filenames)
if not visualize:
return total_correct, error_rate
else:
display_images = np.array(display_images)
return total_correct, error_rate, display_images
######################################################################
def main():
train_filenames = images_in_dir(TRAIN_DIR)
test_filenames = images_in_dir(TEST_DIR)
with Timer('extracting training tiles/labels') as t:
train_tiles, train_labels_per_tile = tiles_labels_for_files(train_filenames)
print('got {} training tiles'.format(len(train_tiles)))
with Timer('running k-means') as t:
centers = cluster_tiles(train_tiles, train_labels_per_tile)
print('centers is {} of type {}'.format(centers.shape, centers.dtype))
with Timer('computing histograms of training set') as t:
train_histograms, train_display = histograms_for_files(centers, train_filenames,
visualize=True)
print('histograms is {} of type {}'.format(
train_histograms.shape, train_histograms.dtype))
with Timer('validating on test set') as t:
total_correct, error_rate, validate_display = validate(centers,
train_filenames,
train_histograms,
test_filenames,
visualize=True)
print('{}/{} correct (error rate={:.3f})'.format(
total_correct, len(test_filenames), error_rate))
preview_tiles('Sample tiles from training set',
train_tiles,
PREVIEW_TILE_COLS, PREVIEW_TILE_ROWS,
shuffle=True)
preview_tiles('Centers from k-means clustering',
centers, KMEANS_K)
preview_images('Example image/histogram pairs from training set',
train_display,
PREVIEW_HIST_COLS, PREVIEW_HIST_ROWS,
shuffle=True, gap=PREVIEW_HIST_GAP)
preview_images('Top of pair: test img/hist; ' +
'bottom: nearest training example',
validate_display,
PREVIEW_HIST_COLS, PREVIEW_HIST_ROWS//2,
shuffle=False, gap=PREVIEW_HIST_GAP)
if __name__ == '__main__':
main()