Dimensionality reduction: Autoencoder + OPTUNA
!pip install optuna
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, LeakyReLU
from keras.backend import clear_session
from keras.utils import plot_model  # needed for the plot_model() call below
import tensorflow as tf
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import cross_val_score, StratifiedKFold
from sklearn.preprocessing import MinMaxScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE, Isomap
import optuna
from io import BytesIO
from zipfile import ZipFile
from urllib.request import urlopen
from itertools import combinations
url_ob = urlopen("https://archive.ics.uci.edu/ml/machine-learning-databases/00282/LSVT_voice_rehabilitation.zip")
# Keep the dataset as bytes in an in-memory buffer using BytesIO
zipfile = ZipFile(BytesIO(url_ob.read()))
print(f'Files in the archive: {zipfile.namelist()}')
LSVT_voice_X = pd.read_excel(zipfile.open('LSVT_voice_rehabilitation.xlsx'))
LSVT_voice_labels = pd.read_excel(zipfile.open('LSVT_voice_rehabilitation.xlsx'), 'Binary response')
# Brief overview of the dataset
LSVT_voice_X.iloc[:5, :10]
print(f'Label values: {set(LSVT_voice_labels.iloc[:, 0])}')
print(f'Shape of the feature set: {LSVT_voice_X.shape}')
LSVT_voice_labels.hist()
plt.show()
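# Optional check: exact class counts behind the histogram
values, counts = np.unique(LSVT_voice_labels.to_numpy(), return_counts=True)
print(dict(zip(values, counts)))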
# Inspect feature ranges: they vary widely, which motivates the Min-Max scaling below
LSVT_voice_X.describe().loc['min'][:5]
LSVT_voice_X.describe().loc['max'][::-1][:5]
scaler = MinMaxScaler(feature_range=(0,1))
LSVT_scaled = scaler.fit_transform(LSVT_voice_X)
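# Quick sanity check: after Min-Max scaling every feature should lie in [0, 1]
print(LSVT_scaled.min(), LSVT_scaled.max())  # expected: 0.0 1.0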
# We also need to convert LSVT_voice_labels to a NumPy array for later convenience
LSVT_voice_labels = LSVT_voice_labels.to_numpy().flatten()
# Baseline: Random Forest on all 310 scaled features
model = RandomForestClassifier(random_state=42)
cv = StratifiedKFold(5, shuffle=True, random_state=42)
cross_val_score(model, LSVT_scaled, LSVT_voice_labels, scoring='f1_macro', cv=cv).mean()
def plot_representation(features, labels, rep_type):
fig, axes = plt.subplots(2, 3, figsize=(12,6))
    fig.suptitle('Scatter plot: ' + rep_type + ' 4-feature representation', fontsize=16)
for i, pair in enumerate(combinations([0,1,2,3], 2)):
for label, color in zip([1,2], ['b', 'g']):
axes[int(i / 3), int(i % 3)].scatter(features[labels == label, pair[0]],
features[labels == label, pair[1]],
c=color,
alpha=0.5,
label = label)
axes[int(i / 3), int(i % 3)].set_xlabel(f'Feature {pair[0]}')
axes[int(i / 3), int(i % 3)].set_ylabel(f'Feature {pair[1]}')
axes[int(i / 3), int(i % 3)].legend()
plt.tight_layout()
PCA_transformer = PCA(n_components=4)
PCA_representation = PCA_transformer.fit_transform(LSVT_scaled)
#Visualize results
plot_representation(PCA_representation, LSVT_voice_labels, 'PCA')
print(f'PCA feature representation shape: {PCA_representation.shape}')
# Evaluate the PCA feature representation with a Random Forest classifier
model = RandomForestClassifier(random_state=42)
cv = StratifiedKFold(5, shuffle=True, random_state=42)
scores = cross_val_score(model, PCA_representation, LSVT_voice_labels, scoring='f1_macro', cv=cv)
print(f'Mean cross-validation score on the PCA representation: {np.mean(scores)}')
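# Optional check: how much variance the four principal components retain
print(f'Explained variance ratio: {PCA_transformer.explained_variance_ratio_}')
print(f'Total explained variance: {PCA_transformer.explained_variance_ratio_.sum():.3f}')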
# method='exact' is used instead of 'barnes_hut'
# because 'barnes_hut' only supports n_components < 4
tSNE_transformer = TSNE(n_components=4, init='pca', learning_rate = 'auto', random_state=42, method='exact')
tsne_representation = tSNE_transformer.fit_transform(LSVT_scaled)
#Visualize results
plot_representation(tsne_representation, LSVT_voice_labels, 'tSNE')
print(f'tSNE feature representation shape: {tsne_representation.shape}')
# Evaluate the tSNE feature representation with a Random Forest classifier
model = RandomForestClassifier(random_state=42)
cv = StratifiedKFold(5, shuffle=True, random_state=42)
scores = cross_val_score(model, tsne_representation, LSVT_voice_labels, scoring='f1_macro', cv=cv)
print(f'Mean cross-validation score on the tSNE representation: {np.mean(scores)}')
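# Isomap is imported above but never exercised; as a sketch (not part of the
# original run), the same evaluation protocol can be applied to an Isomap embedding
isomap_representation = Isomap(n_components=4).fit_transform(LSVT_scaled)
isomap_scores = cross_val_score(RandomForestClassifier(random_state=42),
                                isomap_representation, LSVT_voice_labels,
                                scoring='f1_macro', cv=cv)
print(f'Mean cross-validation score on the Isomap representation: {np.mean(isomap_scores)}')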
input_dim = 310
layers_num = [2,3,4]
for i, layers in enumerate(layers_num):
print(f'{i+1}) ANN structure with \'layers_num={layers}\' parameter:')
    encoder = [int((input_dim/4)*i**3) for i in range(layers, 0, -1)]
    decoder = [int((input_dim/4)*i**3) for i in range(1, layers+1)]
    print(f'Encoder: {encoder} neurons + bottleneck: 4 neurons + Decoder: {decoder} neurons')
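# Worked example: for layers_num=2 and input_dim=310 the formula gives
# int((310/4) * 2**3) = 620 and int((310/4) * 1**3) = 77, i.e. the stack
# 310 -> 620 -> 77 -> 4 (bottleneck) -> 77 -> 620 -> 310
print([int((310/4)*i**3) for i in range(2, 0, -1)])  # [620, 77]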
def start_Autoencoder(features,
trials,
plot_graph = False):
    # Convert a pandas DataFrame to a NumPy array
    if isinstance(features, pd.DataFrame):
features = features.to_numpy()
input_dim = features.shape[1]
# Autoencoder structure
def create_model(activation, layers_num, dropout_prob, kernel_initializer):
clear_session()
autoencoder = Sequential()
# Encoder
for i in range(layers_num, 0, -1):
autoencoder.add(Dense(int((input_dim/(2**2))*i**3),
input_shape=(input_dim,),
activation=activation,
kernel_initializer=kernel_initializer,
name='encoder' + str(layers_num-i) )
)
if (i == layers_num):
autoencoder.add(Dropout(dropout_prob))
# Bottleneck
autoencoder.add(Dense(4,
activation=activation,
kernel_initializer=kernel_initializer,
name='bottleneck' + str(layers_num) )
)
# Decoder
for i in range(1, layers_num+1):
autoencoder.add(Dense(int((input_dim/(2**2))*i**3),
activation=activation,
kernel_initializer=kernel_initializer,
name='decoder' + str(i+layers_num) )
)
if (i == layers_num-1):
autoencoder.add(Dropout(dropout_prob))
# Output layer
autoencoder.add(Dense(input_dim,
activation=activation,
kernel_initializer=kernel_initializer,
name='output'))
return autoencoder
# Objective function to optimize by OPTUNA
def objective(trial):
activation = trial.suggest_categorical("activation", ["relu", "sigmoid", "swish"])
        layers_num = trial.suggest_int("layers_num", 2, 5, step=1)
dropout_rate = trial.suggest_float("dropout_prob", 0.0, 0.9, step=0.1)
if (activation == "relu"):
model = create_model(activation, layers_num, dropout_rate, kernel_initializer="HeUniform")
else:
model = create_model(activation, layers_num, dropout_rate, kernel_initializer="GlorotUniform")
model.compile(optimizer='adam', loss='mse')
        # Early stopping criterion:
        # training stops when the loss has not improved for 50 consecutive epochs
callback = tf.keras.callbacks.EarlyStopping(monitor='loss', patience=50)
history = model.fit(features,
features,
batch_size = features.shape[0],
epochs=500,
callbacks = [callback],
verbose = 0)
return history.history["loss"][-1]
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=trials)
# Create final model with the best hyperparams
print('Best hyperparams found by Optuna: \n', study.best_params)
if (study.best_params['activation'] == "relu"):
model = create_model(study.best_params['activation'],
int(study.best_params['layers_num']),
study.best_params['dropout_prob'],
kernel_initializer="HeUniform")
else:
model = create_model(study.best_params['activation'],
int(study.best_params['layers_num']),
study.best_params['dropout_prob'],
kernel_initializer="GlorotUniform")
model.compile(optimizer='adam', loss='mse')
model.summary()
    # Early stopping criterion:
    # training stops when the loss has not improved for 50 consecutive epochs
callback = tf.keras.callbacks.EarlyStopping(monitor='loss', patience=50)
history = model.fit(features,
features,
batch_size = features.shape[0],
epochs=500,
callbacks = [callback],
verbose = 0)
result = model.predict(features)
plot_model(model,
to_file='model_plot.png',
show_shapes=True,
show_layer_names=True,
rankdir="TB",
dpi=150)
if (plot_graph == True):
pd.DataFrame(history.history).plot(figsize=(8,5))
plt.grid(True)
plt.xlabel('epoch')
plt.ylabel('MSE')
plt.title('Loss curve')
plt.savefig('loss_curve.png', dpi=150)
plt.show()
# Result evaluation
print(f'RMSE Autoencoder: {np.sqrt(mean_squared_error(features, result))}')
print('')
feature_extractor = keras.Model(
inputs=model.inputs,
outputs=model.get_layer(name='bottleneck' + str(study.best_params['layers_num'])).output)
    # Returned values: extracted features || MSE || Optuna best hyperparams
return np.array(feature_extractor(features)), mean_squared_error(features, result), study.best_params
Acoder_representation, Acoder_MSE, Acoder_hyperparams = start_Autoencoder(features = LSVT_scaled,
trials = 50,
plot_graph=True)
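# The commented-out read_csv below suggests the representation was cached to disk;
# a matching save step (an assumed counterpart, same space-separated format) would be:
np.savetxt('Acoder_representation.csv', Acoder_representation, delimiter=' ')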
#Visualize results
plot_representation(Acoder_representation, LSVT_voice_labels, 'Autoencoder')
# To reload a previously cached representation instead of retraining:
# Acoder_representation = pd.read_csv('Acoder_representation.csv', sep=' ', header=None).to_numpy()
print(f'Autoencoder feature representation shape: {Acoder_representation.shape}')
model = RandomForestClassifier(random_state=42)
cv = StratifiedKFold(5, shuffle=True, random_state=42)
scores = cross_val_score(model, Acoder_representation, LSVT_voice_labels, scoring='f1_macro', cv=cv)
print(f'Mean cross-validation score on the Autoencoder representation: {np.mean(scores)}')
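# Summary sketch (reusing the objects created above): the same classifier and
# CV protocol applied to each 4-dimensional representation side by side
for name, rep in [('PCA', PCA_representation),
                  ('tSNE', tsne_representation),
                  ('Autoencoder', Acoder_representation)]:
    rep_scores = cross_val_score(RandomForestClassifier(random_state=42), rep,
                                 LSVT_voice_labels, scoring='f1_macro', cv=cv)
    print(f'{name}: mean f1_macro = {np.mean(rep_scores):.3f}')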