import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam, SGD
import matplotlib.pyplot as plt
glass_data = pd.read_csv("glass.csv")
glass_data
#normalize by subtracting mean for column and dividing by standard deviation
glass_data_x = glass_data.drop(["Type", "RI"], axis = 1)
glass_data_x_normalized =(glass_data_x-glass_data_x.mean())/glass_data_x.std()
glass_data_x_normalized
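# Optional sanity check (an addition, not part of the original flow): after standardization
# each feature should have mean ~0 and standard deviation ~1.
print(glass_data_x_normalized.mean().round(3))
print(glass_data_x_normalized.std().round(3))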
x_data = torch.tensor(glass_data_x_normalized.to_numpy()).float()
x_data = x_data.unsqueeze(-2)  # add a channel dimension: (N, 8) -> (N, 1, 8) for Conv1d
# Labels: the "Type" column (index 9) holds glass types 1-7, so subtract 1 to get
# class indices 0-6 as expected by cross_entropy
y_data = torch.tensor(glass_data.iloc[:, 9].to_numpy()).long() - 1
print("X data shape", x_data.shape)
print("Y data shape", y_data.shape)
print("X max is ", x_data.max())
print("X min is ", x_data.min())
print("Y max is ", y_data.max())
print("Y min is ", y_data.min())
class Glass(nn.Module):
    def __init__(self):
        super().__init__()
        # Each example is a 1-channel sequence of 8 chemical features.
        # kernel_size=3 with padding=1 pads symmetrically and keeps the length at 8.
        self.conv1 = torch.nn.Conv1d(in_channels=1, out_channels=3, kernel_size=3, padding=1)  # each output channel has its own kernel
        self.conv2 = torch.nn.Conv1d(in_channels=3, out_channels=3, kernel_size=3, padding=1)
        # conv output is (N batch, 3, 8); flattened it becomes (N batch, 24)
        self.fct1 = nn.Linear(24, 7)  # fully connected layer mapping to the 7 glass classes
        #self.fct2 = nn.Linear(14, 7)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        # (N batch, 3, 8) -> (N batch, 24): keep the batch axis and flatten the rest
        x = x.reshape(x.shape[0], -1)
        y = self.fct1(x)  # raw logits; cross_entropy applies the softmax internally
        #y = self.fct2(y)
        # goal: p(y = class | x, theta)
        return y
num_examples = x_data.shape[0]
num_train = int(num_examples * .8)
num_test = num_examples - num_train
random_indices = torch.randperm(num_examples)
train_index = random_indices[0:num_train]
test_index = random_indices[num_train:]
g = Glass()
#g(x_data)
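# Optional shape check (an addition): a tiny untrained forward pass should return one
# logit per class, i.e. shape (batch, 7).
with torch.no_grad():
    print("Model output shape:", g(x_data[:2]).shape)  # expect torch.Size([2, 7])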
batch_size = 2 #### YOUR CODE HERE ####
num_epochs= 4000 #### YOUR CODE HERE ####
learning_rate = 1e-4 #### YOUR CODE HERE ####
opt = SGD(g.parameters(), lr=learning_rate)
num_train_batch = (num_train + batch_size - 1) // batch_size
num_test_batch = (num_test + batch_size - 1) // batch_size
all_train_loss = []
all_test_loss = []
for epoch in range(num_epochs):
    train_order = train_index[torch.randperm(num_train)]
    epoch_train_loss = 0
    g.train()
    for batch in range(num_train_batch):
        indices = train_order[batch * batch_size:(batch + 1) * batch_size]
        x_batch = x_data[indices]
        y_batch = y_data[indices]
        pred = g(x_batch)
        # cross_entropy applies log-softmax to the logits and takes the negative
        # log-likelihood of the correct class
        loss = F.cross_entropy(pred, y_batch)
        opt.zero_grad()
        loss.backward()
        opt.step()
        epoch_train_loss += loss.item()

    epoch_test_loss = 0
    g.eval()
    for batch in range(num_test_batch):
        test_indices = test_index[batch * batch_size:(batch + 1) * batch_size]
        x_batch = x_data[test_indices]
        y_batch = y_data[test_indices]
        with torch.no_grad():
            pred = g(x_batch)
            loss = F.cross_entropy(pred, y_batch)
        epoch_test_loss += loss.item()

    all_train_loss.append(epoch_train_loss / num_train_batch)
    all_test_loss.append(epoch_test_loss / num_test_batch)
plt.title('Epoch Loss')
plt.xlabel('Epoch Number')
plt.ylabel('Cross-Entropy Loss')
plt.plot(all_train_loss, label="Train Loss")
plt.plot(all_test_loss, label="Test Loss")
plt.legend()
#### YOUR CODE HERE ####
# possible improvement: add more hidden layers or channels
# Training set
with torch.no_grad():  # no gradient tracking needed for evaluation
    pred = g(x_data[train_index])
y_train = y_data[train_index]
predictions = torch.argmax(pred, dim=1)
total_correct = torch.zeros(7)
all_classes = torch.zeros(7)
for (true_label, prediction) in zip(y_train, predictions):
    if true_label == prediction:
        total_correct[true_label] += 1
    all_classes[true_label] += 1
# per-class accuracy; classes with no examples in this split show nan (0/0)
print('Train Accuracy: ', total_correct / all_classes)
# Test set
with torch.no_grad():  # no gradient tracking needed for evaluation
    pred = g(x_data[test_index])
y_test = y_data[test_index]
predictions = torch.argmax(pred, dim=1)
total_correct = torch.zeros(7)
all_classes = torch.zeros(7)
for (true_label, prediction) in zip(y_test, predictions):
    if true_label == prediction:
        total_correct[true_label] += 1
    all_classes[true_label] += 1
# per-class accuracy; classes with no examples in this split show nan (0/0)
print('Test Accuracy: ', total_correct / all_classes)
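# Optional summary (an addition): overall test accuracy, using the predictions and
# labels computed just above.
print('Overall Test Accuracy: ', (predictions == y_test).float().mean().item())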
num_data_points = 20
PI = torch.tensor(np.pi).float()
noise = 0.01
phases = torch.rand((10, 1)) * 2 * PI
t = torch.linspace(0, 2 * PI, num_data_points)
x = torch.sin(t + phases)
# Add noise
x = x + torch.randn_like(x) * noise
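# Optional check (an addition): 10 sine waves with random phases, 20 timesteps each.
print("Sine data shape:", x.shape)  # expect torch.Size([10, 20])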
for (phase_i, x_i) in zip(phases, x):
    plt.plot(x_i.cpu().numpy(), label="{:.2f}".format(phase_i.item()))
plt.legend()
class RNNNet(nn.Module):
    def __init__(self, input_size=1, hidden_layer_size=100, output_size=1):
        super().__init__()
        self.rnn1 = nn.RNN(input_size=input_size, hidden_size=hidden_layer_size, batch_first=True)
        self.rnn2 = nn.RNN(input_size=hidden_layer_size, hidden_size=hidden_layer_size, batch_first=True)
        self.fc1 = nn.Linear(hidden_layer_size, output_size)

    def forward(self, input_seq):
        # nn.RNN applies a tanh nonlinearity internally; the final hidden state is discarded
        rnn_out, _ = self.rnn1(input_seq)
        rnn_out2, _ = self.rnn2(rnn_out)
        y = self.fc1(rnn_out2)
        return y
x_data = x[:, :-1].unsqueeze(-1)
y_data = x[:, 1:].unsqueeze(-1)
num_examples = x_data.shape[0]
num_train = int(num_examples * .8)
num_val = num_examples - num_train
random_indices = torch.randperm(num_examples)
train_index = random_indices[0:num_train]
val_index = random_indices[num_train:]
print(x_data.shape)
# x_data/y_data: (10 sequences, 19 timesteps, 1 input feature); y is x shifted one step ahead
print(y_data.shape)
rnn_net = RNNNet()
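# Optional shape check (an addition): the RNN predicts the next value at every timestep,
# so the output shape should match the input shape (batch, sequence length, 1).
with torch.no_grad():
    print("RNN output shape:", rnn_net(x_data[:2]).shape)  # expect torch.Size([2, 19, 1])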
batch_size = 2
num_epochs= 1000
learning_rate = 1e-3
opt = Adam(rnn_net.parameters(), lr=learning_rate)
num_train_batch = (num_train + batch_size - 1) // batch_size
num_val_batch = (num_val + batch_size - 1) // batch_size
all_train_loss = []
all_val_loss = []
for epoch in range(num_epochs):
    train_order = train_index[torch.randperm(num_train)]
    epoch_train_loss = 0
    for batch in range(num_train_batch):
        indices = train_order[batch * batch_size:(batch + 1) * batch_size]
        x_batch = x_data[indices]
        y_batch = y_data[indices]
        pred = rnn_net(x_batch)
        loss = ((pred - y_batch) ** 2).sum((-1, -2)).mean()  # sum squared error over the last two dims, average over the batch
        opt.zero_grad()
        loss.backward()
        opt.step()
        epoch_train_loss += loss.item()

    epoch_val_loss = 0
    for batch in range(num_val_batch):
        val_indices = val_index[batch * batch_size:(batch + 1) * batch_size]
        x_batch = x_data[val_indices]
        y_batch = y_data[val_indices]
        with torch.no_grad():
            pred = rnn_net(x_batch)
            loss = ((pred - y_batch) ** 2).sum((-1, -2)).mean()
        epoch_val_loss += loss.item()

    all_train_loss.append(epoch_train_loss / num_train_batch)
    all_val_loss.append(epoch_val_loss / num_val_batch)
plt.plot(all_train_loss, label="Train")
plt.plot(all_val_loss, label="Val")
plt.title('Epoch Loss')
plt.xlabel('Number of Epochs')
plt.ylabel('Loss')
plt.legend()
num_test = 5
sequence_known_frac = 0.5
sequence_known_len = int(num_data_points * sequence_known_frac)
sequence_pred_len = num_data_points - sequence_known_len
t_test = torch.linspace(0, 1 * PI, sequence_known_len)
phases_test = torch.rand((num_test, 1)) * 2 * PI
x_test = torch.sin(t_test + phases_test)
# Add noise
x_test = x_test + torch.randn_like(x_test) * noise
for (phase_i, x_i) in zip(phases_test, x_test):
    plt.plot(x_i.cpu().numpy(), label="{:.2f}".format(phase_i.item()))
plt.legend()
x_test = x_test.unsqueeze(-1)
print(x_test.shape)
#### YOUR CODE HERE ####
# Autoregressive rollout on the test set: run the known prefix through the RNN, take the
# prediction for the last timestep, append it, and repeat until the full length is reached.
for i in range(sequence_pred_len):
    with torch.no_grad():
        # 5 sine waves; only the first sequence_known_len timesteps are given
        x_test = torch.cat([x_test, rnn_net(x_test)[:, -1:, :]], dim=1)

for (phase_i, x_i) in zip(phases_test, x_test):
    plt.plot(x_i.cpu().numpy(), label="{:.2f}".format(phase_i.item()))
plt.legend()
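# Optional comparison (an assumption about how to visualize rollout quality, not part of the
# original flow): overlay the autoregressive predictions against a noise-free sine continued
# with the same spacing as the known prefix.
dt = t_test[1] - t_test[0]
t_full = torch.arange(num_data_points) * dt
x_true = torch.sin(t_full + phases_test)
plt.figure()
for (x_pred_i, x_true_i) in zip(x_test, x_true):
    plt.plot(x_pred_i.squeeze(-1).cpu().numpy(), label="rollout")
    plt.plot(x_true_i.cpu().numpy(), '--', label="true")
plt.legend()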