%cd /content/drive/My Drive/papila_img/
!pip install mlflow
# basic packages
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import warnings
import os
import random
# img packages
from PIL import Image
from skimage.transform import resize
# ml packages
from sklearn.metrics import accuracy_score, precision_score, recall_score,classification_report,confusion_matrix,precision_recall_curve,roc_curve
from sklearn.model_selection import train_test_split
# deep learning packages
import tensorflow as tf
from keras.metrics import Precision
from tensorflow import keras
from keras import Sequential
from keras.layers import Dense, Flatten,Conv2D,Dropout,AveragePooling2D
from keras.utils.vis_utils import plot_model
from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
# mlops packages
import mlflow
# import mlflow.pyfunc
# %%
# fixed variables, setup seeds and filter warnings messages
# NOTE(review): ROOT_DIR is defined but the visible code passes its own
# relative paths ('./') instead — confirm whether it is still needed.
ROOT_DIR = '../'
# seed every RNG in play (Keras, TF, Python, NumPy) for reproducible runs
tf.keras.utils.set_random_seed(42)
tf.random.set_seed(42)
random.seed(42)
np.random.seed(42)
# silence library warnings to keep notebook output readable
warnings.filterwarnings('ignore')
def _fix_df(df):
"""Prepare the Data Frame to be readable
"""
df_new = df.drop(['ID'], axis=0)
df_new.columns = df_new.iloc[0,:]
df_new.drop([np.nan], axis=0, inplace=True)
df_new.columns.name = 'ID'
return df_new
def read_clinical_data(abs_path='../'):
    """Load both per-eye clinical spreadsheets and return them fixed.

    Returns a tuple (right-eye frame, left-eye frame); od = oculus dexter,
    os = oculus sinister. *abs_path* is the directory that contains the
    ``ClinicalData`` folder.
    """
    sheets = []
    for fname in ('patient_data_od.xlsx', 'patient_data_os.xlsx'):
        raw = pd.read_excel(abs_path + 'ClinicalData/' + fname, index_col=[0])
        sheets.append(_fix_df(df=raw))
    return tuple(sheets)
# read both per-eye spreadsheets from the current directory (set by %cd above)
df_od, df_os = read_clinical_data('./')
def create_token_id(df_sample, direction='OS'):
    """Attach fundus-image file names and an eye-side label to a clinical frame.

    Reminder: OD -> Oculus Dexter (right eye), OS -> Oculus Sinister (left eye).
    The index (patient ids such as ' #123 ') becomes a ``token_id`` column of
    the form 'RET123OS.jpg'; ``eye_side`` records *direction* on every row.
    """
    out = df_sample.copy().reset_index().rename({'index': 'token_id'}, axis=1)

    def _to_filename(raw_id):
        # strip the '#' marker and surrounding whitespace from the patient id
        return 'RET' + raw_id.replace('#', '').strip() + direction + '.jpg'

    out['token_id'] = out['token_id'].apply(_to_filename)
    out['eye_side'] = direction
    return out
# build one frame per eye with image file names, then stack them together
df_test_os = create_token_id(df_os,'OS')
df_test_od = create_token_id(df_od,'OD')
df_join_data = pd.concat([df_test_os,df_test_od])
# notebook-style inspection cells (no visible effect when run as a plain script)
df_join_data.sample(10)
df_join_data.Diagnosis.value_counts(normalize = True)
# based on the test #2 from paper experiments, remove suspicious class (2)
# by folding it into the healthy class (0), leaving a binary target
df_join_data['Diagnosis'] = df_join_data['Diagnosis'].replace(2,0)
def x_dataset(df_sample, img_dir='FundusImages/'):
    """Load and stack the fundus image for every row of *df_sample*.

    Each row's ``token_id`` names an image file inside *img_dir*; every
    image found is resized to 224 x 224 (the resolution ResNet50 was
    trained with) and stacked into one numpy array. Missing or unreadable
    files are reported and skipped rather than aborting the whole load.

    Parameters
    ----------
    df_sample : pandas.DataFrame with a 'token_id' column of file names.
    img_dir : directory holding the images (default 'FundusImages/',
        matching the original hard-coded path).

    Returns
    -------
    numpy.ndarray of the successfully loaded, resized images.
    """
    df_sample = df_sample.copy()
    list_to_tensor, list_not_available = [], []
    for _, row in df_sample.iterrows():
        try:
            # resize img to 224 x 224 because RESNET50 was trained with this configuration
            img = np.array(Image.open(img_dir + row['token_id']))
            list_to_tensor.append(resize(img, (224, 224)))
        except (OSError, ValueError):
            # OSError covers missing/corrupt files; ValueError covers
            # unexpected image data. Narrower than the original bare
            # `except:`, which also swallowed KeyboardInterrupt and bugs.
            print(f'the sample {row["token_id"]} is not available')
            list_not_available.append(row["token_id"])
    return np.array(list_to_tensor)
# plot eyes sample
def visualize_four_samples(X_set):
    """Display the first four images of *X_set* in a 2x2 grid, axes hidden."""
    _, axes = plt.subplots(2, 2, figsize=(10, 10))
    # flatten() walks the grid row-major, matching the original index order
    for idx, cell in enumerate(axes.flatten()):
        cell.set_xticks([])
        cell.set_yticks([])
        cell.grid(False)
        cell.imshow(X_set[idx], cmap='gray')
    plt.show()
# class distribution before balancing (notebook inspection cells)
df_join_data.Diagnosis.value_counts(normalize = True)
df_join_data.Diagnosis.value_counts(normalize = False)
# undersample the healthy class down to 87 rows — presumably the size of the
# glaucoma class so both classes end up balanced; TODO confirm the count
df_healty_sample = df_join_data[df_join_data.Diagnosis == 0].sample(87, random_state = 42)
df_glaucoma = df_join_data[df_join_data.Diagnosis == 1]
df_balanced_set = pd.concat([df_healty_sample, df_glaucoma])
df_balanced_set.head()
df_balanced_set.Diagnosis.value_counts() # balanced dataset
# split features/target and hold out 20% for testing, stratified on the label
X = df_balanced_set.drop('Diagnosis',axis = 1)
y = df_balanced_set['Diagnosis']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
# positive-class proportion per split (should match thanks to stratify)
y_train.mean()
y_test.mean()
# load and resize the actual fundus images for each split
X_train_tensor = x_dataset(X_train)
X_test_tensor = x_dataset(X_test)
# eyeball a few samples from each split
visualize_four_samples(X_train_tensor)
visualize_four_samples(X_test_tensor)
# ANN - architecture
# baseline CNN: three conv layers, dropout, then a small dense head ending
# in a single sigmoid unit for binary glaucoma classification
model_simple = keras.Sequential([
Conv2D(32, (3, 3), activation='relu', padding='same', input_shape=(224, 224, 3)),
Conv2D(64, (3, 3), activation='relu', strides=(2, 2)),
Conv2D(128, (3, 3), activation='relu'),
Dropout(0.5),
Flatten(),
# Fully connected layers
Dense(64, activation='relu'),
Dense(64, activation='relu'),
Dense(1, activation='sigmoid')
])
# compile settings
model_simple.compile(optimizer='adam', loss='binary_crossentropy', metrics=[Precision()])
# train model
model_simple.fit(X_train_tensor, y_train, epochs=50)
# render the architecture diagram
plot_model(model_simple, show_shapes=True)
# loss/precision on the train and test splits (notebook inspection cells)
model_simple.evaluate(X_train_tensor,y_train)
model_simple.evaluate(X_test_tensor,y_test)
# turn sigmoid probabilities into hard 0/1 labels at a 0.5 threshold
y_pred_prob_ann_cnn = model_simple.predict(X_test_tensor)
y_pred_ann_cnn = (y_pred_prob_ann_cnn.flatten() >0.5)+0
# test-set classification metrics
accuracy_score(y_test,y_pred_ann_cnn)
precision_score(y_test,y_pred_ann_cnn)
recall_score(y_test,y_pred_ann_cnn)
print(classification_report(y_test,y_pred_ann_cnn))
confusion_matrix(y_test, y_pred_ann_cnn)
# precision-recall curve. NOTE(review): built from hard labels rather than
# y_pred_prob_ann_cnn, so the curve only has a handful of points — consider
# passing the probabilities instead.
precision_simple, recall_simple, _ = precision_recall_curve(y_test, y_pred_ann_cnn)
plt.plot(recall_simple,precision_simple)
plt.xlabel('recall')
plt.ylabel('precision')
plt.title('Precision vs Recall')
plt.axvline(x = 0.65, linestyle = '--', color = 'r', label = '0.65 recall')
plt.hlines(y = 0.85, xmin = 0, xmax = 1, linestyle = '--',label = '0.85 precision', color = 'g')
plt.legend()
plt.show()
# ROC curve against the chance diagonal (same hard-label caveat as above)
fpr_simple, tpr_simple, _ = roc_curve(y_test.values, y_pred_ann_cnn)
plt.plot(fpr_simple,tpr_simple)
plt.plot([0,1],[0,1], linestyle = '--')
plt.xlabel('False Positive rate')
plt.ylabel('True positive rate')
plt.title('ROC curve')
plt.show()
# lenet architecture
# LeNet-inspired CNN: 5x5 convolutions with average pooling, trained with a
# smaller learning rate than the baseline model
model_lenet = keras.Sequential([
Conv2D(32, (5, 5), activation='relu',padding='same',strides=(1, 1), input_shape=(224, 224, 3)),
# AveragePooling2D(strides = (2,2)),
Conv2D(16, kernel_size = (5, 5), activation='relu', strides=(1, 1)),
AveragePooling2D(strides = (2,2)),
Conv2D(120, kernel_size = (5,5), activation='relu', strides=(1, 1)),
Flatten(),
# Dense(120, activation='relu'),
Dense(84, activation='relu'),
Dense(1, activation='sigmoid') # output layer
])
# transform y label to categorical
# y_train_cat = tf.keras.utils.to_categorical(y_train)
# compile settings
model_lenet.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0005), loss='binary_crossentropy', metrics=[Precision()])
# train model
model_lenet.fit(X_train_tensor, y_train, epochs=50)
# render the architecture diagram
plot_model(model_lenet, show_shapes=True)
# evaluate on both splits, then persist the trained model to disk
model_lenet.evaluate(X_train_tensor,y_train)
model_lenet.evaluate(X_test_tensor,y_test)
model_lenet.save('model_lenet.h5')
# hard 0/1 predictions at a 0.5 threshold
y_pred_prob_lenet = model_lenet.predict(X_test_tensor)
y_pred_lenet = (y_pred_prob_lenet.flatten() >0.5)+0
# test-set classification metrics (notebook inspection cells)
accuracy_score(y_test,y_pred_lenet)
precision_score(y_test,y_pred_lenet)
recall_score(y_test,y_pred_lenet)
print(classification_report(y_test,y_pred_lenet))
confusion_matrix(y_test, y_pred_lenet)
# PR curves of both architectures overlaid
# (NOTE(review): built from hard labels rather than probabilities)
precision_lenet, recall_lenet, _ = precision_recall_curve(y_test, y_pred_lenet)
plt.plot(recall_lenet,precision_lenet, label = 'lenet arch', color = 'g')
plt.plot(recall_simple,precision_simple, label = 'simple arch', color = 'b')
plt.xlabel('recall')
plt.ylabel('precision')
plt.title('Precision vs Recall')
plt.axvline(x = 0.70, linestyle = '--', color = 'r', label = '0.70 recall')
plt.hlines(y = 0.9, xmin = 0, xmax = 1, linestyle = '--',label = '0.9 precision', color = 'g')
plt.legend()
plt.show()
# ROC curves of both architectures against the chance diagonal
fpr_lenet, tpr_lenet, _ = roc_curve(y_test.values, y_pred_lenet)
plt.plot(fpr_lenet,tpr_lenet, label = 'lenet arch', color = 'g')
plt.plot(fpr_simple,tpr_simple, label = 'simple arch', color = 'b')
plt.plot([0,1],[0,1], linestyle = '--')
plt.xlabel('False Positive rate')
plt.ylabel('True positive rate')
plt.title('ROC curve')
plt.legend()
plt.show()
# ann architecture
# transfer learning: ResNet50 ImageNet backbone (include_top=False) with a
# new pooled + dense classification head ending in one sigmoid unit
base_model_resnet50 = ResNet50(weights ='imagenet', include_top = False)
x = base_model_resnet50.output
x = GlobalAveragePooling2D()(x)
x = Dropout(0.3)(x)
x = Dense(128, activation='relu')(x)
x = Dense(64, activation='relu')(x)
x = Dense(16, activation='relu')(x)
# x = Dropout(0.2)(x)
preds = Dense(1, activation='sigmoid')(x)
resnet50_model = Model(inputs=base_model_resnet50.input , outputs=preds)
# NOTE(review): the freezing loop below is commented out, so ALL ResNet50
# backbone weights are fine-tuned during fit — confirm that is intended.
# for layer in base_model_resnet50.layers:
# layer.trainable=False
# compile settings
resnet50_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=[Precision()])
# train model
resnet50_model.fit(X_train_tensor, y_train, epochs=30)
# evaluate on both splits and keep the test-set probabilities
resnet50_model.evaluate(X_train_tensor,y_train)
resnet50_model.evaluate(X_test_tensor,y_test)
y_pred_prob_resnet = resnet50_model.predict(X_test_tensor)
# basic helpers for tracking model performance metrics
def predict_on_test_data(model, X_test):
    """Return hard 0/1 class predictions from a sigmoid-output model."""
    probabilities = model.predict(X_test).flatten()
    # threshold at 0.5 and convert the boolean mask to integers
    return (probabilities > 0.5).astype(int)
def get_metrics(y_true, y_pred):
    """Return accuracy, precision and recall rounded to two decimals."""
    scores = {
        'accuracy': accuracy_score(y_true, y_pred),
        'precision': precision_score(y_true, y_pred),
        'recall': recall_score(y_true, y_pred),
    }
    return {name: round(value, 2) for name, value in scores.items()}
def predict_evaluate(model, X_test, y_test):
    """Predict on *X_test* and return the rounded metric dict vs *y_test*."""
    return get_metrics(y_test, predict_on_test_data(model, X_test))
# save experiments
def create_experiment(experiment_name, run_name, run_metrics, model, run_params=None):
    """Log a finished training run (params, metrics, model) to MLflow.

    Parameters
    ----------
    experiment_name : MLflow experiment to write into (created if absent).
    run_name : label stored as the 'CNN architecture' tag on the run.
    run_metrics : dict of metric name -> value.
    model : trained Keras model to persist alongside the run.
    run_params : optional dict of hyperparameters to log.
    """
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run():
        if run_params is not None:  # idiomatic None check (was `not ... == None`)
            for param in run_params:
                mlflow.log_param(param, run_params[param])
        for metric in run_metrics:
            mlflow.log_metric(metric, run_metrics[metric])
        # BUG FIX: the models logged here are Keras networks, not sklearn
        # estimators — mlflow.sklearn.log_model cannot serialize them; use
        # the TensorFlow/Keras flavor instead.
        mlflow.tensorflow.log_model(model, "model")
        mlflow.set_tag("CNN architecture", run_name)
    # mojibake repaired: 'ejecuciĆ³n' -> 'ejecución'
    print('La ejecución: %s fue registrada en el experimento: %s' % (run_name, experiment_name))
def tuned_ann(epochs, learning_rate, activation_f):
    """Build, compile and fit the LeNet-style CNN with the given hyperparameters.

    Trains on the module-level X_train_tensor / y_train and returns the
    fitted model.
    """
    # LeNet-style stack: 5x5 convolutions, one average-pooling step, then a
    # small dense head ending in a single sigmoid unit
    layers = [
        Conv2D(32, (5, 5), activation=activation_f, padding='same', strides=(1, 1), input_shape=(224, 224, 3)),
        Conv2D(16, kernel_size=(5, 5), activation=activation_f, strides=(1, 1)),
        AveragePooling2D(strides=(2, 2)),
        Conv2D(120, kernel_size=(5, 5), activation=activation_f, strides=(1, 1)),
        Flatten(),
        Dense(84, activation=activation_f),
        Dense(1, activation='sigmoid'),
    ]
    network = keras.Sequential(layers)
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    network.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=[Precision()])
    network.fit(X_train_tensor, y_train, epochs=epochs)
    return network
# create an instance - experiment
experiment_name = "basic_CNN"
run_name = "basic_arch"
# hyperparameters
current_epochs = 50
current_activation = 'relu'
current_learning_rate = 0.001
# train the tuned network, score it on the held-out test set, then log
# params, metrics and the model itself to MLflow
current_clasifier = tuned_ann(current_epochs,current_learning_rate,current_activation)
run_metrics = predict_evaluate(current_clasifier, X_test_tensor, y_test)
print(run_metrics)
create_experiment(experiment_name,
run_name,
run_metrics,
current_clasifier,
run_params = {"epochs":current_epochs,
"activation_function":current_activation,
"learning_rate":current_learning_rate
}
)