import numpy as np
import pandas as pd
import scipy.linalg as linalg
import rdkit.Chem as Chem
from rdkit.Chem import AllChem
import torch
import matplotlib.pyplot as plt
def generate_graph(smiles):
    def get_atom_hash(atomic_number):
        """A helper that maps an atomic number to its one-hot index."""
        atomic_number_list = [6, 7, 8, 9, 15, 16, 17, 35, 53]
        if atomic_number in atomic_number_list:
            return atomic_number_list.index(atomic_number)
        else:
            return len(atomic_number_list)  # final slot catches any other element (e.g. H)

    def encode_atom(atom):
        """Generates the one-hot feature vector for an atom in a molecule."""
        atom_matrix = np.zeros(10)
        atom_matrix[get_atom_hash(atom.GetAtomicNum())] = 1
        return atom_matrix

    m = AllChem.MolFromSmiles(smiles)
    m = AllChem.AddHs(m)
    AllChem.ComputeGasteigerCharges(m)  # computed for completeness; the charges are not used in the features below
    atom_matrix = np.array([encode_atom(atom) for atom in m.GetAtoms()])
    adj_matrix = AllChem.GetAdjacencyMatrix(m) + np.identity(m.GetNumAtoms())  # augmented adjacency matrix (adds self-loops)
    return atom_matrix, adj_matrix, m.GetNumAtoms()
esol_data = pd.read_csv('delaney-processed.csv', sep=',')
amigdalin = esol_data['smiles'][0]
print(amigdalin)
print(generate_graph(amigdalin)[2])
OCC3OC(OCC2OC(OC(C#N)c1ccccc1)C(O)C(O)C2O)C(O)C(O)C3O
59
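# As a quick check that the featurisation behaves as expected, we can run it on a
# tiny molecule (ethanol, 'CCO', chosen here purely for illustration): with explicit
# hydrogens, ethanol has 9 atoms, so we expect a 9x10 feature matrix and a 9x9
# augmented adjacency matrix whose diagonal is all ones.
feats, adj, n = generate_graph('CCO')
print(feats.shape)                 # (9, 10): 9 atoms after AddHs, 10 one-hot slots
print(adj.shape, n)                # (9, 9) 9
print(np.all(np.diag(adj) == 1))   # True: self-loops from the identity term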
n_samples = 100
feature_matrices = []
adj_matrices = []
molecule_membership = []
for i in range(n_samples):
    feature_matrix, adj_matrix, n_atoms = generate_graph(esol_data['smiles'][i])
    feature_matrices.append(feature_matrix)
    adj_matrices.append(adj_matrix)
    molecule_membership = np.concatenate((molecule_membership, [i] * n_atoms))
big_feature_matrix = np.concatenate(feature_matrices, axis=0)
print('big feature matrix shape: ', np.shape(big_feature_matrix))
# Stacking the adjacency matrices block-diagonally turns the 100 molecules into
# one big disconnected graph, so they can all be processed in a single pass.
big_adj_matrix = linalg.block_diag(*adj_matrices)
print('big adj matrix shape: ', np.shape(big_adj_matrix))
masks = torch.tensor(np.array([np.where(molecule_membership == i, 1, 0) for i in range(n_samples)])).double()
print('shape of mask: ', np.shape(masks))
print(masks)
big feature matrix shape: (2616, 10)
big adj matrix shape: (2616, 2616)
shape of mask: torch.Size([100, 2616])
tensor([[1., 1., 1., ..., 0., 0., 0.],
[0., 0., 0., ..., 0., 0., 0.],
[0., 0., 0., ..., 0., 0., 0.],
...,
[0., 0., 0., ..., 0., 0., 0.],
[0., 0., 0., ..., 0., 0., 0.],
[0., 0., 0., ..., 1., 1., 1.]], dtype=torch.float64)
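# A note on the masks: multiplying the (n_molecules x n_atoms) mask matrix by the
# stacked atom features sums the rows belonging to each molecule, i.e. it performs
# sum-pooling over atoms. A tiny illustrative example (toy numbers, not ESOL data):
toy_feats = np.array([[1., 0., 2.],
                      [3., 1., 0.],
                      [5., 5., 5.]])
toy_mask = np.array([[1., 1., 0.],   # molecule 0 owns atoms 0 and 1
                     [0., 0., 1.]])  # molecule 1 owns atom 2
print(toy_mask @ toy_feats)  # row 0 = atom0 + atom1, row 1 = atom2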
### Let's calculate the normalised propagation matrix that will be used in our hidden-layer calculations.
### Strictly speaking this is the symmetrically normalised augmented adjacency D^{-1/2} (A + I) D^{-1/2}
### from Kipf & Welling's GCN, though it is often loosely called the graph Laplacian.
degree_vector = np.sum(big_adj_matrix, axis=1)  # axis doesn't matter: the matrix is symmetric
degree_vector = np.sqrt(1 / degree_vector)
# build D^{-1/2} with the correct dimensions (a matrix, not a vector)
D = torch.tensor(np.diag(degree_vector))
A = torch.tensor(big_adj_matrix)
norm_graph_laplacian = D @ A @ D
# sanity check
print(np.shape(norm_graph_laplacian))
torch.Size([2616, 2616])
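# A toy check of the normalisation (illustrative numbers): for a single bond
# between two atoms, A + I is the all-ones 2x2 matrix and both degrees are 2,
# so every entry of the normalised matrix should be 1/2.
toy_adj = np.array([[1., 1.],
                    [1., 1.]])
toy_d = np.diag(np.sqrt(1 / toy_adj.sum(axis=1)))
print(toy_d @ toy_adj @ toy_d)  # [[0.5, 0.5], [0.5, 0.5]]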
class SolNet(torch.nn.Module):
    def __init__(self, laplacian, molecule_masks, input_size=10, internal_size=10,
                 output_size=1, num_conv_layers=3, activation_function=torch.nn.functional.relu):
        super().__init__()
        # Parameters for our three GCN layers (num_conv_layers is accepted but the
        # three layers are hard-coded here). internal_size can vary to expand or
        # contract the network.
        self.W0 = torch.nn.Parameter(torch.randn((input_size, internal_size)))
        self.W1 = torch.nn.Parameter(torch.randn((internal_size, internal_size)))
        self.W2 = torch.nn.Parameter(torch.randn((internal_size, internal_size)))
        # One more set of parameters for the final (readout) layer.
        self.W3 = torch.nn.Parameter(torch.randn((internal_size, output_size)))
        self.laplacian = laplacian
        self.molecule_masks = molecule_masks
        self.activation_function = activation_function

    def forward(self, x):
        """Computes the prediction of the neural network."""
        x = self.activation_function(self.laplacian @ x @ self.W0)  # DAD X W, layer 1
        x = self.activation_function(self.laplacian @ x @ self.W1)  # DAD X W, layer 2
        x = self.activation_function(self.laplacian @ x @ self.W2)  # DAD X W, layer 3
        x = self.molecule_masks @ x  # sum-aggregation of atom-level features within each molecule
        y = x @ self.W3
        return y
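# Quick smoke test (a minimal sketch): an untrained SolNet should map the
# (2616, 10) stacked feature matrix to one prediction per molecule.
_untrained = SolNet(norm_graph_laplacian, masks).double()
with torch.no_grad():
    print(_untrained(torch.tensor(big_feature_matrix)).shape)  # torch.Size([100, 1])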
## Create Targets
targets = torch.tensor(esol_data['measured log solubility in mols per litre'][:n_samples])
# Only train on 2/3 of points
labelled_indices = np.random.choice(np.arange(n_samples), int(2*n_samples/3), replace=False)
unlabelled_indices = np.setdiff1d(np.arange(n_samples), labelled_indices)
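# Sanity check on the split (a quick sketch): the two index sets should
# partition the 100 molecules with no overlap.
print(len(labelled_indices), len(unlabelled_indices))  # 66 34
assert len(np.intersect1d(labelled_indices, unlabelled_indices)) == 0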
### Train your model here (You can use a very similar approach to the one used in previous practicals) ###
model = SolNet(norm_graph_laplacian, masks)
model.double()
num_epochs = 80
learning_rate = 0.05
opt = torch.optim.Adam(model.parameters(), lr=learning_rate)
features = torch.tensor(big_feature_matrix)  # the model expects a torch tensor, not a numpy array
all_train_loss = []
all_test_loss = []
for epoch in range(num_epochs):
    pred = model(features).squeeze()
    train_loss = torch.mean((pred[labelled_indices] - targets[labelled_indices])**2)
    opt.zero_grad()
    train_loss.backward()
    opt.step()
    all_train_loss.append(train_loss.item())  # .item() detaches, so we don't keep the whole graph alive
    with torch.no_grad():
        test_loss = torch.mean((pred[unlabelled_indices] - targets[unlabelled_indices])**2)
    all_test_loss.append(test_loss.item())
plt.plot(all_train_loss, label='train')
plt.plot(all_test_loss, label='test')
plt.legend()
print('final train loss: ', all_train_loss[-1], '\n final test loss: ', all_test_loss[-1])
final train loss: 0.7310758692543249
final test loss: 2.8486112140378594
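# Beyond the loss curves, a parity plot on the held-out molecules gives a quick
# feel for the fit (a minimal sketch; the dashed y = x line marks perfect
# predictions, and the [-8, 2] range is chosen roughly to cover ESOL values):
with torch.no_grad():
    preds = model(features).squeeze()
plt.figure()
plt.scatter(targets[unlabelled_indices], preds[unlabelled_indices])
plt.plot([-8, 2], [-8, 2], 'k--')  # y = x reference line
plt.xlabel('measured log solubility')
plt.ylabel('predicted log solubility')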
class MPNet(torch.nn.Module):
    def __init__(self, n_features, activation_function=torch.nn.functional.relu):
        super().__init__()
        # Just one message-passing layer, which only incorporates information from
        # each node's immediate (1-hop) neighbours; every added layer reaches one
        # neighbour deeper. Three layers are often quoted as a sweet spot in the
        # literature.
        # The message MLP takes the concatenated features of a node and one
        # neighbour (hence size n_features * 2) and returns a vector of the
        # original size, which is convenient for stacking further layers.
        self.W1 = torch.nn.Parameter(torch.randn((n_features * 2, n_features)))
        self.W2 = torch.nn.Parameter(torch.randn((n_features * 2, n_features)))
        self.W3 = torch.nn.Parameter(torch.randn((n_features, 1)))
        self.n_features = n_features
        self.activation_function = activation_function

    def forward(self, feats, adj):
        """Computes the prediction of the neural network."""
        # For each atom i, sum the transformed (node, neighbour) feature pairs over
        # all neighbours j (self-loops included via the augmented adjacency).
        messages = torch.stack([sum(self.activation_function(torch.hstack((feats[i], feats[j])) @ self.W1)
                                    for j in np.where(adj[i] == 1)[0])
                                for i in range(len(feats))])
        feats = self.activation_function(torch.hstack((feats, messages)) @ self.W2)
        return torch.sum(feats, axis=0) @ self.W3  # sum-readout over atoms
# We could also have written the forward pass with a more classic explicit for
# loop, as below, though memory use can become a concern. Note this variant
# updates feats[i] in place (so later atoms see already-updated neighbour
# features) and divides by the neighbour count, i.e. mean aggregation:
# def forward(self, feats, adj):
#     """Computes the prediction of the neural network"""
#     for i in range(len(feats)):
#         message = 0
#         for j in np.where(adj[i] == 1)[0]:
#             message += self.activation_function(torch.hstack((feats[i], feats[j])) @ self.W1)
#         feats[i] = self.activation_function(torch.hstack((feats[i], message)) @ self.W2) / len(np.where(adj[i] == 1)[0])
#     y = torch.sum(feats, axis=0) @ self.W3
#     return y
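# A minimal smoke test (sketch): MPNet consumes one molecule at a time, so a
# single (features, adjacency) pair should yield a single scalar prediction.
_mp = MPNet(10).double()
_f, _a, _ = generate_graph(esol_data['smiles'][0])
with torch.no_grad():
    print(_mp(torch.tensor(_f), torch.tensor(_a)))  # one untrained scalar prediction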
class MPNet2(torch.nn.Module):
    def __init__(self, n_features, activation_function=torch.nn.functional.relu):
        super().__init__()
        # Two message-passing layers, each with its own message (W1/W11) and
        # update (W2/W22) weights.
        self.W1 = torch.nn.Parameter(torch.randn((n_features * 2, n_features)))
        self.W2 = torch.nn.Parameter(torch.randn((n_features * 2, n_features)))
        self.W11 = torch.nn.Parameter(torch.randn((n_features * 2, n_features)))
        self.W22 = torch.nn.Parameter(torch.randn((n_features * 2, n_features)))
        self.W3 = torch.nn.Parameter(torch.randn((n_features, 1)))
        self.n_features = n_features
        self.activation_function = activation_function

    def forward(self, feats, adj):
        """Computes the prediction of the neural network."""
        # First round of message passing (mean-aggregated, updated in place).
        for i in range(len(feats)):
            message = 0
            for j in np.where(adj[i] == 1)[0]:
                message += self.activation_function(torch.hstack((feats[i], feats[j])) @ self.W1)
            feats[i] = self.activation_function(torch.hstack((feats[i], message)) @ self.W2) / len(np.where(adj[i] == 1)[0])
        # Second round, with its own weights.
        for i in range(len(feats)):
            message = 0
            for j in np.where(adj[i] == 1)[0]:
                message += self.activation_function(torch.hstack((feats[i], feats[j])) @ self.W11)
            feats[i] = self.activation_function(torch.hstack((feats[i], message)) @ self.W22) / len(np.where(adj[i] == 1)[0])
        y = torch.sum(feats, axis=0) @ self.W3
        return y
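# Rather than duplicating W11/W22 by hand for every extra round, the number of
# message-passing rounds can be made a constructor argument with a ParameterList.
# A sketch of the same idea (MPNetN, msg_weights, upd_weights, and readout are
# names invented here; this variant is not trained below). It also updates all
# atoms synchronously instead of in place:
class MPNetN(torch.nn.Module):
    def __init__(self, n_features, n_layers=2, activation_function=torch.nn.functional.relu):
        super().__init__()
        self.msg_weights = torch.nn.ParameterList(
            [torch.nn.Parameter(torch.randn(n_features * 2, n_features)) for _ in range(n_layers)])
        self.upd_weights = torch.nn.ParameterList(
            [torch.nn.Parameter(torch.randn(n_features * 2, n_features)) for _ in range(n_layers)])
        self.readout = torch.nn.Parameter(torch.randn(n_features, 1))
        self.activation_function = activation_function

    def forward(self, feats, adj):
        neighbours = [np.where(adj[i] == 1)[0] for i in range(len(feats))]  # computed once, reused each round
        for Wm, Wu in zip(self.msg_weights, self.upd_weights):
            messages = torch.stack([
                sum(self.activation_function(torch.hstack((feats[i], feats[j])) @ Wm)
                    for j in neighbours[i])
                for i in range(len(feats))])
            feats = self.activation_function(torch.hstack((feats, messages)) @ Wu)
        return torch.sum(feats, axis=0) @ self.readout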
model = MPNet(10)
model.double()
num_epochs = 160
learning_rate = 0.06
opt = torch.optim.Adam(model.parameters(), lr=learning_rate)
# Featurise each molecule once up front instead of re-running RDKit every epoch.
graphs = {i: generate_graph(esol_data['smiles'][i]) for i in range(n_samples)}
all_train_loss = []
all_test_loss = []
for epoch in range(num_epochs):
    epoch_loss = 0
    for i in labelled_indices:
        feats, adj, _ = graphs[i]
        pred = model(torch.tensor(feats), torch.tensor(adj))
        epoch_loss += torch.mean((pred - targets[i])**2)
    opt.zero_grad()
    epoch_loss.backward()
    opt.step()
    all_train_loss.append(epoch_loss.item() / len(labelled_indices))
    epoch_test_loss = 0
    with torch.no_grad():
        for i in unlabelled_indices:
            feats, adj, _ = graphs[i]
            pred = model(torch.tensor(feats), torch.tensor(adj))
            epoch_test_loss += torch.mean((pred - targets[i])**2)
    all_test_loss.append(epoch_test_loss.item() / len(unlabelled_indices))
plt.plot(all_train_loss, label='train')
plt.plot(all_test_loss, label='test')
plt.legend()
print('final train loss: ', all_train_loss[-1], '\n final test loss: ', all_test_loss[-1])
final train loss: 1.7038710673234858
final test loss: 7.782729852930243
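# Taken at face value, the batched GCN (final test loss ~2.85) generalises better
# here than the single-round message-passing network (~7.78), though the two runs
# differ in depth, aggregation, and training setup, so this is not a controlled
# comparison.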