Non-parametric/Unsupervised Learning
import numpy as np
import cv2
# For timing
import time
# For plotting
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from matplotlib.colors import ListedColormap
# For KNN
from sklearn import neighbors, datasets
# For MNIST
import tensorflow as tf
from tensorflow import keras
Iris Data
# Iris dataset
raw_data = np.loadtxt("/work/data/iris.csv", delimiter=",")
column_headers = ["Sepal Length (cm)", "Sepal width (cm)", "Petal Length (cm)", "Petal Width (cm)", "Iris Type"]
df = pd.DataFrame(raw_data, columns=column_headers)
iris_types = ["Setosa", "Versicolor", "Virginica"]
Pairwise Scatter Plot
sns.pairplot(df, hue="Iris Type")
KNN Visualization
n_neighbors
/ 10
# Code adapted from: https://pythonspot.com/k-nearest-neighbors/
X, y = (raw_data[:, [0, 3]], raw_data[:, -1])
# Create color maps
cmap_light = ListedColormap(['#FFAAAA', '#AAFFAA','#00AAFF'])
cmap_bold = ListedColormap(['#FF0000', '#00FF00','#00FFFF'])
# we create an instance of Neighbours Classifier and fit the data.
clf = neighbors.KNeighborsClassifier(n_neighbors, weights='distance')
clf.fit(X, y)
# Visualize
dx, dy = 0.02, 0.02
# calculate min, max and limits
x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
xx, yy = np.meshgrid(np.arange(x_min, x_max, dx),
np.arange(y_min, y_max, dy))
# predict class using data and kNN classifier
Z = clf.predict(np.c_[xx.ravel(), yy.ravel()])
# Put the result into a color plot
Z = Z.reshape(xx.shape)
plt.figure(figsize=(8,8))
plt.pcolormesh(xx, yy, Z, cmap=cmap_light)
# Plot also the training points
plt.scatter(X[:, 0], X[:, 1], c=y, s=20, cmap=cmap_bold)
plt.xlim(xx.min(), xx.max())
plt.ylim(yy.min(), yy.max())
plt.title("3-Class classification (k = %i)" % (n_neighbors))
plt.xlabel(column_headers[0])
plt.ylabel(column_headers[3])
MNIST KNN
pca_k
/ 100
knn_k
/ 10
########################################################################
#
# File:   mnist_pca_knn.py
# Author: Matt Zucker
# Date:   March 2021
#
# Written for ENGR 27 - Computer Vision
#
########################################################################
#
# Shows how to do kNN classification (plus optional PCA) on MNIST
# dataset.
import os, sys
import time
MNIST_IMAGE_SIZE = 28
MNIST_DIMS = 784
TARGET_DISPLAY_SIZE = 340
BATCH_SIZE = 500
######################################################################
# Download and parse MNIST data
def get_mnist_data():
    # keras has mnist loader built in!
    (train_images, train_labels), (test_images, test_labels) = keras.datasets.mnist.load_data()
    return train_labels, train_images, test_labels, test_images
######################################################################
# For majority voting step of k nearest neighbors
# https://stackoverflow.com/questions/19201972/can-numpy-bincount-work-with-2d-arrays
def bincount2d(arr, bins=None):
    if bins is None:
        bins = np.max(arr) + 1
    count = np.zeros(shape=[len(arr), bins], dtype=np.int64)
    indexing = (np.ones_like(arr).T * np.arange(len(arr))).T
    np.add.at(count, (indexing, arr), 1)
    return count
######################################################################
# Load precomputed PCA mean and eigenvectors from an .npz file 
# (or create the file if it doesn't exist)
def load_precomputed_pca(train_images, k):
    try:
        d = np.load('/work/data/mnist_pca.npz')
        mean = d['mean']
        eigenvectors = d['eigenvectors']
        print('loaded precomputed PCA from mnist_pca.npz')
    except:
        print('precomputing PCA one time only for train_images...')
        start_time = time.perf_counter()
        
        ndim = train_images.shape[1]
        mean, eigenvectors = cv2.PCACompute(train_images, 
                                            mean=None, 
                                            maxComponents=train_images.shape[1])
        print('done ({} seconds)\n'.format(time.perf_counter()-start_time))
        np.savez_compressed('/work/data/mnist_pca.npz',
                            mean=mean,
                            eigenvectors=eigenvectors)
    eigenvectors = eigenvectors[:k]
    return mean, eigenvectors
######################################################################
# Construct an object we can use for fast nearest neighbor queries.
# See https://github.com/mariusmuja/flann
# And https://docs.opencv.org/master/dc/de2/classcv_1_1FlannBasedMatcher.html
def get_knn_matcher():
    FLANN_INDEX_KDTREE = 0
    index_params = dict(algorithm = FLANN_INDEX_KDTREE, trees = 5)
    search_params = dict(checks=50)   # or pass empty dictionary
    
    matcher = cv2.FlannBasedMatcher(index_params, search_params)
    return matcher
######################################################################
# Use the matcher object to match query vectors to training vectors.
#
# Parameters:
#
#   * query_vecs is p-by-n
#   * train_vecs is m-by-n
#   * train_labels is flat array of length m (optional)
#
# Returns: 
#
#   * match_indices p-by-k indices of closest rows in train_vecs
#   * labels_pred flat array of length p (if train_labels is provided)
def match_knn(matcher, k, query_vecs, train_vecs, train_labels=None):
    knn_result = matcher.knnMatch(query_vecs, train_vecs, k)
    match_indices = np.full((len(query_vecs), k), -1, int)
    for i, item_matches in enumerate(knn_result):
        match_indices[i] = [ match.trainIdx for match in item_matches ]
    if train_labels is None:
        return match_indices
    match_labels = train_labels[match_indices]
    bcount = bincount2d(match_labels, bins=10)
    labels_pred = bcount.argmax(axis=1)
    return match_indices, labels_pred
######################################################################
# Draw outlined text in an image
def outline_text(img, text, pos, scl, fgcolor=None, bgcolor=None):
    if fgcolor is None:
        fgcolor = (255, 255, 255)
    if bgcolor is None:
        bgcolor = (0, 0, 0)
    for (c, w) in [(bgcolor, 3), (fgcolor, 1)]:
        cv2.putText(img, text, pos,
                    cv2.FONT_HERSHEY_SIMPLEX,
                    scl, c, w, cv2.LINE_AA)
######################################################################
# Convert a row vector to an image.
def row2img(row, sz, text, img_type='image', bgcolor=None):
    scl = sz // MNIST_IMAGE_SIZE
    if img_type == 'eigenvector':
        rmax = np.abs(row).max()
        row = 0.5 + 0.5*row/rmax
    elif img_type == 'error':
        row = np.clip(0.5 + 0.5*row/255, 0, 1)
    else:
        row = np.clip(row/255, 0, 1)
    row = 1 - row
    img = (row * 255).astype(np.uint8).reshape(MNIST_IMAGE_SIZE, MNIST_IMAGE_SIZE)
    img = cv2.resize(img, (0, 0), fx=scl, fy=scl, interpolation=cv2.INTER_NEAREST)
    img = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB)
    if text is not None:
        outline_text(img, text, (4, 16), 0.5, bgcolor=bgcolor)
    return img
######################################################################
# Do PCA demo
def demo_mean_eigenvectors(mean, eigenvectors, n_show=25):
    images = [ row2img(mean.flatten(),
                       TARGET_DISPLAY_SIZE, 'Mean digit image') ]
    for i, evec in enumerate(eigenvectors):
        label = f'Eigenvector {i+1}/{len(eigenvectors)}'
        images.append(row2img(evec, TARGET_DISPLAY_SIZE, label, 
                              img_type='eigenvector'))
                              
    sq = int(np.floor(np.sqrt(n_show)))
    rows, cols = int(np.ceil(n_show / sq))+1, sq
    fig, ax = plt.subplots(rows, cols, figsize=(5*cols, 5*rows))
    ax[0,0].imshow(images[0])
    idx = 1
    for i in range(rows):
        for j in range(cols):
            ax[i+1,j].imshow(images[idx])
            idx += 1
######################################################################
# Do reconstruction demo
def demo_reconstruction(mean, eigenvectors, test_images, test_vecs, n_show=4):
    test_recons = cv2.PCABackProject(test_vecs, mean, eigenvectors)
    idx = 0
    n = len(test_images)
    
    rows, cols = n_show // 2, 2
    fig, ax = plt.subplots(rows, cols, figsize=(15*cols, 5*rows))
    for i in range(rows):
        for j in range(cols):
            img_orig = test_images[idx]
            img_recons = test_recons[idx]
            orig = row2img(img_orig, TARGET_DISPLAY_SIZE, 
                        f'Test image {idx:04d}')
            recons = row2img(img_recons, TARGET_DISPLAY_SIZE,
                            f'Reconstructed from {len(eigenvectors)} PCs')
            err = row2img(img_orig - img_recons, TARGET_DISPLAY_SIZE,
                        'Error image', img_type='error')
            display = np.hstack((orig, recons, err))
            ax[i,j].imshow(display)
            idx += 1
######################################################################
train_labels, train_images, test_labels, test_images = get_mnist_data()
train_images = train_images.astype(np.float32)
train_images = train_images.reshape(-1, MNIST_DIMS) # make row vectors 
if pca_k > 0:
    # note we could use cv2.PCACompute to do this but
    # instead we use a pre-computed eigen-decomposition
    # of the data if available
    mean, eigenvectors = load_precomputed_pca(train_images, pca_k)
    eigenvectors = eigenvectors[:pca_k]
    print('reducing dimensionality of training set...')
    start_time = time.perf_counter()
    train_vecs = cv2.PCAProject(train_images, mean, eigenvectors)
    print('done ({} seconds)\n'.format(time.perf_counter()-start_time))
else:
    mean = None
    eigenvectors = None
    train_vecs = train_images
print('train_images:', train_images.shape)
print('train_vecs:', train_vecs.shape)
print()
test_images = test_images.astype(np.float32)
test_images = test_images.reshape(-1, MNIST_DIMS)
if pca_k > 0:
    print('reducing dimensionality of test set...')
    test_vecs = cv2.PCAProject(test_images, mean, eigenvectors)
    print('done\n')
else:
    test_vecs = test_images
print('test_images:', test_images.shape)
print('test_vecs:', test_vecs.shape)
print()
matcher = get_knn_matcher()
num_test = len(test_images)
total_errors = 0
start = time.perf_counter()
print(f'evaluating knn accuracy with k={knn_k}...')
for start_idx in range(0, num_test, BATCH_SIZE):
    end_idx = min(start_idx + BATCH_SIZE, num_test)
    cur_batch_size = end_idx - start_idx
    idx, labels_pred = match_knn(matcher, knn_k,
                                    test_vecs[start_idx:end_idx],
                                    train_vecs, train_labels)
    labels_true = test_labels[start_idx:end_idx]
    total_errors += (labels_true != labels_pred).sum()
    error_rate = 100.0 * total_errors / end_idx
    print(f'{total_errors:4d} errors after {end_idx:5d} test examples (error rate={error_rate:.2f}%)')
elapsed = (time.perf_counter() - start)
print(f'total time={elapsed:.2f} seconds ({elapsed/end_idx:.4f}s per image)')
print("----------- FINAL ERROR RATE -------------")
print(f'{total_errors:4d} errors after {end_idx:5d} test examples (error rate={error_rate:.2f}%)')
print("------------------------------------------")
demo_reconstruction(mean, eigenvectors, test_images, test_vecs)
demo_mean_eigenvectors(mean, eigenvectors, n_show=25)