import numpy as np
import pandas as pd
import scipy.linalg as linalg
import rdkit.Chem as Chem
from rdkit.Chem import AllChem
import torch
import matplotlib.pyplot as plt
def generate_graph(smile):  # build the graph representation (atom features + augmented adjacency) from a SMILES string
def get_atom_hash(atomic_number):
""" A helper function to quickly encode atomic number into a one-hot vector """
atomic_number_list = [6, 7, 8, 9, 15, 16, 17, 35, 53]
if atomic_number in atomic_number_list:
return atomic_number_list.index(atomic_number)
else:
return len(atomic_number_list)
def encode_atom(atom):
""" Generates the vector of features for an atom in a molecule"""
atom_matrix = np.zeros(10)
        atom_matrix[get_atom_hash(atom.GetAtomicNum())] = 1  # the only atom feature is the atomic number, one-hot encoded; the last slot catches any element not in the list (e.g. hydrogen)
return atom_matrix
m = AllChem.MolFromSmiles(smile)
m = AllChem.AddHs(m)
    AllChem.ComputeGasteigerCharges(m)  # charges are computed here but not used in the one-hot feature vectors below
atom_matrix = np.array(list(map(encode_atom, (atom for atom in m.GetAtoms()))))
    adj_matrix = AllChem.GetAdjacencyMatrix(m) + np.identity(m.GetNumAtoms())  # augmented adjacency matrix A + I (RDKit adjacency plus self-loops)
    return atom_matrix, adj_matrix, m.GetNumAtoms()
esol_data = pd.read_csv('delaney-processed.csv', sep=',')
amygdalin = esol_data['smiles'][0]
print(amygdalin)
print(generate_graph(amygdalin)[2])  # number of atoms, including added hydrogens
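# A quick illustrative check on a small molecule not taken from the dataset (ethanol, "CCO"):
# after AddHs it has 9 atoms, so we expect a (9, 10) feature matrix and a (9, 9) augmented adjacency.
demo_features, demo_adj, demo_n_atoms = generate_graph('CCO')
print(np.shape(demo_features), np.shape(demo_adj), demo_n_atoms)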
n_samples = 100  # number of molecules taken from the ESOL dataset
feature_matrices = []
adj_matrices = []  # per-molecule adjacency matrices, later assembled into one block-diagonal matrix
molecule_membership = []  # records which molecule each atom belongs to
for i in range(n_samples):
feature_matrix, adj_matrix, n_atoms = generate_graph(esol_data['smiles'][i])
feature_matrices.append(feature_matrix)
adj_matrices.append(adj_matrix)
molecule_membership = np.concatenate((molecule_membership, [i]*n_atoms))
big_feature_matrix = np.concatenate(feature_matrices, axis = 0)
print(np.shape(big_feature_matrix))
big_adj_matrix = linalg.block_diag(*adj_matrices)
print(np.shape(big_adj_matrix))
# Aggregate atom-level features into molecule-level features: multiplying a (n_mols x n_atoms) mask
# by the (n_atoms x n_features) feature matrix sums, for each molecule, only the atoms that belong
# to it, giving a (n_mols x n_features) matrix.
masks = torch.tensor(np.array([np.where(molecule_membership == i, 1, 0) for i in range(n_samples)])).double()
print(np.shape(masks))
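# Sanity check (illustrative): each atom should belong to exactly one molecule, so summing the
# masks over the molecule axis should give a vector of ones.
print(torch.all(masks.sum(dim=0) == 1))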
### Calculate the GCN propagation matrix D^-1/2 (A + I) D^-1/2, i.e. the symmetrically normalized,
### self-loop-augmented adjacency (referred to below, loosely, as the normalized graph Laplacian)
degree_vector = np.sum(big_adj_matrix, axis=1)
degree_vector = np.sqrt(1/degree_vector)
D = torch.tensor(np.diag(degree_vector))
A = torch.tensor(big_adj_matrix)
norm_graph_laplacian = D @ A @ D
print(np.shape(norm_graph_laplacian))
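# Sanity check (illustrative): D^-1/2 (A + I) D^-1/2 should remain symmetric and block-diagonal,
# so atoms in different molecules never exchange information during the graph convolutions.
print(torch.allclose(norm_graph_laplacian, norm_graph_laplacian.T))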
# input_size is the number of atom features (10)
class SolNet(torch.nn.Module):
def __init__(self, laplacian, molecule_masks, input_size = 10, internal_size = 10, output_size = 1, num_conv_layers = 3, activation_function=torch.nn.functional.relu):
super().__init__()
        # GCN layer: h_new = nonlinearity(D^-1/2 (A + I) D^-1/2 @ h @ W)
        # (the three convolution layers below are hard-coded, so num_conv_layers is effectively fixed at 3)
self.W0 = torch.nn.Parameter(torch.randn((input_size, internal_size)))
self.W1 = torch.nn.Parameter(torch.randn((internal_size, internal_size)))
self.W2 = torch.nn.Parameter(torch.randn((internal_size, internal_size)))
self.W3 = torch.nn.Parameter(torch.randn((internal_size, output_size)))
self.laplacian = laplacian
self.molecule_masks = molecule_masks
self.activation_function = activation_function
def forward(self, x):
"""Computes the prediction of the neural network"""
        # three graph convolutions, then per-molecule aggregation, then a linear readout
x = self.activation_function(self.laplacian @ x @ self.W0)
x = self.activation_function(self.laplacian @ x @ self.W1)
x = self.activation_function(self.laplacian @ x @ self.W2)
x = self.molecule_masks @ x #Aggregation of atom level features in each molecule
y = x @ self.W3
return y
## Create Targets
targets = torch.tensor(esol_data['measured log solubility in mols per litre'].values[:n_samples])
# Only train on 2/3 of points
labelled_indices = np.random.choice(np.arange(n_samples), int(2*n_samples/3), replace = False)
unlabelled_indices = np.setdiff1d(np.arange(n_samples), labelled_indices)
### Train your model here (You can use a very similar approach to the one used in previous practicals) ###
model = SolNet(norm_graph_laplacian, masks)
model.double()
big_feature_tensor = torch.tensor(big_feature_matrix)  # convert the numpy features once, so the matmuls in forward() stay in torch
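# Quick shape check before training (illustrative): the untrained model should map the
# (total_atoms, 10) node-feature tensor to one prediction per molecule, i.e. shape (n_samples, 1).
with torch.no_grad():
    print(model(big_feature_tensor).shape)  # expect torch.Size([100, 1])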
num_epochs=500
learning_rate=1E-2
opt = torch.optim.Adam(model.parameters(), lr=learning_rate)
all_train_loss = []
all_test_loss = []
for epoch in range(num_epochs):
    pred = model(big_feature_tensor).squeeze()
    train_loss = torch.mean((pred[labelled_indices] - targets[labelled_indices])**2)  # MSE over the labelled molecules
    opt.zero_grad()
    train_loss.backward()
    opt.step()
    all_train_loss.append(train_loss.item())  # store plain floats so the losses can be plotted
    with torch.no_grad():
        test_loss = torch.mean((pred[unlabelled_indices] - targets[unlabelled_indices])**2)
    all_test_loss.append(test_loss.item())
plt.plot(all_train_loss, label="Train")
plt.plot(all_test_loss, label="Test")
plt.title('Epoch Loss')
plt.xlabel('Number of Epochs')
plt.ylabel('Loss')
plt.legend()
print(all_train_loss[-1], all_test_loss[-1])
# Message passing relies on local information: each layer only mixes features between bonded neighbours,
# rather than aggregating over the whole graph at once.
# Stacking more than ~3 layers mixes information beyond third-neighbour shells and tends to degrade performance.
class MPNet(torch.nn.Module):
def __init__(self, n_features, activation_function=torch.nn.functional.relu):
super().__init__()
        self.W0 = torch.nn.Parameter(0.1*torch.randn((n_features*2, n_features)))  # message weights: act on a concatenated [node i, neighbour j] pair
        self.W1 = torch.nn.Parameter(0.1*torch.randn((n_features*2, n_features)))  # update weights: act on the concatenated [current node vector, aggregated message]
        self.W2 = torch.nn.Parameter(0.1*torch.randn((n_features, 1)))  # readout: linear layer down to a single solubility prediction
self.n_features = n_features
self.activation_function = activation_function
def forward(self, feats, adj):
"""Computes the prediction of the neural network"""
        # For each atom i, sum the messages from its neighbours j (the augmented adjacency includes
        # self-loops, so atom i's own features contribute as well); each message acts on the pair [h_i, h_j].
        messages = torch.stack([sum(self.activation_function(torch.hstack((feats[i], feats[j])) @ self.W0)
                                    for j in torch.where(adj[i] == 1)[0]) for i in range(len(feats))])
        feats = self.activation_function(torch.hstack((feats, messages)) @ self.W1)  # update each node with its aggregated message
        y = torch.sum(feats, axis=0) @ self.W2  # sum-pool over atoms, then a linear readout
        # Equivalent un-vectorized version, for readability:
        # new_feats = []
        # for i in range(len(feats)):
        #     message = 0
        #     for j in torch.where(adj[i] == 1)[0]:
        #         message = message + self.activation_function(torch.hstack((feats[i], feats[j])) @ self.W0)
        #     new_feats.append(self.activation_function(torch.hstack((feats[i], message)) @ self.W1))
        # y = torch.sum(torch.stack(new_feats), axis=0) @ self.W2
return y
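# Quick sanity check (illustrative, using a throwaway untrained model): a single-molecule forward
# pass should return one scalar solubility prediction.
mp_demo = MPNet(10)
mp_demo.double()
mp_feats, mp_adj, _ = generate_graph(esol_data['smiles'][0])
with torch.no_grad():
    print(mp_demo(torch.tensor(mp_feats), torch.tensor(mp_adj)).shape)  # expect torch.Size([1])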
### Train your model here (You can use a very similar approach to the one used in previous practicals) ###
# Note that training here can be slow if your module is not sufficiently vectorized. It might
# be helpful to reduce n_samples to make sure your architecture is working properly before trying to apply
# to larger data.
# Loop over the labelled molecules one at a time, accumulate the loss, and take one optimizer step at the end of each epoch.
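# To address the speed note above, build each molecular graph once up front instead of regenerating
# it from SMILES in every epoch (this introduces the helper list precomputed_graphs, which the
# training and test loops below index into).
precomputed_graphs = [generate_graph(esol_data['smiles'][i]) for i in range(n_samples)]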
model = MPNet(10)
model.double()
num_epochs=100
learning_rate=1E-2
opt = torch.optim.Adam(model.parameters(), lr=learning_rate)
all_train_loss = []
all_test_loss = []
for epoch in range(num_epochs):
epoch_loss = 0
    for i in labelled_indices:
        feats_i, adj_i, _ = precomputed_graphs[i]  # graph built once, outside the epoch loop
        pred = model(torch.tensor(feats_i), torch.tensor(adj_i))
loss = torch.mean((pred-targets[i])**2)
epoch_loss += loss
opt.zero_grad()
epoch_loss.backward()
opt.step()
    all_train_loss.append(epoch_loss.item()/len(labelled_indices))  # store a plain float for plotting
    print(epoch_loss.item()/len(labelled_indices))
epoch_test_loss=0
    for j in unlabelled_indices:
        with torch.no_grad():
            feats_j, adj_j, _ = precomputed_graphs[j]
            pred = model(torch.tensor(feats_j), torch.tensor(adj_j))
            loss = torch.mean((pred - targets[j])**2)
            epoch_test_loss += loss
    all_test_loss.append(epoch_test_loss.item()/len(unlabelled_indices))
    print(epoch_test_loss.item()/len(unlabelled_indices))
plt.plot(all_train_loss, label="Train")
plt.plot(all_test_loss, label="Test")
plt.title('Epoch Loss')
plt.xlabel('Number of Epochs')
plt.ylabel('Loss')
plt.legend()
print(all_train_loss[-1], all_test_loss[-1])