import numpy as np
import pandas as pd
import scipy.linalg as linalg
import rdkit.Chem as Chem
from rdkit.Chem import AllChem
import torch
import matplotlib.pyplot as plt
def generate_graph(smile):
def get_atom_hash(atomic_number):
""" A helper function to quickly encode atomic number into a one-hot vector """
atomic_number_list = [6, 7, 8, 9, 15, 16, 17, 35, 53]
if atomic_number in atomic_number_list:
return atomic_number_list.index(atomic_number)
else:
return len(atomic_number_list)
def encode_atom(atom):
""" Generates the vector of features for an atom in a molecule"""
atom_matrix = np.zeros(10)
atom_matrix[get_atom_hash(atom.GetAtomicNum())] = 1
return atom_matrix
m = AllChem.MolFromSmiles(smile)
m = AllChem.AddHs(m)
AllChem.ComputeGasteigerCharges(m)
atom_matrix = np.array(list(map(encode_atom, (atom for atom in m.GetAtoms()))))
adj_matrix = AllChem.GetAdjacencyMatrix(m) + np.identity(m.GetNumAtoms()) # Augmented Adjacency Matrix
return(atom_matrix, adj_matrix, m.GetNumAtoms())
esol_data = pd.read_csv('delaney-processed.csv', sep=',')
amigdalin = esol_data['smiles'][0]
print(amigdalin)
print(generate_graph(amigdalin)[2])
n_samples = 10
feature_matrices = []
adj_matrices = []
molecule_membership = []
for i in range(n_samples):
feature_matrix, adj_matrix, n_atoms = generate_graph(esol_data['smiles'][i])
feature_matrices.append(feature_matrix)
adj_matrices.append(adj_matrix)
molecule_membership = np.concatenate((molecule_membership, [i]*n_atoms))
big_feature_matrix = np.concatenate(feature_matrices, axis = 0)
print(np.shape(big_feature_matrix))
big_adj_matrix = linalg.block_diag(*adj_matrices)
print(np.shape(big_adj_matrix))
masks = torch.tensor(np.array([np.where(molecule_membership == i, 1, 0) for i in range(n_samples)])).double()
print(np.shape(masks))
degree_vector = np.sum(big_adj_matrix, axis = 0)
degree_vector = np.sqrt(1/degree_vector)
D = torch.tensor(np.diag(degree_vector))
A = torch.tensor(big_adj_matrix)
norm_graph_laplacian = D @ A @ D
class SolNet(torch.nn.Module):
def __init__(self, laplacian, molecule_masks, input_size = 10, internal_size = 10, output_size = 1, num_conv_layers = 3, activation_function=torch.nn.functional.relu):
super().__init__()
#Bring in variables
self.laplacian = laplacian
self.molecule_masks = molecule_masks
self.activation_function = activation_function
#Convolutional weights
self.w0 = torch.nn.Parameter(torch.randn(input_size,internal_size))
self.w1 = torch.nn.Parameter(torch.randn(internal_size,internal_size))
self.w2 = torch.nn.Parameter(torch.randn(internal_size,internal_size))
#Linear weights
self.w3 = torch.nn.Parameter(torch.randn(internal_size,output_size))
def forward(self, x):
#Convolutional layers
x = self.activation_function(self.laplacian @ x @ self.w0)
x = self.activation_function(self.laplacian @ x @ self.w1)
x = self.activation_function(self.laplacian @ x @ self.w2)
#Mask layer
x = self.molecule_masks @ x
#Linear layer
y = x @ self.w3
return y
## Create Targets
targets = torch.tensor(esol_data['measured log solubility in mols per litre'][:n_samples])
# Only train on 2/3 of points
labelled_indices = np.random.choice(np.arange(n_samples), int(2*n_samples/3), replace = False)
unlabelled_indices = np.setdiff1d(np.arange(n_samples), labelled_indices)
### Train your model here (You can use a very similar approach to the one used in previous practicals) ###
model = SolNet(norm_graph_laplacian, masks)
model.double()
num_epochs = 200
learning_rate = 1e-2
opt = torch.optim.Adam(model.parameters(), lr=learning_rate)
all_train_loss = []
all_test_loss = []
for epoch in range(num_epochs):
pred = model(big_feature_matrix).squeeze()
train_loss = torch.mean((pred[labelled_indices] - targets[labelled_indices])**2)
opt.zero_grad()
train_loss.backward()
opt.step()
all_train_loss.append(train_loss)
test_loss = torch.mean((pred[unlabelled_indices] - targets[unlabelled_indices])**2)
all_test_loss.append(test_loss)
plt.plot(all_train_loss,label='Train loss')
plt.plot(all_test_loss,label='Test')
plt.legend()
print(all_train_loss[-1],all_test_loss[-1])
class MPNet(torch.nn.Module):
def __init__(self, n_features, activation_function=torch.nn.functional.relu):
super().__init__()
#Port parameters
self.n_features = n_features
self.activation_function = activation_function
#Set up message passing module
self.w0 = torch.nn.Parameter(torch.randn(n_features*2,n_features))
self.w1 = torch.nn.Parameter(torch.randn(n_features*2,n_features))
#Linear layer
self.w2 = torch.nn.Parameter(torch.randn(n_features,1))
def forward(self, feats, adj):
for i in range(len(feats)):
message = 0
for j in np.where(adj[i] == 1)[0]:
message += self.activation_function(torch.hstack((feats[i],feats[j])) @ self.w0)
feats[i] = self.activation_function(torch.hstack((feats[i],message)) @ self.w1) / len(np.where(adj[i] == 1)[0])
y = torch.sum(feats,axis=0) @ self.w2
return y
### Train your model here (You can use a very similar approach to the one used in previous practicals) ###
# Note that training here can be slow if your module is not sufficiently vectorized. It might
# be helpful to reduce n_samples to make sure your architecture is working properly before trying to apply
# to larger data.
model = MPNet(10)
model.double()
num_epochs = 500
learning_rate = 1e-2
opt = torch.optim.Adam(model.parameters(), lr = learning_rate)
all_train_loss = []
all_test_loss = []
for epoch in range(num_epochs):
epoch_loss = 0
for i in labelled_indices:
pred = model(torch.tensor(generate_graph(esol_data['smiles'][i])[0]),torch.tensor(generate_graph(esol_data['smiles'][i])[1]))
loss = torch.mean((pred-targets[i])**2)
epoch_loss += loss
opt.zero_grad()
epoch_loss.backward()
opt.step()
all_train_loss.append(epoch_loss/len(labelled_indices))
epoch_test_loss = 0
for j in unlabelled_indices:
with torch.no_grad():
pred = model(torch.tensor(generate_graph(esol_data['smiles'][j])[0]),torch.tensor(generate_graph(esol_data['smiles'][j])[1]))
loss = torch.mean((pred-targets[j])**2)
epoch_test_loss += loss
all_test_loss.append(epoch_test_loss/len(unlabelled_indices))
if epoch % 25 == 0:
print('epoch = '+str(epoch))
print('train loss = '+str(epoch_loss/len(labelled_indices)))
print('test loss = '+str(epoch_test_loss/len(unlabelled_indices)))
plt.plot(all_train_loss,label='Train')
plt.plot(all_test_loss,label='Test')
plt.legend()
print(all_train_loss[-1], all_test_loss[-1])