import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam, SGD
import matplotlib.pyplot as plt
glass_data = pd.read_csv("glass.csv")
glass_data
glass_data_x = glass_data.drop(["Type", "RI"], axis = 1)
glass_data_x_normalized = (glass_data_x - glass_data_x.mean()) / glass_data_x.std()
glass_data_x_normalized
x_data = torch.tensor(glass_data_x_normalized.to_numpy()).float()
x_data = x_data.unsqueeze(-2)  # add a channel dimension: (N, 8) -> (N, 1, 8), as expected by Conv1d
# Shift the Type labels so they run from 0 to 6 (cross_entropy expects class indices starting at 0)
y_data = torch.tensor(glass_data.iloc[:, 9].to_numpy()).long() - 1
print("X data shape", x_data.shape)
print("Y data shape", y_data.shape)
print("X max is ", x_data.max())
print("X min is ", x_data.min())
print("Y max is ", y_data.max())
print("Y min is ", y_data.min())
class Glass(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=3, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(in_channels=3, out_channels=3, kernel_size=3, padding=1)
        # each output channel has its own kernel and set of parameters
        # (N_batch, 3, 8) is the conv output, which we flatten to (N_batch, 24) to pass to a linear layer
        self.fc1 = nn.Linear(24, 7)
        # we could have added more linear layers, each taking the output of the previous one as input

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = x.reshape((x.shape[0], -1))  # turn an input of size (N_batch, 3, 8) into (N_batch, 24)
        x = self.fc1(x)
        # we could apply a softmax to get the probability of each class, but F.cross_entropy does that for us
        return x
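# A quick optional check of the shapes described in the comments above, on a dummy batch of
# 2 examples (a sketch; `_g`, `_dummy`, `_targets` are throwaway names not used elsewhere):
_g = Glass()
_dummy = torch.randn(2, 1, 8)                         # (N_batch, channels=1, features=8)
print(F.relu(_g.conv1(_dummy)).shape)                 # torch.Size([2, 3, 8])
print(_g(_dummy).shape)                               # torch.Size([2, 7]) -- raw logits
# cross_entropy applies log-softmax internally, so these two losses should match:
_targets = torch.tensor([0, 3])
print(F.cross_entropy(_g(_dummy), _targets))
print(F.nll_loss(F.log_softmax(_g(_dummy), dim=-1), _targets))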
num_examples = x_data.shape[0]
num_train = int(num_examples * .8)
num_test = num_examples - num_train
random_indices = torch.randperm(num_examples)
train_index = random_indices[0:num_train]
test_index = random_indices[num_train:]
g = Glass()
g(x_data)
batch_size = 4
num_epochs= 100
learning_rate = 0.01
opt = SGD(g.parameters(), lr=learning_rate)
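# ceiling division: round up so the last, possibly smaller batch is still counted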
num_train_batch = (num_train + batch_size - 1) // batch_size
num_test_batch = (num_test + batch_size - 1) // batch_size
all_train_loss = []
all_test_loss = []
for epoch in range(num_epochs):
    train_order = train_index[torch.randperm(num_train)]
    epoch_train_loss = 0
    g.train()
    for batch in range(num_train_batch):
        indices = train_order[batch * batch_size:(batch + 1) * batch_size]
        x_batch = x_data[indices]
        y_batch = y_data[indices]
        pred = g(x_batch)
        loss = F.cross_entropy(pred, y_batch)
        opt.zero_grad()
        loss.backward()
        opt.step()
        epoch_train_loss += loss.item()
    epoch_test_loss = 0
    g.eval()
    for batch in range(num_test_batch):
        test_indices = test_index[batch * batch_size:(batch + 1) * batch_size]
        x_batch = x_data[test_indices]
        y_batch = y_data[test_indices]
        with torch.no_grad():
            pred = g(x_batch)
            loss = F.cross_entropy(pred, y_batch)
        epoch_test_loss += loss.item()
    all_train_loss.append(epoch_train_loss/num_train_batch)
    all_test_loss.append(epoch_test_loss/num_test_batch)
plt.plot(all_train_loss, label="Train")
plt.plot(all_test_loss, label="Test")
plt.legend()
#### YOUR CODE HERE ####
#train set first
with torch.no_grad():
    pred = g(x_data[train_index])
y_train = y_data[train_index]
predictions = torch.argmax(pred, dim=-1)
train_total_correct = torch.zeros(7)
train_all_classes = torch.zeros(7)
for (true_label, prediction) in zip(y_train, predictions):
    if (true_label == prediction):
        train_total_correct[true_label] += 1
    train_all_classes[true_label] += 1
print("train accuracy: ", train_total_correct/train_all_classes)
# now the test set
with torch.no_grad():
    pred = g(x_data[test_index])
y_test = y_data[test_index]
predictions = torch.argmax(pred, dim=-1)
test_total_correct = torch.zeros(7)
test_all_classes = torch.zeros(7)
for (true_label, prediction) in zip(y_test, predictions):
    if (true_label == prediction):
        test_total_correct[true_label] += 1
    test_all_classes[true_label] += 1
print("test accuracy: ", test_total_correct/test_all_classes)
def mean_acc_nonan(A):
    # go over a tensor of per-class accuracies and keep the values < 1.5,
    # i.e. every real accuracy in [0, 1], while skipping NaN (classes with no examples);
    # return the average of the kept values
    output = []
    for i in range(len(A)):
        if A[i].item() < 1.5:
            output.append(A[i].item())
    return np.mean(output)
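# An equivalent one-liner (a sketch, assuming A only contains values in [0, 1] or NaN):
# drop the NaN entries with a boolean mask and average the rest.
def mean_acc_nonan_v2(A):
    return A[~torch.isnan(A)].mean().item()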
#Let's calculate an average accuracy for all classes for training
train_accuracies=train_total_correct/train_all_classes
train_avg_acc=mean_acc_nonan(train_accuracies)
print ('mean train accuracy: ', train_avg_acc)
#Let's calculate an average accuracy for all classes for test
test_accuracies=test_total_correct/test_all_classes
test_avg_acc=mean_acc_nonan(test_accuracies)
print ('mean test accuracy: ', test_avg_acc)
num_data_points = 40
PI = torch.tensor(np.pi).float()
noise = 0.01
phases = torch.rand((10, 1)) * 2 * PI
print ('phases : ', phases.shape)
t = torch.linspace(0, 2 * PI, num_data_points)
print ('t : ', t.shape)
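# broadcasting: t has shape (40,) and phases has shape (10, 1), so t + phases
# has shape (10, 40), i.e. one sine wave of length num_data_points per phase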
x = torch.sin(t + phases)
print('x: ', x.shape)
#Add_noise
x = x + torch.randn_like(x) * noise
for (phase_i, x_i) in zip(phases, x):
    plt.plot(x_i.cpu().numpy(), label="{:.2f}".format(phase_i.item()))
plt.legend()
class RNNNet(nn.Module):
    def __init__(self, input_size=1, hidden_layer_size=100, output_size=1):
        super().__init__()
        self.rnn1 = nn.RNN(input_size, hidden_layer_size, batch_first=True)
        self.fc1 = nn.Linear(hidden_layer_size, output_size)
        # no need to add a non-linearity: the RNN module already includes one (tanh by default, or ReLU)

    def forward(self, input_seq):
        # nn.RNN returns two values: the per-timestep outputs and the final hidden state.
        # Since we don't need the final hidden state, we discard it.
        rnn_out, _ = self.rnn1(input_seq)
        predictions = self.fc1(rnn_out)
        return predictions
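# Optional shape check for the comment above (a sketch; `_rnn`, `_out`, `_h_n` are throwaway names):
# with batch_first=True, nn.RNN returns the per-timestep outputs of shape (batch, seq_len, hidden)
# and the final hidden state of shape (num_layers, batch, hidden).
_rnn = nn.RNN(input_size=1, hidden_size=100, batch_first=True)
_out, _h_n = _rnn(torch.zeros(2, 39, 1))
print(_out.shape)   # torch.Size([2, 39, 100])
print(_h_n.shape)   # torch.Size([1, 2, 100])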
# next-step prediction: the input is the sequence without its last point, the target is the
# same sequence shifted one step forward; unsqueeze adds the feature dimension
x_data = x[:, :-1].unsqueeze(-1)
y_data = x[:, 1:].unsqueeze(-1)
num_examples = x_data.shape[0]
num_train = int(num_examples * .8)
num_val = num_examples - num_train
random_indices = torch.randperm(num_examples)
train_index = random_indices[0:num_train]
val_index = random_indices[num_train:]
print(x_data.shape)
print(y_data.shape)
rnn_net=RNNNet()
print (rnn_net(x_data[0:2]).shape)
rnn_net = RNNNet()
batch_size = 4
num_epochs= 50
learning_rate = 0.01
opt = Adam(rnn_net.parameters(), lr=learning_rate)
num_train_batch = (num_train + batch_size - 1) // batch_size
num_val_batch = (num_val + batch_size - 1) // batch_size
all_train_loss = []
all_val_loss = []
for epoch in range(num_epochs):
    train_order = train_index[torch.randperm(num_train)]
    epoch_train_loss = 0
    for batch in range(num_train_batch):
        indices = train_order[batch * batch_size:(batch + 1) * batch_size]
        x_batch = x_data[indices]
        y_batch = y_data[indices]
        pred = rnn_net(x_batch)
        # squared error summed over the time and feature dims, averaged over the batch
        loss = ((pred - y_batch)**2).sum((-1, -2)).mean()
        opt.zero_grad()
        loss.backward()
        opt.step()
        epoch_train_loss += loss.item()
    epoch_val_loss = 0
    for batch in range(num_val_batch):
        val_indices = val_index[batch * batch_size:(batch + 1) * batch_size]
        x_batch = x_data[val_indices]
        y_batch = y_data[val_indices]
        with torch.no_grad():
            pred = rnn_net(x_batch)
            loss = ((pred - y_batch)**2).sum((-1, -2)).mean()
        epoch_val_loss += loss.item()
    all_train_loss.append(epoch_train_loss/num_train_batch)
    all_val_loss.append(epoch_val_loss/num_val_batch)
plt.plot(all_train_loss, label="Train")
plt.plot(all_val_loss, label="Val")
plt.legend()
print('convergence of train loss: ', all_train_loss[-1])
print('convergence of validation loss: ',all_val_loss[-1])
num_test = 5
sequence_known_frac = 0.5
sequence_known_len = int(num_data_points * sequence_known_frac)
sequence_pred_len = num_data_points - sequence_known_len
sequence_to_predict = int(num_data_points * sequence_known_frac)
t_test = torch.linspace(0, 1 * PI, sequence_known_len)
phases_test = torch.rand((num_test, 1)) * 2 * PI
x_test = torch.sin(t_test + phases_test)
#Add_noise
x_test = x_test + torch.randn_like(x_test) * noise
for (phase_i, x_i) in zip(phases_test, x_test):
    plt.plot(x_i.cpu().numpy(), label="{:.2f}".format(phase_i.item()))
plt.legend()
x_test = x_test.unsqueeze(-1)
print(x_test.shape)
rnn_net(x_test).shape
# For each x_t we predict x_{t+1}; x_{t+1} is the last element of the output sequence.
# We thus concatenate that last element onto x_test and repeat until we have the whole sequence.
for i in range(sequence_pred_len):
    with torch.no_grad():
        x_test = torch.cat([x_test, rnn_net(x_test)[:, -1:, :]], dim=1)  # slicing with -1: keeps the time dimension instead of collapsing it
print('final shape of x_test', x_test.shape)
for (phase_i, x_i) in zip(phases_test, x_test):
    plt.plot(x_i.cpu().numpy(), label="{:.2f}".format(phase_i.item()))
plt.legend()
x_test.shape
# If we run the prediction cell multiple times, we keep appending onto x_test:
# its size along dim=1 will keep growing.
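# One way to make the autoregressive cell safe to re-run (a sketch; `x_generated` is a new
# name, not from the original): start from the known prefix each time and grow a copy instead.
x_generated = x_test[:, :sequence_known_len, :].clone()
for i in range(sequence_pred_len):
    with torch.no_grad():
        x_generated = torch.cat([x_generated, rnn_net(x_generated)[:, -1:, :]], dim=1)
print('x_generated shape:', x_generated.shape)  # always (num_test, num_data_points, 1)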