%matplotlib inline
import os
import gc
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter
from prettytable import PrettyTable
from IPython.display import Image
from sklearn.preprocessing import LabelEncoder
from keras.models import Model
from keras.regularizers import l2
from keras.constraints import max_norm
from keras.utils import to_categorical
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.callbacks import EarlyStopping
from keras.layers import Input, Dense, Dropout, Flatten, Activation
from keras.layers import Conv1D, Add, MaxPooling1D, BatchNormalization
from keras.layers import Embedding, Bidirectional, CuDNNLSTM, GlobalMaxPooling1D
import tensorflow as tf
tf.logging.set_verbosity(tf.logging.ERROR)
from google.colab import drive
drive.mount('/content/drive')
data_path = 'drive/My Drive/Case_Study/pfam/random_split/'
print('Available data', os.listdir(data_path))
# https://www.kaggle.com/drewbryant/starter-pfam-seed-random-split
# The data is randomly split into three folders: train (80%), test (10%), dev (10%).
# Read and concatenate the CSV shards within each folder.
def read_data(partition):
    data = []
    for fn in os.listdir(os.path.join(data_path, partition)):
        with open(os.path.join(data_path, partition, fn)) as f:
            data.append(pd.read_csv(f, index_col=None))
    return pd.concat(data)
# reading all data_partitions
df_train = read_data('train')
df_val = read_data('dev')
df_test = read_data('test')
df_train.head()
df_train.info()
# ex: unaligned sequence
# each character represents one of the 24 (20 common + 4 uncommon) amino acids in the sequence.
df_train.head(1)['sequence'].values[0]
# Given data size
print('Train size: ', len(df_train))
print('Val size: ', len(df_val))
print('Test size: ', len(df_test))
def calc_unique_cls(train, test, val):
    """
    Prints the number of unique classes in each data set.
    """
    train_unq = np.unique(train['family_accession'].values)
    val_unq = np.unique(val['family_accession'].values)
    test_unq = np.unique(test['family_accession'].values)
    print('Number of unique classes in Train: ', len(train_unq))
    print('Number of unique classes in Val: ', len(val_unq))
    print('Number of unique classes in Test: ', len(test_unq))
# Unique classes in each data split: df_train, df_val and df_test
calc_unique_cls(df_train, df_test, df_val)
# Sequence length (character count) for each split.
df_train['seq_char_count'] = df_train['sequence'].apply(lambda x: len(x))
df_val['seq_char_count'] = df_val['sequence'].apply(lambda x: len(x))
df_test['seq_char_count'] = df_test['sequence'].apply(lambda x: len(x))
def plot_seq_count(df, data_name):
    sns.distplot(df['seq_char_count'].values)
    plt.title(f'Sequence char count: {data_name}')
    plt.grid(True)
plt.subplot(1, 3, 1)
plot_seq_count(df_train, 'Train')
plt.subplot(1, 3, 2)
plot_seq_count(df_val, 'Val')
plt.subplot(1, 3, 3)
plot_seq_count(df_test, 'Test')
plt.subplots_adjust(right=3.0)
plt.show()
def get_code_freq(df, data_name):
    df = df.apply(lambda x: " ".join(x))
    codes = []
    for i in df:  # concatenating all codes
        codes.extend(i)
    codes_dict = Counter(codes)
    codes_dict.pop(' ')  # removing white space
    print(f'Codes: {data_name}')
    print(f'Total unique codes: {len(codes_dict.keys())}')
    df = pd.DataFrame({'Code': list(codes_dict.keys()), 'Freq': list(codes_dict.values())})
    return df.sort_values('Freq', ascending=False).reset_index()[['Code', 'Freq']]
# train code sequence
train_code_freq = get_code_freq(df_train['sequence'], 'Train')
train_code_freq
# val code sequence
val_code_freq = get_code_freq(df_val['sequence'], 'Val')
val_code_freq
# test code sequence
test_code_freq = get_code_freq(df_test['sequence'], 'Test')
test_code_freq
def plot_code_freq(df, data_name):
    plt.title(f'Code frequency: {data_name}')
    sns.barplot(x='Code', y='Freq', data=df)
plt.subplot(1, 3, 1)
plot_code_freq(train_code_freq, 'Train')
plt.subplot(1, 3, 2)
plot_code_freq(val_code_freq, 'Val')
plt.subplot(1, 3, 3)
plot_code_freq(test_code_freq, 'Test')
plt.subplots_adjust(right=3.0)
plt.show()
df_train.groupby('family_id').size().sort_values(ascending=False).head(20)
df_val.groupby('family_id').size().sort_values(ascending=False).head(20)
df_test.groupby('family_id').size().sort_values(ascending=False).head(20)
# Keep only the 1000 most frequent classes (by number of observations) because of limited computational power.
classes = df_train['family_accession'].value_counts()[:1000].index.tolist()
len(classes)
# Filter each split down to the selected 1000 classes.
train_sm = df_train.loc[df_train['family_accession'].isin(classes)].reset_index()
val_sm = df_val.loc[df_val['family_accession'].isin(classes)].reset_index()
test_sm = df_test.loc[df_test['family_accession'].isin(classes)].reset_index()
print('Data size after considering 1000 classes for each data split:')
print('Train size :', len(train_sm))
print('Val size :', len(val_sm))
print('Test size :', len(test_sm))
# No. of unique classes after reducing the data size.
calc_unique_cls(train_sm, test_sm, val_sm)
# https://dmnfarrell.github.io/bioinformatics/mhclearning
# http://www.cryst.bbk.ac.uk/education/AminoAcid/the_twenty.html
# 1 letter code for 20 natural amino acids
codes = ['A', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'K', 'L',
'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'V', 'W', 'Y']
def create_dict(codes):
    char_dict = {}
    for index, val in enumerate(codes):
        char_dict[val] = index + 1
    return char_dict
char_dict = create_dict(codes)
print(char_dict)
print("Dict Length:", len(char_dict))
def integer_encoding(data):
    """
    - Encodes a code sequence into integer values.
    - The 20 common amino acids keep their own index (1-20);
      the remaining uncommon codes are mapped to 0.
    """
    encode_list = []
    for row in data['sequence'].values:
        row_encode = []
        for code in row:
            row_encode.append(char_dict.get(code, 0))
        encode_list.append(np.array(row_encode))
    return encode_list
train_encode = integer_encoding(train_sm)
val_encode = integer_encoding(val_sm)
test_encode = integer_encoding(test_sm)
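# Illustrative sanity check on a toy sequence (not from the dataset): the 20 standard
# residues map to their 1-based index from char_dict, anything else (e.g. 'X') maps to 0.
toy = pd.DataFrame({'sequence': ['ACDX']})
print(integer_encoding(toy))  # expected: [array([1, 2, 3, 0])]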
# Pad/truncate every sequence to a fixed length (max_length).
max_length = 100
train_pad = pad_sequences(train_encode, maxlen=max_length, padding='post', truncating='post')
val_pad = pad_sequences(val_encode, maxlen=max_length, padding='post', truncating='post')
test_pad = pad_sequences(test_encode, maxlen=max_length, padding='post', truncating='post')
train_pad.shape, val_pad.shape, test_pad.shape
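# Illustrative check of the padding behaviour on the toy example above: real codes come
# first, then zero padding up to max_length; longer sequences would be truncated at the end.
toy_pad = pad_sequences(integer_encoding(toy), maxlen=max_length, padding='post', truncating='post')
print(toy_pad.shape)   # (1, 100)
print(toy_pad[0][:6])  # [1 2 3 0 0 0]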
# One hot encoding of sequences
train_ohe = to_categorical(train_pad)
val_ohe = to_categorical(val_pad)
test_ohe = to_categorical(test_pad)
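# Each padded integer sequence becomes a (100, 21) matrix: 100 positions, each one-hot
# over 21 values (20 amino-acid codes plus index 0, which is shared by padding and
# uncommon residues), assuming all 20 codes occur in each split.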
train_ohe.shape, val_ohe.shape, test_ohe.shape
# del train_pad, val_pad, test_pad
# del train_encode, val_encode, test_encode
# gc.collect()
# Label/integer encode the output variable (y).
le = LabelEncoder()
y_train_le = le.fit_transform(train_sm['family_accession'])
y_val_le = le.transform(val_sm['family_accession'])
y_test_le = le.transform(test_sm['family_accession'])
y_train_le.shape, y_val_le.shape, y_test_le.shape
print('Total classes: ', len(le.classes_))
# le.classes_
# One hot encoding of outputs
y_train = to_categorical(y_train_le)
y_val = to_categorical(y_val_le)
y_test = to_categorical(y_test_le)
y_train.shape, y_val.shape, y_test.shape
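# Sanity check (illustrative): the label one-hot width should equal the 1000 selected families.
assert y_train.shape[1] == len(le.classes_) == 1000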
# Utility function: plot model's accuracy and loss
# https://realpython.com/python-keras-text-classification/
plt.style.use('ggplot')
def plot_history(history):
    acc = history.history['acc']
    val_acc = history.history['val_acc']
    loss = history.history['loss']
    val_loss = history.history['val_loss']
    x = range(1, len(acc) + 1)

    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(x, acc, 'b', label='Training acc')
    plt.plot(x, val_acc, 'r', label='Validation acc')
    plt.title('Training and validation accuracy')
    plt.legend()
    plt.subplot(1, 2, 2)
    plt.plot(x, loss, 'b', label='Training loss')
    plt.plot(x, val_loss, 'r', label='Validation loss')
    plt.title('Training and validation loss')
    plt.legend()
# Utility function: display model scores (loss & accuracy) across all sets.
def display_model_score(model, train, val, test, batch_size):
    train_score = model.evaluate(train[0], train[1], batch_size=batch_size, verbose=1)
    print('Train loss: ', train_score[0])
    print('Train accuracy: ', train_score[1])
    print('-' * 70)
    val_score = model.evaluate(val[0], val[1], batch_size=batch_size, verbose=1)
    print('Val loss: ', val_score[0])
    print('Val accuracy: ', val_score[1])
    print('-' * 70)
    test_score = model.evaluate(test[0], test[1], batch_size=batch_size, verbose=1)
    print('Test loss: ', test_score[0])
    print('Test accuracy: ', test_score[1])
x_input = Input(shape=(100,))
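# Vocabulary size is 21: indices 1-20 for the standard amino acids plus 0 for
# padding/uncommon residues (see the integer encoding above).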
emb = Embedding(21, 128, input_length=max_length)(x_input)
bi_rnn = Bidirectional(CuDNNLSTM(64, kernel_regularizer=l2(0.01), recurrent_regularizer=l2(0.01), bias_regularizer=l2(0.01)))(emb)
x = Dropout(0.3)(bi_rnn)
# softmax classifier
x_output = Dense(1000, activation='softmax')(x)
model1 = Model(inputs=x_input, outputs=x_output)
model1.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
model1.summary()
# Early Stopping
es = EarlyStopping(monitor='val_loss', patience=3, verbose=1)
history1 = model1.fit(
    train_pad, y_train,
    epochs=50, batch_size=256,
    validation_data=(val_pad, y_val),
    callbacks=[es]
)
# saving model weights.
model1.save_weights('drive/My Drive/Case_Study/pfam/model1.h5')
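# If the runtime is restarted, the weights can be restored after rebuilding the same
# architecture (illustrative; the path is assumed to match the save path above).
# model1.load_weights('drive/My Drive/Case_Study/pfam/model1.h5')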
plot_history(history1)
display_model_score(model1,
                    [train_pad, y_train],
                    [val_pad, y_val],
                    [test_pad, y_test],
                    256)
def residual_block(data, filters, d_rate):
    """
    data: input tensor
    filters: number of convolution filters
    d_rate: dilation rate
    """
    shortcut = data

    bn1 = BatchNormalization()(data)
    act1 = Activation('relu')(bn1)
    conv1 = Conv1D(filters, 1, dilation_rate=d_rate, padding='same', kernel_regularizer=l2(0.001))(act1)

    # bottleneck convolution
    bn2 = BatchNormalization()(conv1)
    act2 = Activation('relu')(bn2)
    conv2 = Conv1D(filters, 3, padding='same', kernel_regularizer=l2(0.001))(act2)

    # skip connection
    x = Add()([conv2, shortcut])
    return x
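# Illustrative shape check with a dummy input (names here are hypothetical): the block is
# shape-preserving (padding='same', filters equal to the input channels), so Add() needs no projection.
_dummy_in = Input(shape=(100, 128))
_dummy_out = residual_block(_dummy_in, 128, 2)
print(_dummy_out.shape)  # (?, 100, 128) -> same shape as the input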
# model
x_input = Input(shape=(100, 21))
#initial conv
conv = Conv1D(128, 1, padding='same')(x_input)
# per-residue representation
res1 = residual_block(conv, 128, 2)
res2 = residual_block(res1, 128, 3)
x = MaxPooling1D(3)(res2)
x = Dropout(0.5)(x)
# softmax classifier
x = Flatten()(x)
x_output = Dense(1000, activation='softmax', kernel_regularizer=l2(0.0001))(x)
model2 = Model(inputs=x_input, outputs=x_output)
model2.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
model2.summary()
# Early Stopping
es = EarlyStopping(monitor='val_loss', patience=3, verbose=1)
history2 = model2.fit(
    train_ohe, y_train,
    epochs=10, batch_size=256,
    validation_data=(val_ohe, y_val),
    callbacks=[es]
)
# saving model weights.
model2.save_weights('drive/My Drive/Case_Study/pfam/model2.h5')
plot_history(history2)
display_model_score(
    model2,
    [train_ohe, y_train],
    [val_ohe, y_val],
    [test_ohe, y_test],
    256)
x = PrettyTable()
x.field_names = ['Sr.no', 'Model', 'Train Acc', 'Val Acc','Test Acc']
x.add_row(['1.', 'Bidirectional LSTM', '0.964', '0.957', '0.958'])
x.add_row(['2.', 'ProtCNN', '0.996', '0.988', '0.988'])
print(x)