import numpy as np
import pandas as pd
import scipy.linalg as linalg
import rdkit.Chem as Chem
from rdkit.Chem import AllChem
import torch
import matplotlib.pyplot as plt
def generate_graph(smile):
    def get_atom_hash(atomic_number):
        """A helper function mapping an atomic number to its index in the one-hot encoding"""
        atomic_number_list = [6, 7, 8, 9, 15, 16, 17, 35, 53]
        if atomic_number in atomic_number_list:
            return atomic_number_list.index(atomic_number)
        else:
            return len(atomic_number_list)

    def encode_atom(atom):
        """Generates the vector of features for an atom in a molecule"""
        atom_matrix = np.zeros(10)
        atom_matrix[get_atom_hash(atom.GetAtomicNum())] = 1
        return atom_matrix

    m = AllChem.MolFromSmiles(smile)
    m = AllChem.AddHs(m)
    AllChem.ComputeGasteigerCharges(m)
    atom_matrix = np.array(list(map(encode_atom, (atom for atom in m.GetAtoms()))))
    adj_matrix = AllChem.GetAdjacencyMatrix(m) + np.identity(m.GetNumAtoms())  # augmented adjacency matrix (A + I)
    return atom_matrix, adj_matrix, m.GetNumAtoms()
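# A minimal sanity check (added sketch, using ethanol 'CCO' as a throwaway example):
# with explicit hydrogens ethanol has 9 atoms, so we expect a (9, 10) feature matrix
# and a (9, 9) augmented adjacency matrix.
_feats_demo, _adj_demo, _n_demo = generate_graph('CCO')
print(_feats_demo.shape, _adj_demo.shape, _n_demo)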
esol_data = pd.read_csv('delaney-processed.csv', sep=',')
amigdalin = esol_data['smiles'][0]
print(amigdalin)
print(generate_graph(amigdalin)[2])
n_samples = 100
feature_matrices = []
adj_matrices = []
molecule_membership = []
for i in range(n_samples):
    feature_matrix, adj_matrix, n_atoms = generate_graph(esol_data['smiles'][i])
    feature_matrices.append(feature_matrix)
    adj_matrices.append(adj_matrix)
    molecule_membership = np.concatenate((molecule_membership, [i] * n_atoms))
big_feature_matrix = np.concatenate(feature_matrices, axis = 0)
print('big feature matrix shape: ', np.shape(big_feature_matrix))
big_adj_matrix = linalg.block_diag(*adj_matrices)
print('big adj matrix shape: ', np.shape(big_adj_matrix))
masks = torch.tensor(np.array([np.where(molecule_membership == i, 1, 0) for i in range(n_samples)])).double()
print('shape of mask: ',np.shape(masks))
print (masks)
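# Optional sanity check (added): each mask row should sum to the number of atoms in the
# corresponding molecule, and together the rows should cover every atom exactly once.
assert int(masks.sum().item()) == len(molecule_membership)
print('atoms per molecule (first 5):', masks.sum(dim=1)[:5])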
### Compute the symmetrically normalized adjacency D^{-1/2} (A + I) D^{-1/2}
### (often loosely called the graph Laplacian); it is used in every hidden-layer update.
degree_vector = np.sum(big_adj_matrix, axis=1)  # axis doesn't matter: the matrix is symmetric
degree_vector = np.sqrt(1 / degree_vector)
# Build D^{-1/2} in the correct form (a diagonal matrix, not a vector)
D = torch.tensor(np.diag(degree_vector))
A = torch.tensor(big_adj_matrix)
norm_graph_laplacian = D @ A @ D
# sanity check
print(np.shape(norm_graph_laplacian))
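# Optional check (added): D^{-1/2}(A + I)D^{-1/2} should be symmetric, since A + I is symmetric.
print('propagation matrix symmetric:', np.allclose(norm_graph_laplacian.numpy(), norm_graph_laplacian.numpy().T))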
class SolNet(torch.nn.Module):
    def __init__(self, laplacian, molecule_masks, input_size=10, internal_size=10,
                 output_size=1, num_conv_layers=3, activation_function=torch.nn.functional.relu):
        super().__init__()
        # Parameters for the three GCN layers.
        # internal_size can be varied to expand or contract the network.
        self.W0 = torch.nn.Parameter(torch.randn((input_size, internal_size)))
        self.W1 = torch.nn.Parameter(torch.randn((internal_size, internal_size)))
        self.W2 = torch.nn.Parameter(torch.randn((internal_size, internal_size)))
        # One more set of parameters for the final readout layer
        self.W3 = torch.nn.Parameter(torch.randn((internal_size, output_size)))
        self.laplacian = laplacian
        self.molecule_masks = molecule_masks
        self.activation_function = activation_function

    def forward(self, x):
        """Computes the prediction of the neural network"""
        x = self.activation_function(self.laplacian @ x @ self.W0)  # D^{-1/2} A D^{-1/2} X W, layer 1
        x = self.activation_function(self.laplacian @ x @ self.W1)  # layer 2
        x = self.activation_function(self.laplacian @ x @ self.W2)  # layer 3
        x = self.molecule_masks @ x  # sum-aggregation of atom-level features within each molecule
        y = x @ self.W3
        return y
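# A minimal shape check (added sketch; _demo is a throwaway untrained instance): pooling the
# big feature matrix through the masks should give one prediction per molecule, i.e. (n_samples, 1).
_demo = SolNet(norm_graph_laplacian, masks)
_demo.double()
with torch.no_grad():
    print('demo output shape:', _demo(torch.tensor(big_feature_matrix)).shape)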
## Create Targets
targets = torch.tensor(esol_data['measured log solubility in mols per litre'][:n_samples].values)
# Only train on 2/3 of points
labelled_indices = np.random.choice(np.arange(n_samples), int(2*n_samples/3), replace = False)
unlabelled_indices = np.setdiff1d(np.arange(n_samples), labelled_indices)
### Train your model here (You can use a very similar approach to the one used in previous practicals) ###
model = SolNet(norm_graph_laplacian, masks)
model.double()
num_epochs=80
learning_rate=0.05
opt=torch.optim.Adam(model.parameters(), lr= learning_rate)
all_train_loss = []
all_test_loss = []
for epoch in range(num_epochs):
    pred = model(torch.tensor(big_feature_matrix)).squeeze()
    train_loss = torch.mean((pred[labelled_indices] - targets[labelled_indices])**2)
    opt.zero_grad()
    train_loss.backward()
    opt.step()
    all_train_loss.append(train_loss.item())
    test_loss = torch.mean((pred[unlabelled_indices] - targets[unlabelled_indices])**2)
    all_test_loss.append(test_loss.item())
plt.plot(all_train_loss, label='train')
plt.plot(all_test_loss, label ='test')
plt.legend()
print('final train loss: ', all_train_loss[-1], '\nfinal test loss: ', all_test_loss[-1])
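# Optional follow-up (added sketch): report the held-out RMSE in log-solubility units,
# which is easier to interpret than the raw MSE printed above.
with torch.no_grad():
    final_pred = model(torch.tensor(big_feature_matrix)).squeeze()
    test_rmse = torch.sqrt(torch.mean((final_pred[unlabelled_indices] - targets[unlabelled_indices])**2))
    print('test RMSE:', test_rmse.item())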
class MPNet(torch.nn.Module):
    def __init__(self, n_features, activation_function=torch.nn.functional.relu):
        super().__init__()
        # A single message-passing layer, so each node only sees information from its
        # direct (1-hop) neighbours; every added layer reaches one neighbour deeper,
        # and around three layers is commonly reported to work well.
        # The message MLP takes the concatenated features of a node and one neighbour
        # (size n_features * 2) and returns a vector of the original size, which is
        # convenient for feeding into later layers.
        self.W1 = torch.nn.Parameter(torch.randn((n_features * 2, n_features)))
        self.W2 = torch.nn.Parameter(torch.randn((n_features * 2, n_features)))
        self.W3 = torch.nn.Parameter(torch.randn((n_features, 1)))
        self.n_features = n_features
        self.activation_function = activation_function

    def forward(self, feats, adj):
        """Computes the prediction of the neural network"""
        # For every atom i, sum the messages from its neighbours j (the augmented
        # adjacency contains self-loops, so each atom also messages itself).
        messages = torch.stack([sum(self.activation_function(torch.hstack((feats[i], feats[j])) @ self.W1)
                                    for j in np.where(adj[i] == 1)[0])
                                for i in range(len(feats))])
        # Update each node from its own features and its aggregated message,
        # then sum-pool over atoms and read out a single solubility value.
        feats = self.activation_function(torch.hstack((feats, messages)) @ self.W2)
        return torch.sum(feats, dim=0) @ self.W3
# The same forward pass written with a more classic explicit double loop (below). It is
# easier to follow but slower, and note that it also divides each node update by its
# number of neighbours, i.e. mean rather than sum aggregation of the messages.
# def forward(self, feats, adj):
#     """Computes the prediction of the neural network"""
#     for i in range(len(feats)):
#         message = 0
#         for j in np.where(adj[i] == 1)[0]:
#             message += self.activation_function(torch.hstack((feats[i], feats[j])) @ self.W1)
#         feats[i] = self.activation_function(torch.hstack((feats[i], message)) @ self.W2) / len(np.where(adj[i] == 1)[0])
#     y = torch.sum(feats, axis=0) @ self.W3
#     return y
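# A quick sketch (added; _mp, _f, _a are throwaway names): running the untrained MPNet
# on one molecule should return a single scalar prediction of shape (1,).
_f, _a, _ = generate_graph(esol_data['smiles'][0])
_mp = MPNet(10)
_mp.double()
with torch.no_grad():
    print('MPNet demo output shape:', _mp(torch.tensor(_f), torch.tensor(_a)).shape)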
class MPNet2(torch.nn.Module):
    def __init__(self, n_features, activation_function=torch.nn.functional.relu):
        super().__init__()
        # Two message-passing rounds, each with its own message (W1/W11) and update (W2/W22) weights
        self.W1 = torch.nn.Parameter(torch.randn((n_features * 2, n_features)))
        self.W2 = torch.nn.Parameter(torch.randn((n_features * 2, n_features)))
        self.W11 = torch.nn.Parameter(torch.randn((n_features * 2, n_features)))
        self.W22 = torch.nn.Parameter(torch.randn((n_features * 2, n_features)))
        self.W3 = torch.nn.Parameter(torch.randn((n_features, 1)))
        self.n_features = n_features
        self.activation_function = activation_function

    def forward(self, feats, adj):
        """Computes the prediction of the neural network"""
        # First message-passing round
        for i in range(len(feats)):
            message = 0
            for j in np.where(adj[i] == 1)[0]:
                message += self.activation_function(torch.hstack((feats[i], feats[j])) @ self.W1)
            feats[i] = self.activation_function(torch.hstack((feats[i], message)) @ self.W2) / len(np.where(adj[i] == 1)[0])
        # Second message-passing round
        for i in range(len(feats)):
            message = 0
            for j in np.where(adj[i] == 1)[0]:
                message += self.activation_function(torch.hstack((feats[i], feats[j])) @ self.W11)
            feats[i] = self.activation_function(torch.hstack((feats[i], message)) @ self.W22) / len(np.where(adj[i] == 1)[0])
        y = torch.sum(feats, dim=0) @ self.W3
        return y
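# A minimal sketch (added; _mp2, _f2, _a2 are throwaway names): MPNet2 is not trained below,
# but one forward pass confirms the two message-passing rounds compose correctly.
_f2, _a2, _ = generate_graph(esol_data['smiles'][1])
_mp2 = MPNet2(10)
_mp2.double()
with torch.no_grad():
    print('MPNet2 demo output shape:', _mp2(torch.tensor(_f2), torch.tensor(_a2)).shape)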
model = MPNet(10)
model.double()
num_epochs = 160
learning_rate = 0.06
opt = torch.optim.Adam(model.parameters(), lr=learning_rate)
all_train_loss = []
all_test_loss = []
for epoch in range(num_epochs):
    print(epoch)
    epoch_loss = 0
    for i in labelled_indices:
        feats, adj, _ = generate_graph(esol_data['smiles'][i])
        pred = model(torch.tensor(feats), torch.tensor(adj))
        loss = torch.mean((pred - targets[i])**2)
        epoch_loss += loss
    opt.zero_grad()
    epoch_loss.backward()
    opt.step()
    all_train_loss.append((epoch_loss / len(labelled_indices)).item())
    # print('epoch train loss ', all_train_loss[-1])
    epoch_test_loss = 0
    for i in unlabelled_indices:
        with torch.no_grad():
            feats, adj, _ = generate_graph(esol_data['smiles'][i])
            pred = model(torch.tensor(feats), torch.tensor(adj))
            loss = torch.mean((pred - targets[i])**2)
            epoch_test_loss += loss
    all_test_loss.append((epoch_test_loss / len(unlabelled_indices)).item())
    # print('epoch test loss ', all_test_loss[-1])
plt.plot(all_train_loss, label='train')
plt.plot(all_test_loss, label='test')
plt.legend()
print('final train loss: ', all_train_loss[-1], '\nfinal test loss: ', all_test_loss[-1])