import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import sys
# Dataset from: https://www.kaggle.com/shivam2503/diamonds
# The label is price; instead of doing a regression, we do a classification over price buckets:
# 0: 0 - 999
# 1: 1000 - 2499
# 2: 2500 - 4999
# 3: 5000 - 9999
# 4: 10000 - 14999
# 5: 15000+
df = pd.read_csv('diamonds.csv')
df
# Check for missing data
# All attributes are non-null, and the dataset is a reputable one on Kaggle.
# Analysis of the categorical data and a description of the numerical data show no abnormalities,
# except possibly some duplicated rows. We cannot guarantee that duplicated data is invalid,
# though: two very similar diamonds could legitimately share the same price.
df.info()
df.describe()
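# A quick check on the duplicate question above (a sketch): count rows that repeat across all
# feature columns after dropping the index-like 'Unnamed: 0' column.
print('Exact duplicate rows:', df.drop(columns=['Unnamed: 0']).duplicated().sum())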
# Convert regression task into classification task
prices = df.price
data_pruned = df.copy().drop(columns=['Unnamed: 0', 'price']) # Unnamed: 0 is index of each object
# Array which will eventually become dataframe of labels
categorical_labels = []
price_breakpoints = [999, 2499, 4999, 9999, 14999] # Maximum price to be in that category
for price in prices:
categorical_label = len(price_breakpoints) # If new category isn't found, then it is in the last (most expensive) category
for i in range(len(price_breakpoints)):
if price <= price_breakpoints[i]:
categorical_label = i
break
categorical_labels.append(categorical_label)
# Convert to dataframe
labels = pd.DataFrame(data={'label': categorical_labels})
print(labels.describe()) # Make sure the categorical breaks are decent
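# A vectorized sanity check (sketch): pd.cut with the same breakpoints should reproduce the
# loop-built labels exactly; labels_cut is only used for this comparison.
labels_cut = pd.cut(prices, bins=[-np.inf, 999, 2499, 4999, 9999, 14999, np.inf], labels=False)
print('pd.cut agrees with the loop:', (labels_cut.to_numpy() == labels['label'].to_numpy()).all())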
data_pruned.describe() # Get ready to one-hot encode categorical data
# One hot encode categorical data, since we don't enforce a linear progression from "Good" to "Very Good" to "Ideal" etc.
data_cut_one_hot = pd.get_dummies(data_pruned.cut, prefix='cut')
data_color_one_hot = pd.get_dummies(data_pruned.color, prefix='color')
data_clarity_one_hot = pd.get_dummies(data_pruned.clarity, prefix='clarity')
data_one_hot = pd.concat([data_pruned.copy(), data_cut_one_hot, data_color_one_hot, data_clarity_one_hot], axis=1).drop(['cut', 'color', 'clarity'], axis=1)
data_one_hot.describe()
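# Equivalent shortcut (sketch, not used downstream): get_dummies can encode all three categorical
# columns in one call; data_one_hot_alt exists only to confirm the column sets match.
data_one_hot_alt = pd.get_dummies(data_pruned, columns=['cut', 'color', 'clarity'], prefix=['cut', 'color', 'clarity'])
print('Same one-hot columns either way:', set(data_one_hot_alt.columns) == set(data_one_hot.columns))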
# Scale numeric data
# Even though we never reuse them in this notebook, keeping the fitted scalers means new measurements can be scaled consistently for the trained model
from sklearn.preprocessing import StandardScaler
carat_scaler = StandardScaler().fit(data_one_hot.carat.to_numpy().reshape(-1, 1))
depth_scaler = StandardScaler().fit(data_one_hot.depth.to_numpy().reshape(-1, 1))
table_scaler = StandardScaler().fit(data_one_hot.table.to_numpy().reshape(-1, 1))
x_scaler = StandardScaler().fit(data_one_hot.x.to_numpy().reshape(-1, 1))
y_scaler = StandardScaler().fit(data_one_hot.y.to_numpy().reshape(-1, 1))
z_scaler = StandardScaler().fit(data_one_hot.z.to_numpy().reshape(-1, 1))
scaled_carat = carat_scaler.transform(data_one_hot.carat.to_numpy().reshape(-1, 1)).flatten()
scaled_depth = depth_scaler.transform(data_one_hot.depth.to_numpy().reshape(-1, 1)).flatten()
scaled_table = table_scaler.transform(data_one_hot.table.to_numpy().reshape(-1, 1)).flatten()
scaled_x = x_scaler.transform(data_one_hot.x.to_numpy().reshape(-1, 1)).flatten()
scaled_y = y_scaler.transform(data_one_hot.y.to_numpy().reshape(-1, 1)).flatten()
scaled_z = z_scaler.transform(data_one_hot.z.to_numpy().reshape(-1, 1)).flatten()
data = data_one_hot.copy()
data.carat = scaled_carat
data.depth = scaled_depth
data.table = scaled_table
data.x = scaled_x
data.y = scaled_y
data.z = scaled_z
data.describe()
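# A more compact alternative (sketch, not used downstream): a single ColumnTransformer can fit and
# apply all six per-column scalers at once, which is easier to ship alongside a trained model.
# numeric_cols, column_scaler, and scaled_alt below are illustrative names only.
from sklearn.compose import ColumnTransformer
numeric_cols = ['carat', 'depth', 'table', 'x', 'y', 'z']
column_scaler = ColumnTransformer([('num', StandardScaler(), numeric_cols)], remainder='passthrough')
scaled_alt = column_scaler.fit_transform(data_one_hot)
print('ColumnTransformer output shape:', scaled_alt.shape)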
'''
For our dataset, an 80-20 split works well: with roughly 55,000 instances, the test set is almost
certain to resemble the training set, and outliers are unlikely to play a significant role in
either split. While we have a relatively small number of premium-cut diamonds, there should still
be enough of them not to significantly skew our results.
55,000 instances is too many for the free environment we are running on, so we reduce the dataset
by 90% (keeping 8% for training and 2% for testing); this reduction does not affect the reasoning
above.
'''
X_train, X_test, y_train, y_test = train_test_split(data.to_numpy(), labels.to_numpy(),
train_size=0.08, test_size=0.02)
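# A stratified variant (sketch, not what is used above): passing stratify keeps the class
# proportions identical in the train and test splits, a cheap safeguard given the relatively
# rare most-expensive bucket. The *_strat variables are illustrative only.
Xtr_strat, Xte_strat, ytr_strat, yte_strat = train_test_split(
    data.to_numpy(), labels.to_numpy(), train_size=0.08, test_size=0.02,
    stratify=labels.to_numpy().ravel())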
from scipy.special import expit
import enum
class Optimizer(enum.Enum):
STEEPEST_ASCENT = 1
STOCHASTIC_GRADIENT_ASCENT = 2
NEWTONIAN = 3
class Regularization(enum.Enum):
NONE = 1
L1 = 2
L2 = 3
ELASTICNET = 4
class BinaryLogisticRegressionBase:
# private:
def __init__(self, eta, iterations=20, optimizer=Optimizer.STEEPEST_ASCENT):
self.eta = eta
self.iters = iterations
self.optimizer = optimizer
# internally we will store the weights as self.w_ to keep with sklearn conventions
def __str__(self):
return 'Base Binary Logistic Regression Object, Not Trainable'
# convenience, private and static:
@staticmethod
def _sigmoid(theta):
return 1/(1+np.exp(-theta))
@staticmethod
def _add_bias(X):
return np.hstack((np.ones((X.shape[0],1)),X)) # add bias term
# public:
def predict_proba(self, X, add_bias=True):
# add bias term if requested
Xb = self._add_bias(X) if add_bias else X
return self._sigmoid(Xb @ self.w_) # return the probability y=1
def predict(self,X):
return (self.predict_proba(X)>0.5) #return the actual prediction
class BinaryLogisticRegression(BinaryLogisticRegressionBase):
#private:
def __str__(self):
if(hasattr(self,'w_')):
            return 'Binary Logistic Regression Object with coefficients:\n' + str(self.w_)  # if we have trained the object
else:
return 'Untrained Binary Logistic Regression Object'
def _get_gradient(self,X,y):
# programming \sum_i (yi-g(xi))xi
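        # Derivation note: for the log-likelihood
        #   l(w) = sum_i [ y_i * log(g(x_i . w)) + (1 - y_i) * log(1 - g(x_i . w)) ]
        # the gradient is dl/dw = sum_i (y_i - g(x_i . w)) * x_i,
        # which is exactly the per-sample term accumulated in the loop below (then averaged).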
gradient = np.zeros(self.w_.shape) # set gradient to zero
for (xi,yi) in zip(X,y):
# the actual update inside of sum
gradi = (yi - self.predict_proba(xi,add_bias=False))*xi
# reshape to be column vector and add to gradient
gradient += gradi.reshape(self.w_.shape)
return gradient/float(len(y))
# public:
def fit(self, X, y):
Xb = self._add_bias(X) # add bias term
num_samples, num_features = Xb.shape
self.w_ = np.zeros((num_features,1)) # init weight vector to zeros
print_progress = 250
# for as many as the max iterations
for i in range(self.iters):
if((i+1) % print_progress == 0):
sys.stderr.write('\rEpoch: %d/%d' % (i+1, self.iters))
sys.stderr.flush()
gradient = self._get_gradient(Xb,y)
self.w_ += gradient*self.eta # multiply by learning rate
from numpy.linalg import pinv
class VectorBinaryLogisticRegression(BinaryLogisticRegression):
# inherit from our previous class to get same functionality
@staticmethod
def _sigmoid(theta):
# increase stability, redefine sigmoid operation
return expit(theta) #1/(1+np.exp(-theta))
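    # Note: np.exp(-theta) overflows for theta below roughly -709, producing warnings and inf
    # values; scipy's expit computes the same logistic function in a numerically stable way.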
# but overwrite the gradient calculation
def _get_gradient(self,X,y):
# Gradient Ascent
if self.optimizer is Optimizer.STEEPEST_ASCENT:
ydiff = y-self.predict_proba(X, add_bias=False).ravel() # get y difference
gradient = np.mean(X * ydiff[:,np.newaxis], axis=0) # make ydiff a column vector and multiply through
gradient = gradient.reshape(self.w_.shape)
return gradient
# SGA
elif self.optimizer is Optimizer.STOCHASTIC_GRADIENT_ASCENT:
idx = int(np.random.rand()*len(y)) # grab random instance
ydiff = y[idx]-self.predict_proba(X[idx],add_bias=False) # get y difference (now scalar)
gradient = X[idx] * ydiff[:,np.newaxis] # make ydiff a column vector and multiply through
gradient = gradient.reshape(self.w_.shape)
return gradient
# Newtonian Method
elif self.optimizer is Optimizer.NEWTONIAN:
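            # Newton's method rescales the gradient by the inverse Hessian, so the value returned
            # here is the full Newton step pinv(H) @ grad rather than a raw gradient.
            # H is built from X^T diag(g*(1-g)) X, with the -2*self.C term intended to fold in the
            # L2 penalty's curvature. Note that self.C is only defined on
            # RegularizedBinaryLogisticRegression, so the NEWTONIAN optimizer assumes that subclass.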
            g = self.predict_proba(X, add_bias=False).ravel() # get sigmoid value for every instance
hessian = X.T @ np.diag(g*(1-g)) @ X - 2 * self.C # calculate the hessian
ydiff = y-g # get y difference
gradient = np.sum(X * ydiff[:,np.newaxis], axis=0) # make ydiff a column vector and multiply through
gradient = gradient.reshape(self.w_.shape)
gradient[1:] += -2 * self.w_[1:] * self.C
return pinv(hessian) @ gradient
### Existing code - Steepest Ascent
else:
ydiff = y-self.predict_proba(X,add_bias=False).ravel() # get y difference
gradient = np.mean(X * ydiff[:,np.newaxis], axis=0) # make ydiff a column vector and multiply through
return gradient.reshape(self.w_.shape)
class LogisticRegression:
def __init__(self, eta=0.1, optimizer=Optimizer.STEEPEST_ASCENT, iterations=20):
self.eta = eta
self.iters = iterations
self.optimizer = optimizer
# internally we will store the weights as self.w_ to keep with sklearn conventions
def __str__(self):
if(hasattr(self,'w_')):
            return 'MultiClass Logistic Regression Object with coefficients:\n' + str(self.w_)  # if we have trained the object
else:
return 'Untrained MultiClass Logistic Regression Object'
def fit(self,X,y):
num_samples, num_features = X.shape
self.unique_ = np.unique(y) # get each unique class value
num_unique_classes = len(self.unique_)
self.classifiers_ = [] # will fill this array with binary classifiers
for i,yval in enumerate(self.unique_): # for each unique value
y_binary = (y==yval) # create a binary problem
# train the binary classifier for this class
blr = VectorBinaryLogisticRegression(self.eta,
self.iters, self.optimizer)
blr.fit(X,y_binary)
# add the trained classifier to the list
self.classifiers_.append(blr)
# save all the weights into one matrix, separate column for each class
self.w_ = np.hstack([x.w_ for x in self.classifiers_]).T
def predict_proba(self,X):
probs = []
for blr in self.classifiers_:
probs.append(blr.predict_proba(X)) # get probability for each classifier
return np.hstack(probs) # make into single matrix
def predict(self,X):
return self.unique_[np.argmax(self.predict_proba(X),axis=1)] # take argmax along row
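# A minimal smoke test of the one-vs-rest wrapper (a sketch, not part of the lab pipeline):
# the tiny random arrays and _demo_* names below are purely illustrative.
_demo_X = np.random.rand(30, 4)
_demo_y = np.random.randint(0, 3, 30)
_demo_lr = LogisticRegression(eta=0.1, iterations=50)
_demo_lr.fit(_demo_X, _demo_y)
print('demo predictions shape:', _demo_lr.predict(_demo_X).shape)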
class RegularizedBinaryLogisticRegression(VectorBinaryLogisticRegression):
# extend init functions
def __init__(self, C=0.0,regularization="l2", **kwds):
# need to add to the original initializer
self.C = C
self.regularization = regularization
# but keep other keywords
super().__init__(**kwds) # call parent initializer
# extend previous class to change functionality
    def _get_gradient(self, X, y):
        # call get gradient from previous class
        gradient = super()._get_gradient(X, y)
        # add the regularization penalty's gradient to every weight except the bias term
        # L2 penalty: C * sum(w^2), gradient contribution: -2 * C * w
        if self.regularization == "l2":
            gradient[1:] += -2 * self.w_[1:] * self.C
        # L1 penalty: C * sum(|w|), (sub)gradient contribution: -C * sign(w)
        elif self.regularization == "l1":
            gradient[1:] += -1 * np.sign(self.w_[1:]) * self.C
        # elastic net: apply both the L2 and L1 contributions above
        elif self.regularization == "elasticnet":
            gradient[1:] += -2 * self.w_[1:] * self.C
            gradient[1:] += -1 * np.sign(self.w_[1:]) * self.C
        return gradient
# now redefine the Logistic Regression Function where needed
class RegularizedLogisticRegression(LogisticRegression):
"""
Logistic Regession clasifer implemented for Lab 4
"""
def __init__(self, C=0.0, optimizer=Optimizer.STEEPEST_ASCENT, regularization=Regularization.NONE, eta=0.1, **kwds):
# need to add to the original initializer
self.C = C
self.regularization = regularization
# self.optimizer = optimizer
# but keep other keywords
super().__init__(eta=eta, optimizer=optimizer,**kwds) # call parent initializer
    def get_params(self, deep=False):
        # expose iterations too, so GridSearchCV can clone the estimator and vary it
        return {"C": self.C, "regularization": self.regularization, "eta": self.eta,
                "optimizer": self.optimizer, "iterations": self.iters}
    def set_params(self, **params):
        for key, value in params.items():
            if key == "iterations":
                self.iters = value  # fit() reads self.iters, so keep the two names in sync
            else:
                setattr(self, key, value)
        return self
def fit(self,X,y):
num_samples, num_features = X.shape
self.unique_ = np.unique(y) # get each unique class value
num_unique_classes = len(self.unique_)
self.classifiers_ = [] # will fill this array with binary classifiers
for i,yval in enumerate(self.unique_): # for each unique value
y_binary = y==yval # create a binary problem
# train the binary classifier for this class
# now this has regularization built into it
sys.stderr.write('\rTraining class: %d/%d' % (i+1, len(self.unique_)))
sys.stderr.flush()
            ### NEW CODE HERE ###
            # check the regularization method and build the matching binary classifier,
            # forwarding the chosen optimizer so it is actually used during training
            if self.regularization is Regularization.L2 or self.regularization is Regularization.L1:
                reg_method = "l2" if self.regularization is Regularization.L2 else "l1"
                blr = RegularizedBinaryLogisticRegression(C=self.C, regularization=reg_method, eta=self.eta,
                                                          iterations=self.iters, optimizer=self.optimizer)
                blr.fit(X, y_binary)
                # add the trained classifier to the list
                self.classifiers_.append(blr)
            elif self.regularization is Regularization.ELASTICNET:
                elr = RegularizedBinaryLogisticRegression(C=self.C, regularization="elasticnet", eta=self.eta,
                                                          iterations=self.iters, optimizer=self.optimizer)
                elr.fit(X, y_binary)
                self.classifiers_.append(elr)
            elif self.regularization is Regularization.NONE:
                ulr = BinaryLogisticRegression(eta=self.eta, iterations=self.iters)
                ulr.fit(X, y_binary)
                self.classifiers_.append(ulr)
# save all the weights into one matrix, separate column for each class
self.w_ = np.hstack([x.w_ for x in self.classifiers_]).T
# Flatten the label arrays from column vectors to 1-D, as expected by our classifiers and sklearn's metrics
y_train = y_train.ravel()
y_test = y_test.ravel()
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report, confusion_matrix
parameters = {'C': [0.1, 0.01, 0.0001],
              'eta': [0.01, 0.1, 1],
              'regularization': [Regularization.ELASTICNET, Regularization.L2, Regularization.L1],
              'iterations': [500, 750],
              'optimizer': [Optimizer.STOCHASTIC_GRADIENT_ASCENT, Optimizer.STEEPEST_ASCENT, Optimizer.NEWTONIAN]}
rlr = RegularizedLogisticRegression()
clf = GridSearchCV(rlr, parameters, scoring="accuracy", verbose=3, refit=True)
clf.fit(X_train, y_train)
# sorted(clf.cv_results_.keys())
# predictions = clf.predict(X_test)
# # print(classification_report(y_test, predictions))
print("\nBest Params:", clf.best_params_)
#print("Best Accuracy:", accuracy_score(y_test, predictions))
# From several manual tests and the grid search, we conclude that a high eta and a low C are very good
# hyperparameters when used with SGA and L2 regularization.
# Trends showed that increasing eta and lowering C improved performance.
# SGA performed very well on average; it was not dramatically better than Newton's method or steepest ascent,
# but it was what we perceived as best.
# Most of the penalizers were even, but L2 takes the cake by a small margin.
# Our best Regression
lr_clf = RegularizedLogisticRegression(C=0.0001,
optimizer=Optimizer.STOCHASTIC_GRADIENT_ASCENT,
regularization=Regularization.L2,
iterations=2500,
eta=0.9)
%time lr_clf.fit(X_train, y_train)
pred = lr_clf.predict(X_test)
from sklearn.metrics import accuracy_score
score = accuracy_score(y_test, pred)
print("\nOur Accuracy Score:", score)
from sklearn.metrics import accuracy_score
from sklearn.linear_model import LogisticRegression as SkLogisticRegression  # aliased so it does not shadow our own LogisticRegression class
from sklearn.linear_model import SGDClassifier
C = 0.0001
iters = 2500
eta = 0.9
# sklearn's LogisticRegression does not train with plain stochastic gradient descent (its solvers are typically
# lbfgs, liblinear, or SAGA), so to compare against our best-performing model we use SGDClassifier, which does
# implement stochastic gradient descent.
# lr = SkLogisticRegression(penalty="l2", C=C, max_iter=iters, solver="saga")
# loss="log_loss" makes SGDClassifier minimize logistic loss, i.e. SGD-trained logistic regression
# (on older scikit-learn versions the loss is named "log")
lr = SGDClassifier(loss="log_loss", penalty="l2", eta0=eta, learning_rate="constant", max_iter=iters, alpha=C)
%time lr.fit(X_train, y_train)
preds_sk = lr.predict(X_test)
sc = accuracy_score(y_test, preds_sk)
print("\nSK Accuracy Score:", sc)
# Looking at these results, our model performs better, but the training times differ massively: waiting seconds on a
# relatively small dataset, compared to a few milliseconds, shows the level of optimization sklearn has. Since this
# would be deployed as a pre-trained model, we would still suggest deploying our model instead of sklearn's because
# of its significantly higher accuracy; training time would not be a factor.
class OvORegularizedLogisticRegression(LogisticRegression):
"""
One vs One Logistic Regession clasifer implemented for Lab 4 - Exceptional Work
"""
def __init__(self, C=0.0, optimizer=Optimizer.STEEPEST_ASCENT, regularization=Regularization.NONE, eta=0.1, **kwds):
# need to add to the original initializer
self.C = C
self.regularization = regularization
# self.optimizer = optimizer
# but keep other keywords
super().__init__(eta=eta, optimizer=optimizer,**kwds) # call parent initializer
    def get_params(self, deep=False):
        # expose iterations too, for consistency with RegularizedLogisticRegression
        return {"C": self.C, "regularization": self.regularization, "eta": self.eta,
                "optimizer": self.optimizer, "iterations": self.iters}
    def set_params(self, **params):
        for key, value in params.items():
            if key == "iterations":
                self.iters = value  # fit() reads self.iters, so keep the two names in sync
            else:
                setattr(self, key, value)
        return self
def fit(self,X,y):
num_samples, num_features = X.shape
self.unique_ = np.unique(y) # get each unique class value
num_unique_classes = len(self.unique_)
self.classifiers_ = [] # will fill this array with binary classifiers
self.cls_labels = []
tables = []
df = pd.DataFrame(data=X)
df['target'] = y
for class_label in self.unique_:
new_df = df.loc[df['target'] == class_label]
new_df = new_df.drop(['target'], axis=1)
tables.append(new_df)
        for i, yval1 in enumerate(self.unique_):
            for j, yval2 in enumerate(self.unique_):
                if j <= i:
                    continue
                # store the pair as positions into self.unique_ so the voting in predict()
                # works even if the class labels are not the consecutive integers 0..K-1
                self.cls_labels.append((i, j))
                X_binary = np.concatenate((tables[i].to_numpy(), tables[j].to_numpy()))
                lab_a = [0] * len(tables[i])
                lab_b = [1] * len(tables[j])
                y_binary = np.concatenate((lab_a, lab_b))
                sys.stderr.write('\rTraining pair: %s vs %s' % (str(yval1), str(yval2)))
                sys.stderr.flush()
                # check the regularization method and build the matching pairwise classifier,
                # forwarding the chosen optimizer so it is actually used during training
                if self.regularization is Regularization.L2 or self.regularization is Regularization.L1:
                    reg_method = "l2" if self.regularization is Regularization.L2 else "l1"
                    blr = RegularizedBinaryLogisticRegression(C=self.C, regularization=reg_method, eta=self.eta,
                                                              iterations=self.iters, optimizer=self.optimizer)
                    blr.fit(X_binary, y_binary)
                    self.classifiers_.append(blr)
                elif self.regularization is Regularization.ELASTICNET:
                    elr = RegularizedBinaryLogisticRegression(C=self.C, regularization="elasticnet", eta=self.eta,
                                                              iterations=self.iters, optimizer=self.optimizer)
                    elr.fit(X_binary, y_binary)
                    self.classifiers_.append(elr)
                elif self.regularization is Regularization.NONE:
                    ulr = BinaryLogisticRegression(eta=self.eta, iterations=self.iters)
                    ulr.fit(X_binary, y_binary)
                    self.classifiers_.append(ulr)
# save all the weights into one matrix, separate column for each class
self.w_ = np.hstack([x.w_ for x in self.classifiers_]).T
def predict_proba(self,X):
probs = []
for blr in self.classifiers_:
probs.append(blr.predict_proba(X)) # get probability for each classifier
return np.hstack(probs) # make into single matrix
def predict(self,X):
probs = self.predict_proba(X)
scores = [0] * len(self.unique_)
results = []
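        # Weighted one-vs-one voting: each pairwise classifier contributes its predicted
        # probability as a vote for whichever of its two classes it favors, and the class
        # with the largest accumulated vote total wins.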
# Iterate through probabilities
for prob_cls in probs:
# Get the scores for each classifier
scores = [0] * len(self.unique_)
# Iterate through the scores
for j in range(len(prob_cls)):
# Get the labels for this classifier
label_a, label_b = self.cls_labels[j]
if(prob_cls[j] > .5):
scores[label_b] += prob_cls[j]
else:
scores[label_a] += prob_cls[j]
highest_index = np.argmax(scores)
results.append(self.unique_[highest_index])
return results
olr = OvORegularizedLogisticRegression(C=0.0001,
optimizer=Optimizer.STOCHASTIC_GRADIENT_ASCENT,
regularization=Regularization.L2,
eta=0.9,
iterations=2500)
olr.fit(X_train, y_train)
ovopred = olr.predict(X_test)
score = accuracy_score(y_test, ovopred)
print("\nOne vs One Accuracy Score:", score)