import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam, SGD
import matplotlib.pyplot as plt
glass_data = pd.read_csv("glass.csv")
glass_data
glass_data_x = glass_data.drop(["Type", "RI"], axis=1)
glass_data_x_normalized = (glass_data_x - glass_data_x.mean()) / glass_data_x.std()
glass_data_x_normalized
x_data = torch.tensor(glass_data_x_normalized.to_numpy()).float()
# Add a channel dimension so each example is shaped (1, 8) for Conv1d
x_data = x_data.unsqueeze(-2)
# Shift the labels from 1-7 down to 0-6: CrossEntropyLoss expects class
# indices starting at 0
y_data = torch.tensor(glass_data.iloc[:, 9].to_numpy()).long() - 1
print("X data shape", x_data.shape)
print("Y data shape", y_data.shape)
print("X max is ", x_data.max())
print("X min is ", x_data.min())
print("Y max is ", y_data.max())
print("Y min is ", y_data.min())
class Glass(nn.Module):
    def __init__(self, n_input_features=8, n_output_features=7):
        super().__init__()
        #############################
        #### YOUR CODE HERE ####
        n_hidden = 3
        self.cnn_1 = nn.Conv1d(in_channels=1, out_channels=n_hidden, kernel_size=3, padding=1)
        # kernel_size=1 with padding=1 grows each sequence from 8 to 10,
        # which is why the linear layer below takes 10 inputs
        self.cnn_2 = nn.Conv1d(in_channels=n_hidden, out_channels=1, kernel_size=1, padding=1)
        self.linear_layers = nn.Sequential(
            nn.Linear(10, n_output_features)
        )
        self.activation = nn.ReLU()
        #############################

    def forward(self, x):
        #############################
        # Apply the nonlinearity between the convolutions and the linear
        # classifier, not after it (a ReLU on the logits would zero out
        # all negative class scores)
        x = self.activation(self.cnn_2(self.cnn_1(x)))
        linear_output = self.linear_layers(x)
        # Return raw logits: nn.CrossEntropyLoss applies log-softmax
        # internally, so an extra Softmax here would flatten the gradients
        return linear_output.squeeze(-2)
        #############################
num_examples = x_data.shape[0]
num_train = int(num_examples * .8)
num_test = num_examples - num_train
random_indices = torch.randperm(num_examples)
train_index = random_indices[0:num_train]
test_index = random_indices[num_train:]
g = Glass()
g(x_data)
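# Sanity check (added): one forward pass over the full dataset should
# produce a row of 7 class logits per example.
print("Output shape:", g(x_data).shape)  # expect (num_examples, 7)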
batch_size = 50 #### YOUR CODE HERE ####
num_epochs = 10000 #### YOUR CODE HERE ####
learning_rate = .005 #### YOUR CODE HERE ####
opt = SGD(g.parameters(), lr=learning_rate)
num_train_batch = (num_train + batch_size - 1) // batch_size
num_test_batch = (num_test + batch_size - 1) // batch_size
loss_fn = nn.CrossEntropyLoss()
all_train_loss = []
all_test_loss = []
for epoch in range(num_epochs):
    train_order = train_index[torch.randperm(num_train)]
    epoch_train_loss = 0
    g.train()
    for batch in range(num_train_batch):
        indices = train_order[batch * batch_size:(batch + 1) * batch_size]
        x_batch = x_data[indices]
        y_batch = y_data[indices]
        pred = g(x_batch)
        loss = loss_fn(pred, y_batch)
        opt.zero_grad()
        loss.backward()
        opt.step()
        epoch_train_loss += loss.item()
    epoch_test_loss = 0
    g.eval()
    for batch in range(num_test_batch):
        test_indices = test_index[batch * batch_size:(batch + 1) * batch_size]
        x_batch = x_data[test_indices]
        y_batch = y_data[test_indices]
        with torch.no_grad():
            pred = g(x_batch)
            loss = loss_fn(pred, y_batch)
        epoch_test_loss += loss.item()
    all_train_loss.append(epoch_train_loss / num_train_batch)
    all_test_loss.append(epoch_test_loss / num_test_batch)
plt.plot(all_train_loss, label="Train")
plt.plot(all_test_loss, label="Test")
plt.legend()
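# Added evaluation sketch (not in the original): overall held-out
# accuracy, taking the argmax over the 7 logits as the predicted class.
g.eval()
with torch.no_grad():
    test_preds = g(x_data[test_index]).argmax(dim=-1)
correct = (test_preds == y_data[test_index]).sum().item()
print("Test accuracy: %s/%s" % (correct, num_test))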
def count_match(target_val, pred_vals):
    count = 0
    for i in pred_vals:
        if int(i) == target_val:
            count += 1
    return count
# Row indices at which each class's block of rows ends; the data is
# sorted by Type, and the glass dataset has no class 4.
inx_class_1 = 70
inx_class_2 = 146
inx_class_4 = 163
inx_class_5 = 176
inx_class_6 = 185
inx_classes = [inx_class_1, inx_class_2, inx_class_4, inx_class_5, inx_class_6, len(y_data)]
prev_inx = 0
for inx_cutoff in inx_classes:
    # since the rows are sorted by class, the first label in each block
    # identifies the class for the whole slice
    class_target = y_data[prev_inx].item()
    with torch.no_grad():
        preds = g(x_data[prev_inx:inx_cutoff])
    preds_class = [p.argmax().item() for p in preds]
    print("accuracy class %s: %s/%s" % (class_target, count_match(class_target, preds_class), len(preds_class)))
    prev_inx = inx_cutoff
num_data_points = 20
PI = torch.tensor(np.pi).float()
noise = 0.01
phases = torch.rand((10, 1)) * 2 * PI
t = torch.linspace(0, 2 * PI, num_data_points)
x = torch.sin(t + phases)
# Add noise
x = x + torch.randn_like(x) * noise
for (phase_i, x_i) in zip(phases, x):
    plt.plot(x_i.cpu().numpy(), label="{:.2f}".format(phase_i.item()))
plt.legend()
class RNNNet(nn.Module):
    def __init__(self, input_size=1, hidden_layer_size=100, output_size=1):
        super().__init__()
        # batch_first=True: the data below is shaped (batch, seq_len, features)
        self.layer = nn.RNN(input_size, hidden_layer_size, batch_first=True)
        # Project each hidden state back down to one output value per step
        self.linear = nn.Linear(hidden_layer_size, output_size)

    def forward(self, input_seq):
        rnn_output, hidden = self.layer(input_seq)
        return self.linear(rnn_output), hidden
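# Next-step prediction setup: the input is each sequence without its
# final point, and the target is the same sequence shifted one step
# ahead, so the network learns to predict x[t+1] from x[0..t].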
x_data = x[:, :-1].unsqueeze(-1)
y_data = x[:, 1:].unsqueeze(-1)
num_examples = x_data.shape[0]
num_train = int(num_examples * .8)
num_val = num_examples - num_train
random_indices = torch.randperm(num_examples)
train_index = random_indices[0:num_train]
val_index = random_indices[num_train:]
print(x_data.shape)
print(y_data.shape)
rnn_net = RNNNet()
batch_size = 50 #### YOUR CODE HERE ####
num_epochs = 100 #### YOUR CODE HERE ####
learning_rate = 0.01 #### YOUR CODE HERE ####
opt = Adam(rnn_net.parameters(), lr=learning_rate)
num_train_batch = (num_train + batch_size - 1) // batch_size
num_val_batch = (num_val + batch_size - 1) // batch_size
all_train_loss = []
all_val_loss = []
loss_fn = nn.MSELoss()
for epoch in range(num_epochs):
    train_order = train_index[torch.randperm(num_train)]
    epoch_train_loss = 0
    for batch in range(num_train_batch):
        indices = train_order[batch * batch_size:(batch + 1) * batch_size]
        x_batch = x_data[indices]
        y_batch = y_data[indices]
        pred = rnn_net(x_batch)
        # the model returns (outputs, hidden); compare outputs to targets
        loss = loss_fn(pred[0], y_batch) #### YOUR CODE HERE ####
        opt.zero_grad()
        loss.backward()
        opt.step()
        epoch_train_loss += loss.item()
    epoch_val_loss = 0
    for batch in range(num_val_batch):
        val_indices = val_index[batch * batch_size:(batch + 1) * batch_size]
        x_batch = x_data[val_indices]
        y_batch = y_data[val_indices]
        with torch.no_grad():
            pred = rnn_net(x_batch)
            loss = loss_fn(pred[0], y_batch) #### YOUR CODE HERE ####
        epoch_val_loss += loss.item()
    all_train_loss.append(epoch_train_loss / num_train_batch)
    all_val_loss.append(epoch_val_loss / num_val_batch)
plt.plot(all_train_loss, label="Train")
plt.plot(all_val_loss, label="Val")
plt.legend()
num_test = 5
sequence_known_frac = 0.5
sequence_known_len = int(num_data_points * sequence_known_frac)
sequence_pred_len = num_data_points - sequence_known_len
t_test = torch.linspace(0, 1 * PI, sequence_known_len)
phases_test = torch.rand((num_test, 1)) * 2 * PI
x_test = torch.sin(t_test + phases_test)
# Add noise
x_test = x_test + torch.randn_like(x_test) * noise
for (phase_i, x_i) in zip(phases_test, x_test):
    plt.plot(x_i.cpu().numpy(), label="{:.2f}".format(phase_i.item()))
plt.legend()
x_test = x_test.unsqueeze(-1)
print(x_test.shape)
rnn_net(x_test)
for (phase_i, x_i) in zip(phases_test, rnn_net(x_test)[0].detach()):
    plt.plot(x_i.cpu().numpy(), label="{:.2f}".format(phase_i.item()))
plt.legend()
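# Hedged sketch (not in the original notebook): continue each test
# sequence autoregressively over the unknown half by feeding the last
# prediction back in as the next input, reusing the RNN's hidden state.
with torch.no_grad():
    out, hidden = rnn_net(x_test)   # warm up on the known half
    step = out[:, -1:, :]           # prediction for the first unseen step
    generated = [step]
    for _ in range(sequence_pred_len - 1):
        step_output, hidden = rnn_net.layer(step, hidden)
        step = rnn_net.linear(step_output)
        generated.append(step)
    continuation = torch.cat(generated, dim=1)
print(continuation.shape)  # (num_test, sequence_pred_len, 1)
for (phase_i, x_i) in zip(phases_test, continuation):
    plt.plot(x_i.squeeze(-1).cpu().numpy(), label="{:.2f}".format(phase_i.item()))
plt.legend()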