import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scipy as sc
from scipy.stats import norm
from scipy.optimize import minimize
import random
from matplotlib import cm
from numpy import linalg
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.utils import shuffle
from tqdm import tqdm
#%% Import dataset #####################################
fields = ['age', 'sex', 'chest pain type', 'resting blood pressure', 'serum cholesterol', 'fasting blood sugar', 'resting electrocardiographic results', 'maximum heart rate achieved', 'exercise induced angina', 'ST depression', 'slope of the peak exercise ST segment', 'major vessels', 'thalassemia', 'heart disease']
heart_array = np.genfromtxt('heart.csv', delimiter=',')
# Normalize the features, but not the last column (the heart disease label).
X_array = heart_array[:,:-1]
y_array = heart_array[:,-1:]
normalized_X_array = (X_array - X_array.mean(axis=0)) / X_array.std(axis=0)
normalized_heart_array = np.hstack((normalized_X_array,y_array))
heart = pd.DataFrame(data = normalized_heart_array, columns= fields)
# Take one ninth of the dataset out as a test set.
shuffled_heart = shuffle(heart)
split_heart = np.split(shuffled_heart, 9)
heart = pd.concat([split_heart[i] for i in range(8)])
heart_test = split_heart[8]
# Global variables shared by all experiments below
cluster_number = 5 # number of folds for cross-validation
C_min, C_max= 0, 8
gamma_min, gamma_max = -9, 0
C_steps = 10
gamma_steps = 11
method = "self-made" # GP backend: "self-made" or the sklearn-based "gpr"
def gaussian(x, y, l = 1):
    """Vectorized RBF kernel: matrix of pairwise kernel values between the
    rows of x and the rows of y, with bandwidth l."""
    # Pairwise squared distances via ||a-b||^2 = ||a||^2 + ||b||^2 - 2<a,b>.
    l2_norm = np.sum(x**2,1).reshape(-1,1) + np.sum(y**2,1) - 2*np.dot(x, y.T)
    return np.exp(-.5 * (1/l) * l2_norm)
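# A quick sanity check of the vectorized kernel (illustrative; the toy array
# below is an assumption, not part of the pipeline): the diagonal of
# gaussian(A, A) should be all ones, since a point is at distance 0 from itself.
_toy = np.random.rand(3, 2)
assert np.allclose(np.diag(gaussian(_toy, _toy)), 1.0)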
def prediction_new(par_comb, X, Y, mthd="gpr"):
    """Vectorized GP posterior at par_comb, using a Cholesky factorization of
    the kernel matrix (numerically preferable to a direct (pseudo-)inverse).
    The sklearn-based alternative lives in prediction() below."""
    par_comb = np.array([par_comb])
    X = np.array(X)
    Y = np.array(Y)
    Sigma = gaussian(X, X)
    L = np.linalg.cholesky(Sigma)
    k = gaussian(X, par_comb)
    L_k = np.linalg.solve(L, k)
    # Posterior mean: k^T Sigma^{-1} Y, computed via two triangular solves.
    mu = np.dot(L_k.T, np.linalg.solve(L, Y)).reshape((len(par_comb),))
    K_par = gaussian(par_comb, par_comb)
    # Posterior variance: k(x,x) - k^T Sigma^{-1} k.
    sigma = np.sqrt(np.diag(K_par) - np.sum(L_k**2, axis=0))
    return mu, sigma
def gaussian_kernel(x, y, l = 1):
"""Returns the RPF kernel with sigma 1 and length 1."""
x, y = np.array(x), np.array(y)
l2_norm = np.linalg.norm(x-y)
return np.exp(-(l2_norm**2)/(2*(l**2)))
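# Minimal check of the scalar kernel (illustrative values): k(x, x) = 1, and
# the kernel decays monotonically with distance.
assert np.isclose(gaussian_kernel([0, 0], [0, 0]), 1.0)
assert gaussian_kernel([0, 0], [1, 1]) < gaussian_kernel([0, 0], [0.1, 0.1])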
def prediction(par_comb, X_i, Y_i, mthd="gpr"):
    """Predict the GP posterior mean and standard deviation at par_comb (a
    (C, gamma) pair) from the sampled points X_i and the observed errors Y_i."""
    m = len(X_i)
    Sigma = np.zeros([m,m])
    k = np.zeros(m)
    k_hh = 1 # k(x, x) = 1 for the RBF kernel
if (mthd == "self-made"):
        if par_comb in X_i: # at a sampled point the posterior is exact: sigma = 0, mu = y
sigma = np.array([0])
index = np.where((np.array(X_i) == par_comb).all(axis=1))[0][0]
Y_i = np.array(Y_i)
mu = Y_i[index]
# print(f"C,g are in the training set, sigma = 0 and mu = y(C,g).")
else:
for i in range(m):
k[i] = gaussian_kernel(X_i[i], par_comb)
for j in range(m):
Sigma[i][j] = gaussian_kernel(X_i[i], X_i[j])
k_trans = np.transpose(k)
Sigma_inv = np.linalg.pinv(Sigma)
mu = np.matmul(k_trans, np.matmul(Sigma_inv, Y_i))
sigma = np.sqrt(max(k_hh - np.matmul(k_trans, np.matmul(Sigma_inv, k)),0))
else:
kernel = 1.0 * RBF(1.0)
gpr = GaussianProcessRegressor(kernel=kernel, random_state=0)
gpr = gpr.fit(X_i, Y_i)
mu, sigma = gpr.predict([par_comb], return_std=True) # mu(x) and sigma(x)
sigma = sigma.reshape(-1,1)
return mu, sigma
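# Illustrative use of prediction() on a tiny made-up design (the points and
# errors below are assumptions for demonstration only): at a point that is
# already in the training set, the self-made branch returns the observed value
# with zero standard deviation.
_X_demo = [[0.0, -1.0], [1.0, -2.0]]
_Y_demo = [0.3, 0.5]
_mu_demo, _sigma_demo = prediction([0.0, -1.0], _X_demo, _Y_demo, mthd="self-made")
print(_mu_demo, _sigma_demo)  # expected: 0.3 and [0]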
def validation_error(X_train, X_val, y_train, y_val, c_log = 0, g_log = -10):
    """Train an RBF-kernel SVM with C = 10**c_log and gamma = 10**g_log and
    return the number of misclassified validation points."""
c = 10**c_log
g = 10**g_log
svclassifier = SVC(kernel='rbf', C = c, gamma = g)
svclassifier.fit(X_train, y_train)
y_predict = svclassifier.predict(X_val)
error = sum(y_predict!=y_val)
return error
def crossvalidation_error(dataset, c_log = 0, g_log = -10):
    """Return the mean k-fold cross-validation error ratio of an SVM with
    C = 10**c_log and gamma = 10**g_log on dataset."""
    shuffled_dataset = shuffle(dataset)
    split_dataset = np.split(shuffled_dataset, cluster_number)
    error_list = []
    error = 0
    for i in range(cluster_number):
        # Validation fold
        X_validation = split_dataset[i].drop('heart disease', axis=1)
        y_validation = split_dataset[i]['heart disease']
        # Training folds: all folds except split_dataset[i]
        train_indices = [*range(0,cluster_number)]
        train_indices.remove(i)
        train_data = pd.concat([split_dataset[j] for j in train_indices])
        X_train = train_data.drop('heart disease', axis=1)
        y_train = train_data['heart disease']
        # Misclassification count divided by the fold size gives the error ratio.
        local_error = validation_error(X_train, X_validation, y_train, y_validation, c_log, g_log)/(len(dataset)/cluster_number)
        error_list.append(local_error)
        error += local_error/cluster_number
    return error # mean over folds; use max(error_list) for the worst fold instead
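# Example call (illustrative hyperparameters): the 5-fold cross-validation
# error ratio of the SVM at C = 10**2 and gamma = 10**-3 on the training data.
print(crossvalidation_error(heart, c_log=2, g_log=-3))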
We choose k different points in the grid [0, 8] x [-9, 0] of (log10(C), log10(gamma)) values and evaluate the cross-validation error on them.
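# Note that the grid lives in log10 space: a point (c_log, g_log) corresponds
# to the SVM hyperparameters C = 10**c_log and gamma = 10**g_log, so e.g.
# (2, -3) maps to C = 100 and gamma = 0.001 (illustrative values).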
from scipy.interpolate import NearestNDInterpolator
def plot_heatmap_points(X_list, error_list, grid_stepsize):
    """Heatmap of the evaluated errors (nearest-neighbour interpolation onto a
    regular grid) with the evaluated points and the found minimum overlaid."""
C_axis = np.arange(C_min , C_max + grid_stepsize[0] , grid_stepsize[0])
gamma_axis = np.arange(gamma_min, gamma_max+ grid_stepsize[1], grid_stepsize[1])
C_axis, gamma_axis = np.meshgrid(C_axis, gamma_axis)
interp = NearestNDInterpolator(X_list, error_list)
full_error_array = interp(C_axis, gamma_axis)
plt.rcParams["figure.figsize"] = (7,7)
plt.pcolormesh(C_axis, gamma_axis, full_error_array, shading='auto')
plt.scatter([X[0] for X in X_list], [X[1] for X in X_list], label="evaluated point")
index_min = np.argmin(error_list)
plt.plot(X_list[index_min][0], X_list[index_min][1], "ro", label="found minimum")
plt.legend()
plt.colorbar()
plt.ylabel("log10(gamma)")
plt.xlabel("log10(C)")
plt.show()
def grid_search(C_steps, gamma_steps, zoom, num_zooms, plot = False):
    """Iteratively zoomed grid search: evaluate a C_steps x gamma_steps frame,
    re-centre on the best point found, shrink the frame by 'zoom' and repeat."""
    # Set up the initial frame
stepsize = ((C_max - C_min) / C_steps, (gamma_max - gamma_min) / gamma_steps)
frame_center = (C_min + (C_max - C_min) / 2, gamma_min + (gamma_max - gamma_min) / 2)
axis = [None, None]
eps = 10**-9
X_list, error_list = [], []
for k in range(num_zooms):
#Create the axis of the plot
frame_stepsize = (stepsize[0] * zoom**(-k), stepsize[1] * zoom**(-k))
frame_width = frame_stepsize[0] * C_steps
frame_height = frame_stepsize[1] * gamma_steps
axis[0] = np.arange(frame_center[0] - frame_width / 2, frame_center[0] + frame_width / 2, frame_stepsize[0] + eps, dtype=float) + frame_stepsize[0] / 2
axis[1] = np.arange(frame_center[1] - frame_height / 2, frame_center[1] + frame_height / 2, frame_stepsize[1] + eps, dtype=float) + frame_stepsize[1] / 2
#If the point is inside the new smaller frame calculate the local value
for C in axis[0]:
for gamma in axis[1]:
if(C > C_min and C < C_max and gamma > gamma_min and gamma < gamma_max):
error_ratio = crossvalidation_error(heart, C, gamma)
X_list.append((C, gamma))
error_list.append(error_ratio)
        #Print the error and the best C and gamma values found so far
error_element_index = np.argmin(error_list)
C_minimum = X_list[error_element_index][0]
gamma_minimum = X_list[error_element_index][1]
error_ratio = error_list[error_element_index]
print(f"({k+1}/{num_zooms}) Minima found for C = {10**C_minimum:.3E} and gamma = {10**gamma_minimum:.3E} and with error ratio: {error_ratio:.3f}")
frame_center = (C_minimum, gamma_minimum)
if plot:
plt.title("Error ratio after cross validation of dataset for different C and gamma")
plot_heatmap_points(X_list, error_list, (stepsize[0] * zoom**(-num_zooms) + eps, stepsize[1] * zoom**(-num_zooms) + eps))
return C_minimum, gamma_minimum, error_ratio
C_steps_frame, gamma_steps_frame = 4, 4 #grid points per frame along each axis
zoom = 1.7 #factor by which the frame shrinks at each zoom step
num_zooms = 3 #how many times to zoom in
C_minimum, gamma_minimum, error_ratio_grid_search = grid_search(C_steps_frame, gamma_steps_frame, zoom, num_zooms, plot = True)
class GaussianPrediction:
    def __init__(self, cov = 1.0, xi = 0.01, alpha = 0.2):
        self.Xi = []       # sampled (log10(C), log10(gamma)) points
        self.Yi = []       # cross-validation errors at those points
        self.cov = cov     # RBF kernel length scale
        self.xi = xi       # exploration parameter of expected improvement
        self.alpha = alpha # exploration parameter of the lower confidence bound
        self.FE_h = 0.2    # step size for the finite-difference gradients
def gaussian_kernel(self, x, y):
"""Returns the RBF kernel with sigma 1 and length 1."""
x, y = np.array(x), np.array(y)
l2_norm = np.linalg.norm(x-y)
return np.exp(-(l2_norm**2)/(2*(self.cov**2)))
    def sample_point(self,x):
        """Sample the cross-validation error at a given point and recompute the
        Sigma matrix and its inverse; this is done once per added point for
        efficiency."""
        self.Xi.append(x)
        self.Yi.append(crossvalidation_error(heart, x[0], x[1]))
        self.Sigma = np.zeros([len(self.Xi),len(self.Xi)])
        for i in range(len(self.Xi)):
            for j in range(len(self.Xi)):
                self.Sigma[i][j] = self.gaussian_kernel(self.Xi[i], self.Xi[j])
        # Pseudo-inverse for numerical stability when Sigma is near-singular.
        self.Sigma_inv = np.linalg.pinv(self.Sigma)
        self.y = np.array(self.Yi)
        self.mu_sample_opt = np.min(self.y)
        return
def evaluate_mu_sigma(self, x):
""" Evaluate mu sigma at a given point x using Sigma calculated before"""
k = np.zeros(len(self.Xi))
for i in range(len(self.Xi)):
k[i] = self.gaussian_kernel(self.Xi[i], x)
k_trans = np.transpose(k)
K_hh = 1
mu = np.matmul(k_trans, np.matmul(self.Sigma_inv, self.y))
sigma = np.sqrt(max(K_hh - np.matmul(k_trans, np.matmul(self.Sigma_inv, k)), 0))
return mu, sigma
    def expected_improvement(self, x):
        """Return the negative expected improvement at x, so that minimizing
        this acquisition function maximizes the expected improvement."""
        mu, sigma = self.evaluate_mu_sigma(x)
        if sigma != 0:
            # Improvement over the best (smallest) sampled error so far.
            imp = self.mu_sample_opt - self.xi - mu
            Z = imp / sigma
            ei = - imp * norm.cdf(Z) - sigma*norm.pdf(Z)
        else:
            ei = 0.0
        return ei
def gradient_sigma_mu(self, x):
"""Return the gradient of sigma and mu approximated using finite differences"""
h = self.FE_h
C = x[0]
gamma = x[1]
mu_C, sigma_C = self.evaluate_mu_sigma([C - 0.5*h, gamma ])
mu_gamma, sigma_gamma = self.evaluate_mu_sigma([C , gamma - 0.5*h])
mu_C_h, sigma_C_h = self.evaluate_mu_sigma([C + 0.5*h, gamma ])
mu_gamma_h, sigma_gamma_h = self.evaluate_mu_sigma([C , gamma + 0.5*h])
gradient_sigma = [(sigma_C_h - sigma_C) / h, (sigma_gamma_h - sigma_gamma) / h] #[dsigma/dC, dsigma/dgamma]
gradient_mu = [(mu_C_h - mu_C) / h , (mu_gamma_h - mu_gamma) / h ] #[dmu/dC, dmu/dgamma]
return gradient_sigma, gradient_mu
    def gradient_expected_improvement(self, x):
        """Return the gradient of the (negative) expected improvement."""
        mu, sigma = self.evaluate_mu_sigma(x)
        gradient_sigma, gradient_mu = self.gradient_sigma_mu(x)
        if sigma != 0:
            imp = (self.mu_sample_opt - self.xi - mu)
            Z = imp/sigma
            gradient_dC = gradient_mu[0]*norm.cdf(Z) - gradient_sigma[0] * norm.pdf(Z)
            gradient_dgamma = gradient_mu[1]*norm.cdf(Z) - gradient_sigma[1] * norm.pdf(Z)
        else:
            gradient_dC = 0
            gradient_dgamma = 0
        return np.concatenate((gradient_dC,gradient_dgamma), axis = None)
def lower_confidence_bound(self, x):
"""Use lower confidence bound as new acquisition function,
alpha is a parameter controlling the degree of exploration"""
mu, sigma = self.evaluate_mu_sigma(x) # mu(x) and sigma(x)
lcb = mu - self.alpha * sigma
return lcb
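    # Illustrative effect of alpha (made-up values): with mu = 0.4 and
    # sigma = 0.1, alpha = 0.2 gives lcb = 0.38 while alpha = 2.0 gives 0.2,
    # so uncertain points become more attractive as alpha grows.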
    def gradient_lower_confidence_bound(self, x):
        """Return the gradient of the lower confidence bound."""
        mu, sigma = self.evaluate_mu_sigma(x) # mu(x) and sigma(x)
        gradient_sigma, gradient_mu = self.gradient_sigma_mu(x)
        gradient_dC = gradient_mu[0] - self.alpha*gradient_sigma[0]
        gradient_dgamma = gradient_mu[1] - self.alpha*gradient_sigma[1]
        return np.concatenate((gradient_dC,gradient_dgamma), axis = None)
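# Sanity check of the expected-improvement formula via Monte Carlo
# (illustrative; mu, sigma, f_opt and xi below are arbitrary made-up values):
# E[max(f_opt - xi - f, 0)] for f ~ N(mu, sigma^2) should match the analytic
# expression imp * Phi(Z) + sigma * phi(Z) used (with a minus sign) in
# expected_improvement above.
_mu, _sigma, _f_opt, _xi = 0.4, 0.1, 0.45, 0.01
_samples = np.random.normal(_mu, _sigma, 200000)
_mc = np.mean(np.maximum(_f_opt - _xi - _samples, 0))
_imp = _f_opt - _xi - _mu
_Z = _imp / _sigma
_analytic = _imp * norm.cdf(_Z) + _sigma * norm.pdf(_Z)
print(_mc, _analytic)  # should agree to about three decimals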
def restrict_to_domain(point, domain):
    """Clip the point back into the domain rectangle if it has left it."""
point = list(point)
if point[0] < domain[0][0]:
point[0] = domain[0][0]
if point[0] > domain[0][1]:
point[0] = domain[0][1]
if point[1] < domain[1][0]:
point[1] = domain[1][0]
if point[1] > domain[1][1]:
point[1] = domain[1][1]
return list(point)
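# An equivalent (illustrative) variant using np.clip; restrict_to_domain above
# remains the version used by the optimizer.
def restrict_to_domain_clip(point, domain):
    """Clip the point componentwise into the domain rectangle."""
    return [float(np.clip(point[0], domain[0][0], domain[0][1])),
            float(np.clip(point[1], domain[1][0], domain[1][1]))]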
def plot_surface(GaussianPrediction, ax, fig, aq_func, domain, steps):
"""Plot a surface of an arbitrary function in a given domain"""
# Set up axis
X = np.linspace(domain[0][0], domain[0][1], steps[0])
Y = np.linspace(domain[1][0], domain[1][1], steps[1])
# Fill grid with function value
    Z = np.zeros((steps[1], steps[0])) # rows index Y, columns index X
for j , y in enumerate(Y):
for i , x in enumerate(X):
Z[j,i] = aq_func([x, y])
X, Y = np.meshgrid(X, Y)
surf = ax.plot_surface(X, Y, Z, cmap=cm.coolwarm, linewidth=0, antialiased=False, alpha = 0.5)
fig.colorbar(surf, shrink=0.5, aspect=5)
def gradient_descent(GaussianPrediction, aq_func, aq_grad_func, trials, max_trial_length, h, min_step, domain, steps, plot = False):
    """Start gradient descent on an arbitrary function from 'trials' random
    points in the domain and follow it downhill until there is no improvement
    or the maximum number of iterations is reached.
    If 'plot = True' the function also plots the surface of the function and
    the paths of the minimization algorithm."""
    # Set up the lists for the optimization paths
list_paths_xy = [0] * trials
list_paths_z = [0] * trials
# Choose random starting points
initial_x = np.random.uniform(domain[0][0], domain[0][1], trials)
initial_y = np.random.uniform(domain[1][0], domain[1][1], trials)
for i in range(trials):
starting_point = [initial_x[i], initial_y[i]]
list_paths_xy[i] = [starting_point]
list_paths_z[i] = [aq_func(starting_point)]
current_point = list_paths_xy[i][0]
for _ in range(max_trial_length):
gradient = aq_grad_func(current_point)
next_point = [current_point[0] - h * gradient[0], current_point[1] - h * gradient[1]]
next_point = restrict_to_domain(next_point, domain)
if (current_point[0] - next_point[0])**2 + (current_point[1] - next_point[1])**2 < min_step**2:
break
current_point = next_point
if plot:
list_paths_xy[i].append(next_point)
list_paths_z[i].append(aq_func(next_point))
if not plot:
list_paths_xy[i].append(next_point)
list_paths_z[i].append(aq_func(next_point))
if plot:
#Set up fig and plot surface of expected improvement
fig, ax = plt.subplots(subplot_kw={"projection": "3d"})
plot_surface(GaussianPrediction, ax, fig, aq_func, domain, steps)
# Plot starting point
ax.scatter([path[0][0] for path in list_paths_xy], [path[0][1] for path in list_paths_xy], [path[0] for path in list_paths_z], marker = 'o', label = 'Starting point', c = 'r')
        # Plot the optimization path for every trial, but label only one to reduce clutter
for i in range(trials):
line, = ax.plot([point[0] for point in list_paths_xy[i]], [point[1] for point in list_paths_xy[i]], [float(z) for z in list_paths_z[i]], '-ko', markersize = 2)
            line.set_label('Optimization path')
        # Plot ending points
ax.scatter([path[-1][0] for path in list_paths_xy], [path[-1][1] for path in list_paths_xy], [path[-1] for path in list_paths_z], marker = 'o', label = 'Ending point', c = 'b')
best_trial = np.argmin(np.array([path_z[-1] for path_z in list_paths_z]))
point_min = list_paths_xy[best_trial][-1]
z_min = float(list_paths_z[best_trial][-1])
return point_min, z_min
# Initialise the Gaussian process surrogate for the expected-improvement search
GP_ei = GaussianPrediction(cov = 2.0, xi = 0.001)
# Initialise variables for the gradient descent
trials = 20
max_trial_length = 200
h = 0.5
min_step = 0.02
epsilon = 10**(-3)
domain = [(C_min + epsilon, C_max), (gamma_min + epsilon, gamma_max)]
steps = (C_steps, gamma_steps)
#Choose a random point in the domain and evaluate the error
X_init = [random.uniform(C_min, C_max), random.uniform(gamma_min, gamma_max)]
GP_ei.sample_point(X_init)
best_so_far_error = GP_ei.Yi.copy()
n = 50
for i in tqdm(range(n)):
point_min, ei_min = gradient_descent(GP_ei, GP_ei.expected_improvement, GP_ei.gradient_expected_improvement, \
trials, max_trial_length, h, min_step, domain, steps, plot = False)
GP_ei.sample_point(point_min)
best_so_far_error.append(min(best_so_far_error[-1], GP_ei.Yi[-1]))
plt.plot(best_so_far_error)
plt.axhline(y = error_ratio_grid_search, color = 'r', linestyle = '-')
plt.show()
plt.title("Error ratio after cross validation of dataset for different C and gamma")
plot_heatmap_points(GP_ei.Xi, GP_ei.Yi, (0.1,0.1))
plt.show()
#Choose n random points in the domain and evaluate the error
n = 15
# Initialise the Gaussian process surrogate used for plotting the EI surface
GP_ei_plot = GaussianPrediction(cov = 2.0, xi = 0.001)
X_init = np.array([[random.uniform(C_min, C_max), random.uniform(gamma_min, gamma_max)] for i in range(n)])
for x in X_init:
GP_ei_plot.sample_point(x)
trials = 30
max_trial_length = 200
h = 0.5
min_step = 0.01
domain = [(C_min, C_max), (gamma_min, gamma_max)]
steps = (50, 50)
# Do the gradient descent and plot the paths
gradient_descent(GP_ei_plot, GP_ei_plot.expected_improvement, GP_ei_plot.gradient_expected_improvement, trials, max_trial_length, h, min_step, domain, steps, plot = True)
plt.legend()
plt.show()
Exercise 4: a new acquisition function
# Initialise the Gaussian process surrogate for the lower-confidence-bound search
GP_lcb = GaussianPrediction(cov = 2.0, alpha = 0.001)
# Initialise variables for the gradient descent
trials = 20
max_trial_length = 200
h = 0.5
min_step = 0.02
epsilon = 10**(-3)
domain = [(C_min + epsilon, C_max), (gamma_min + epsilon, gamma_max)]
steps = (C_steps, gamma_steps)
#Choose a random point in the domain and evaluate the error
X_init = [random.uniform(C_min, C_max), random.uniform(gamma_min, gamma_max)]
GP_lcb.sample_point(X_init)
best_so_far_error = GP_lcb.Yi.copy()
n = 50
for i in tqdm(range(n)):
point_min, lcb_min = gradient_descent(GP_lcb, GP_lcb.lower_confidence_bound, GP_lcb.gradient_lower_confidence_bound, \
trials, max_trial_length, h, min_step, domain, steps, plot = False)
GP_lcb.sample_point(point_min)
best_so_far_error.append(min(best_so_far_error[-1], GP_lcb.Yi[-1]))
plt.plot(best_so_far_error)
plt.axhline(y = error_ratio_grid_search, color = 'r', linestyle = '-')
plt.show()
plt.title("Error ratio after cross validation of dataset for different C and gamma")
plot_heatmap_points(GP_lcb.Xi, GP_lcb.Yi, (0.1,0.1))
plt.show()
#Choose n random points in the domain and evaluate the error
n = 15
# Initialise the Gaussian process surrogate used for plotting the LCB surface
GP_lcb_plot = GaussianPrediction(cov = 2.0, alpha = 0.001)
X_init = np.array([[random.uniform(C_min, C_max), random.uniform(gamma_min, gamma_max)] for i in range(n)])
for x in X_init:
GP_lcb_plot.sample_point(x)
trials = 30
max_trial_length = 200
h = 0.5
min_step = 0.01
domain = [(C_min, C_max), (gamma_min, gamma_max)]
steps = (50, 50)
# Do the gradient descent and plot the paths
gradient_descent(GP_lcb_plot, GP_lcb_plot.lower_confidence_bound, GP_lcb_plot.gradient_lower_confidence_bound, trials, max_trial_length, h, min_step, domain, steps, plot = True)
plt.legend()
plt.show()
X_training = heart.drop('heart disease', axis=1)
y_training = heart['heart disease']
X_test = heart_test.drop('heart disease', axis=1)
y_test = heart_test['heart disease']
# Evaluate on the held-out test set with the hyperparameters found by the grid search.
test_error = validation_error(X_training, X_test, y_training, y_test, C_minimum, gamma_minimum)
print(f"Test error ratio: {test_error/len(heart_test):.3f}")