from jupyterthemes.stylefx import set_nb_theme
set_nb_theme('gruvboxd')
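# Assumed imports for the helpers below (the exact source packages, e.g. category_encoders
# for TargetEncoder and pingouin for the Games-Howell test, are assumptions):
import inspect
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import pingouin as pg
import shap
from scipy.stats import shapiro, levene
from statsmodels.graphics.gofplots import qqplot
from statsmodels.formula.api import ols
from statsmodels.stats.anova import anova_lm
from statsmodels.stats.multicomp import pairwise_tukeyhsd
from statsmodels.stats.outliers_influence import variance_inflation_factor
from sklearn import base
from sklearn.model_selection import KFold, train_test_split
from sklearn.preprocessing import RobustScaler
from sklearn.metrics import (accuracy_score, precision_score, recall_score, r2_score,
                             mean_squared_error, mean_absolute_error, confusion_matrix,
                             roc_curve, auc, ConfusionMatrixDisplay)
from category_encoders import TargetEncoder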
class check_pearson_corr():
def __init__(self, data, column1: str, column2: str):
self.data = data
self.column1 = column1
self.column2 = column2
self.data_exp = self.data.copy()
self.cols = [self.column1, self.column2]
assert(self.column1 in self.data.columns and self.column2 in self.data.columns)
assert(self.data[self.column1].dtype!='O' and self.data[self.column2].dtype!='O')
def normality_visual(self):
print('Checking for Gaussian distribution:\n')
for column in self.cols:
fig = qqplot(self.data[column], line = '45', fit=True)
ax = plt.gca()
fig.set_size_inches(15, 8)
ax.set_xlabel('Theoretical Quantiles', fontsize=13)
ax.set_ylabel(f'Sample Quantiles of the {column} column', fontsize=13)
plt.show()
def normality_test(self):
print('Shapiro-Wilk test for normality:\n')
for column in self.cols:
print(f'''P-value for {column} column: {shapiro(self.data[column])[1]}\n''')
def outlier_sensitivity(self):
print('Checking outlier sensitivity:\n')
for column in self.cols:
            # Reuse the z-score outlier counter from compute_outliers (defined further below)
            compute_outliers(self.data).find_outlier_records_zscore(column_name=column)
def linearity_corr(self):
print('Checking for Linearity:\n')
sns.regplot(x = self.column1, y = self.column2, data=self.data, color='b')
plt.show()
def pearson_corr_coef(self):
print(f'Pearson correlation coefficient without outlier handling: {self.data[self.column1].corr(self.data[self.column2])}')
for column in self.cols:
            # Cap outliers on the copy only, so the original data stays untouched
            compute_outliers(self.data_exp).coerce_outliers_zscore(column_name=column)
print(f'Pearson correlation coefficient with outlier handling: {self.data_exp[self.column1].corr(self.data_exp[self.column2])}')
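# Example usage of check_pearson_corr (a minimal sketch on synthetic data; the column
# names 'height' and 'weight' are hypothetical):
# rng = np.random.default_rng(0)
# demo = pd.DataFrame({'height': rng.normal(170, 10, 300)})
# demo['weight'] = 0.5 * demo['height'] + rng.normal(0, 5, 300)
# pearson_check = check_pearson_corr(demo, 'height', 'weight')
# pearson_check.normality_test()
# pearson_check.linearity_corr()
# pearson_check.pearson_corr_coef()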
class check_anova_corr():
def __init__(self, data, cat_col: str, con_col: str):
self.data = data
self.cat_col = cat_col
self.con_col = con_col
assert(self.cat_col in self.data.columns and self.con_col in self.data.columns)
assert(self.data[self.cat_col].dtype=='O' and self.data[self.con_col].dtype!='O')
def check_mean_cat(self):
mean_cat = self.data.groupby(self.cat_col)[self.con_col].agg(['count', 'mean'])
return mean_cat
def compare_mean_visual(self):
plt.figure(figsize=(15,8))
sns.set_palette("Reds", 4)
sns.boxplot(x=self.cat_col, y=self.con_col, data=self.data)
sns.stripplot(x=self.cat_col, y=self.con_col, data=self.data, jitter=0.4, color="0.3")
plt.xlabel("")
plt.show()
def check_gaussian_visual(self):
for cat in self.data[self.cat_col].unique():
fig = qqplot(self.data[self.data[self.cat_col]==cat][self.con_col], line = '45', fit=True)
ax = plt.gca()
fig.set_size_inches(15, 8)
ax.set_xlabel('Theoretical Quantiles', fontsize=13)
ax.set_ylabel(f'Sample Quantiles of the {cat} category', fontsize=13)
ax.set_title("QQ Plot of Categories", fontsize=16)
plt.show()
def check_gaussian_stat(self):
for cat in self.data[self.cat_col].unique():
            print(f'''P-value for {cat} category: {shapiro(self.data[self.data[self.cat_col]==cat][self.con_col])[1]}''')
def check_residual_sum_normality(self):
        st = ols(f"{self.con_col} ~ C({self.cat_col})", data=self.data).fit()
residuals = st.resid
fig = qqplot(residuals, line = '45', fit=True)
ax = plt.gca()
fig.set_size_inches(15, 8)
ax.set_xlabel("Theoretical Quantiles", fontsize=13)
ax.set_ylabel("Sample Quantiles", fontsize=13)
ax.set_title("QQPlot of the Residuals", fontsize=16)
plt.show()
def check_equal_variances(self):
        # Per-category standard deviation of the continuous column
        print(self.data.groupby(self.cat_col)[self.con_col].std().to_frame())
        # Levene's test across all categories of the grouping column
        groups = [self.data[self.data[self.cat_col] == cat][self.con_col] for cat in self.data[self.cat_col].unique()]
        self.homoscedasticity_test = levene(*groups)
        print(f'''Levene's test p-value: {self.homoscedasticity_test[1]}''')
def anova_test(self):
        lm = ols(f'{self.con_col} ~ C({self.cat_col})', data=self.data).fit()
table = anova_lm(lm)
mc = pairwise_tukeyhsd(self.data[self.con_col], self.data[self.cat_col])
        result = mc.summary()
return (table, result, mc.groupsunique)
def welch_test(self):
games_howell = pg.pairwise_gameshowell(dv=self.con_col, between=self.cat_col, data=self.data)
return (games_howell)
def conduct_anova_or_welch(self, p_value=0.05):
self.p_value = p_value
        self.check_equal_variances()
        # Equal variances (Levene p >= alpha) -> one-way ANOVA with Tukey HSD; otherwise Games-Howell
        if self.homoscedasticity_test[1] >= p_value:
            return self.anova_test()
        else:
            return self.welch_test()
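# Example usage of check_anova_corr (a minimal sketch; `fighters_df` is a hypothetical
# DataFrame, while 'stance' and 'SLpM' are the categorical and continuous columns the
# original Levene call referenced):
# anova_check = check_anova_corr(fighters_df, cat_col='stance', con_col='SLpM')
# anova_check.check_mean_cat()
# anova_check.check_residual_sum_normality()
# anova_check.conduct_anova_or_welch(p_value=0.05)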
class handle_vif():
def __init__(self, data):
self.data = data
def compute_vif(self, considered_features: list):
self.considered_features = considered_features
        X = self.data[self.considered_features].copy()  # copy to avoid SettingWithCopy warnings
X['intercept'] = 1
self.vif = pd.DataFrame()
self.vif["Variable"] = X.columns
self.vif["VIF"] = [variance_inflation_factor(X.values, i) for i in range(X.shape[1])]
self.vif = self.vif[self.vif['Variable']!='intercept']#.sort_values(by='VIF', ascending=False)
return self.vif
def drop_high_vif(self):
self.vif_table = self.vif.sort_values(by='VIF', ascending=False).reset_index(drop=True)
        # Iteratively drop the feature with the highest VIF until all remaining VIFs are below 5
        while self.vif_table['VIF'].iloc[0] > 5:
            dropped = self.vif_table['Variable'].iloc[0]
            self.data.drop(dropped, axis=1, inplace=True)
            remaining = [col for col in self.considered_features if col in self.data.columns]
            computed = self.compute_vif(remaining)
            self.vif_table = computed.sort_values(by='VIF', ascending=False).reset_index(drop=True)
return self.data.head()
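# Example usage of handle_vif (a minimal sketch; `body_df` is a hypothetical DataFrame
# whose target column is 'BodyFat', as in the original notebook):
# vif_handler = handle_vif(body_df)
# vif_handler.compute_vif([col for col in body_df.columns if col != 'BodyFat'])
# vif_handler.drop_high_vif()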
def run_all_funcs(class_name):
attrs = (getattr(class_name, func) for func in dir(class_name) if callable(getattr(class_name, func)))
methods = filter(inspect.ismethod, attrs)
for method in methods:
try:
method()
except TypeError:
pass
class DummyTransformer(base.BaseEstimator, base.TransformerMixin):
def __init__(self):
return None
def fit(self, X=None, y=None):
return self
    def transform(self, X=None):
        # Pass the input through unchanged; subclasses override this
        return X
class KFoldTargetEncoderTrain(DummyTransformer):
def __init__(self,colnames: str, targetName: str,
n_fold=5, verbosity=False,
discardOriginal_col=False):
self.colnames = colnames
self.targetName = targetName
self.n_fold = n_fold
self.verbosity = verbosity
self.discardOriginal_col = discardOriginal_col
def fit(self, X, y=None):
return self
def transform(self,X):
assert(self.colnames in X.columns)
assert(self.targetName in X.columns)
mean_of_target = X[self.targetName].mean()
kf = KFold(n_splits = self.n_fold,
shuffle = True)
col_mean_name = self.colnames + '_' + 'Kfold_Target_Enc'
X[col_mean_name] = np.nan
for tr_ind, val_ind in kf.split(X):
X_tr, X_val = X.iloc[tr_ind], X.iloc[val_ind]
X.loc[X.index[val_ind], col_mean_name] = X_val[self.colnames].map(X_tr.groupby(self.colnames)[self.targetName].mean())
        # Categories unseen in a fold end up NaN; fill them with the global target mean
        X[col_mean_name] = X[col_mean_name].fillna(mean_of_target)
if self.verbosity:
encoded_feature = X[col_mean_name].values
            print('Correlation between the new feature {} and {} is {:.4f}.'.format(
                col_mean_name, self.targetName,
                np.corrcoef(X[self.targetName].values, encoded_feature)[0][1]))
if self.discardOriginal_col:
X.drop(self.colnames, axis=1, inplace=True)
return X
class KFoldTargetEncoderTest(DummyTransformer):
def __init__(self,train,colNames,encodedName):
self.train = train
self.colNames = colNames
self.encodedName = encodedName
def fit(self, X, y=None):
return self
def transform(self,X):
mean = self.train[[self.colNames,self.encodedName]].groupby(self.colNames).mean().reset_index()
dd = {}
for index, row in mean.iterrows():
dd[row[self.colNames]] = row[self.encodedName]
X[self.encodedName] = X[self.colNames]
X.replace({self.encodedName: dd}, inplace=True)
return X
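# Example usage of the K-fold target encoders (a minimal sketch; `train_df`/`test_df` and
# the 'city'/'price' columns are hypothetical). The encoded column created on the train set
# is named '<colnames>_Kfold_Target_Enc', which the test encoder then maps onto the test set:
# train_enc = KFoldTargetEncoderTrain(colnames='city', targetName='price', n_fold=5)
# train_encoded = train_enc.transform(train_df.copy())
# test_enc = KFoldTargetEncoderTest(train_encoded, 'city', 'city_Kfold_Target_Enc')
# test_encoded = test_enc.transform(test_df.copy())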
def target_encode_columns(
train,
test,
features,
target,
smoothing_threshold: int,
    drop_features=False,
    drop_target=False
):
df_train = train.copy()
df_test = test.copy()
n = len(df_train)
labels = sorted(list(df_train[target].unique()))
num_target_classes = len(labels)
label_avgs = []
for label in labels:
label_avgs.append(len(df_train.loc[df_train[target] == label])/n)
df_train1 = pd.get_dummies(df_train, columns=[target])
targets = list(df_train1.columns[-num_target_classes:])
for col in features:
print('Encoding', col, end='... ')
        # Per-category counts and means of each one-hot target column (selecting the dummy
        # columns avoids aggregating other object-dtype feature columns)
        grouped = df_train1.groupby(by=col)
        unique_c = list(grouped[targets[0]].count().index)
        counts = list(grouped[targets[0]].count())
        target_means = []
        for t in targets:
            target_means.append(grouped[t].mean())
for label in labels:
            df_train[col + '_' + str(label)] = 0
            df_test[col + '_' + str(label)] = 0
        for i, c in enumerate(unique_c):
            class_prob = []
            if counts[i] <= smoothing_threshold:
                # Rare categories: blend the category mean with the global class priors,
                # weighted by how close the count is to the smoothing threshold
                for t_i, label in enumerate(labels):
                    weight = (counts[i] - 1) / smoothing_threshold
                    class_prob.append(weight * target_means[t_i].iloc[i] + (1 - weight) * label_avgs[t_i])
            else:
                for t_i, label in enumerate(labels):
                    class_prob.append(target_means[t_i].iloc[i])
            for t_i, label in enumerate(labels):
                df_train.loc[df_train[col] == c, col + '_' + str(label)] = class_prob[t_i]
            if c in df_test[col].unique():
                for l_i, label in enumerate(labels):
                    df_test.loc[df_test[col] == c, col + '_' + str(label)] = class_prob[l_i]
print('complete.')
if drop_features:
df_train.drop(columns=features, inplace=True)
df_test.drop(columns=features, inplace=True)
if drop_target:
df_train.drop(columns=target, inplace=True)
return df_train, df_test
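# Example usage of target_encode_columns (a minimal sketch; the DataFrames and column names
# are hypothetical, and smoothing_threshold=10 reproduces the original blending weights):
# train_te, test_te = target_encode_columns(train_df, test_df,
#                                           features=['neighborhood'], target='price_band',
#                                           smoothing_threshold=10, drop_features=True)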
def eval_log_coef(logit, x_train):
logit_coefs = pd.DataFrame(logit.coef_, columns=x_train.columns, index=['coef']).T
odds_change = []
prob_change = []
variance = []
var_importance = []
for k, v in logit_coefs.iterrows():
#changes in the odds ratio is the exp of the coefficient
odds_impact = np.exp(v['coef'])
probs_impact = odds_impact / (1 + odds_impact)
        # variable importance multiplies the coefficient by the feature's standard deviation
sd = np.std(x_train[k])
var_imp = abs(np.exp(v['coef']*sd) - 1)
var_importance.append(var_imp)
variance.append(sd)
odds_change.append(odds_impact)
prob_change.append(probs_impact)
logit_coefs['SD'] = variance
logit_coefs['Change in Odds (%)'] = odds_change
logit_coefs['Change in Probability'] = prob_change
logit_coefs['Variable Importance'] = var_importance
logit_coefs = logit_coefs.reset_index().rename(columns = {'index': 'Variable'})
logit_coefs = logit_coefs.sort_values('Variable Importance', ascending=False)
# Plotting variable importance
    top_coefs = logit_coefs.head(10)
fig, axes = plt.subplots(2, 1, figsize=(5, 10) ,sharex=False)
fig.suptitle('Variable Importance')
sns.barplot(ax = axes[0],
x = top_coefs['Variable Importance'],
y = top_coefs['Variable'])
    # Plotting change in odds for the top variables
logit_coefs = logit_coefs.sort_values('Change in Odds (%)', ascending=False)
top_coefs2 = logit_coefs.head(10)
#fig.suptitle('Change in Odds for Unit Increase in X')
sns.barplot(ax = axes[1],
x = top_coefs2['Change in Odds (%)'],
y = top_coefs2['Variable'])
plt.show();
return logit_coefs
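# Example usage of eval_log_coef (a minimal sketch; LogisticRegression is only an
# illustrative choice of fitted binary classifier):
# from sklearn.linear_model import LogisticRegression
# logit = LogisticRegression(max_iter=1000).fit(x_train, y_train)
# coef_table = eval_log_coef(logit, x_train)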
def plot_heatmap(data):
sns.set(rc={'figure.figsize':(12,9)})
    corr = data.corr(numeric_only=True)
    mask = np.triu(np.ones_like(corr, dtype=bool))
    sns.heatmap(corr, mask=mask, annot=True)
plt.show()
def plot_value_counts(data, column):
    plt.figure(figsize=(20,10))
    data[column].value_counts().plot(kind='bar')
plt.show()
def premodel(data):
X = data.iloc[:,:-1]
    y = data.iloc[:,-1]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20, random_state=29)
scaler = RobustScaler()
    X_train = scaler.fit_transform(X_train)
    X_train = pd.DataFrame(X_train, columns=X.columns)
    # Transform the test set with the scaler fitted on the training set to avoid leakage
    X_test = scaler.transform(X_test)
    X_test = pd.DataFrame(X_test, columns=X.columns)
return (X_train, X_test, y_train, y_test)
def evalutate_reg(y_test, pred):
    '''Print regression evaluation metrics: R^2, RMSE, MSE and MAE.'''
print(f'R^2: {r2_score(y_test,pred)}')
print(f'Root Mean Squared Error: {np.sqrt((mean_squared_error(y_test, pred)))}')
print(f'Mean Squared Error: {mean_squared_error(y_test, pred)}')
print(f'Mean Absolute Error: {mean_absolute_error(y_test,pred)}')
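# Example usage of premodel / evalutate_reg (a minimal sketch; assumes the target is the
# last column of `df`, and LinearRegression is only an illustrative model):
# from sklearn.linear_model import LinearRegression
# X_train, X_test, y_train, y_test = premodel(df)
# reg = LinearRegression().fit(X_train, y_train)
# evalutate_reg(y_test, reg.predict(X_test))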
def fillna_numeric(data, impute_method='mean'):
numeric_features = [column for column in data.columns if 'int' in str(data[column].dtype) or
'float' in str(data[column].dtype)]
print(f'Null values per numeric column before: {data[numeric_features].isna().sum()}')
for column in numeric_features:
if data[column].nunique()>15:
            if impute_method=='mean':
                data[column] = data[column].fillna(data[column].mean())
            else:
                data[column] = data[column].fillna(data[column].median())
else:
continue
print(f'Null values per numeric column now: {data[numeric_features].isna().sum()}')
def plot_countplot(data, column):
sns.set_theme(style="darkgrid")
plt.figure(figsize=(9,7))
    ax = sns.countplot(data=data, x=column, palette="rocket", order=data[column].value_counts().index)
    plt.setp(ax.get_xticklabels(), rotation=90);
for p in ax.patches:
total = len(data)
ax.annotate('{:.1f}%'.format(p.get_height()/total*100), (p.get_x()+0.25, p.get_height()+0.01))
plt.show();
def plot_pairplot(data, features: list):
sns.set_theme(style="darkgrid")
plt.figure(figsize=(20,15))
    sns.pairplot(data[features], palette = 'rocket')
    plt.show()
def difference_percentage(true_price, predicted_price):
diff = abs(true_price - predicted_price)
p = (diff * 100) / true_price
return p
def plot_predicted_actual(y_test, pred):
fig, ax = plt.subplots(figsize=(10,10))
    ax.scatter(y_test, pred, color='green')
ax.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'k--', lw=3)
ax.set_xlabel('Actual')
ax.set_ylabel('Predicted')
plt.show();
def fillna_via_cat(data, category, column, method='mean'):
print(f'Number of missing values before: {data[column].isna().sum()}')
data[column] = data[column].fillna(data.groupby(category)[column].transform(method))
print(f'Number of missing values now: {data[column].isna().sum()}')
def duplicate_check_remove(data):
num_duplicates = data.duplicated().sum()
if num_duplicates > 0:
print(f'Number of duplicate rows before: {num_duplicates}')
data.drop_duplicates(inplace = True)
print(f'Number of duplicate rows now: {data.duplicated().sum()}')
else:
print('There are no duplicate rows in the dataset.')
def target_encoder_smoother(data, target, smooth_val: int):
    # Target-encode every object-dtype column against the supplied target series
    for col in data.select_dtypes(include='object').columns:
        t = TargetEncoder(smoothing=smooth_val)
        data[col] = t.fit_transform(data[col], target)
def values_stripper(data):
for f in data.columns:
if data[f].dtype == 'O':
data[f] = data[f].str.strip()
print('Categorical features\' values are stripped')
def modify_colnames(data):
data.rename(columns=lambda x: x.lower().replace(' ', '_').replace('-', '_').replace(',', ''), inplace=True)
    print(f'Column names cleaned: {list(data.columns)}')
def drop_null_cols(data, null_ratio_threshold):
total_dropped = 0
dropped_columns = []
for f in data.columns:
number_of_missing = data[f].isna().sum()
ratio = number_of_missing / data.shape[0]
if ratio>null_ratio_threshold/100:
            total_dropped += 1
dropped_columns.append(f)
data.drop(f, axis=1, inplace=True)
else:
continue
if total_dropped!=0:
print(f'Dropped columns are: {dropped_columns}')
else:
print('No column is dropped')
def lowercase_categorical(data):
for column in data.columns:
if 'object' in str(data[column].dtype):
            # str.lower() leaves NaN values untouched and preserves the original index
            data[column] = data[column].str.lower()
print('Values lowercased: ')
return data.head()
def reduce_memory_usage(data, pct_threshold=0.4):
'''Can be reapplied after
outlier handling and scaling'''
start_mem = data.memory_usage().sum() / 1024**2
print('Memory usage before: {:.2f} MB'.format(start_mem))
for col in data.columns:
col_type = data[col].dtype
if col_type != 'object':
c_min = data[col].min()
c_max = data[col].max()
if 'int' in str(col_type):
if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
data[col] = data[col].astype(np.int8)
elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
data[col] = data[col].astype(np.int16)
elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
data[col] = data[col].astype(np.int32)
elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
data[col] = data[col].astype(np.int64)
else:
if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
data[col] = data[col].astype(np.float16)
elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
data[col] = data[col].astype(np.float32)
else:
data[col] = data[col].astype(np.float64)
elif col_type=='object':
if data[col].nunique() / data[col].shape[0] < pct_threshold:
data[col] = data[col].astype('category')
else:
continue
end_mem = data.memory_usage().sum() / 1024**2
print('Memory usage now : {:.2f} MB'.format(end_mem))
print('Memory usage decreased by {:.1f}%'.format(100 * (start_mem - end_mem) / start_mem))
def num_one_boxplt(data):
num_cols = data.select_dtypes(include=['int','int64','float'])
melted_df = pd.melt(num_cols)
sns.set(rc={'figure.figsize':(6,4)})
sns.boxplot(x='value',y='variable', data=melted_df)
plt.show()
def num_multi_boxplt(data):
    for i in data.select_dtypes(include='number').columns:
sns.set(rc={'figure.figsize':(6,4)})
temp_df = data.loc[:,i]
sns.boxplot(temp_df)
plt.show()
def visualise_missing(data):
sns.set(rc={'figure.figsize':(10,7)})
sns.heatmap(data.isna(), yticklabels = False, cbar = False, cmap = plt.cm.magma)
plt.title(label = 'Heatmap for Missing Values', fontsize = 16, color='red')
plt.xlabel(xlabel = 'Features', fontsize = 16, color='red')
plt.show()
def get_model_metrics(x_train, y_train, x_test, y_test, preds, mdl):
train_acc = mdl.score(x_train,y_train)
test_acc = mdl.score(x_test, y_test)
rmse = (np.sqrt(mean_squared_error(y_test, preds)))
results = {'Train_acc': train_acc, 'Test_acc': test_acc, 'rmse': rmse}
model = 'Value'
model_metrics = pd.DataFrame(results.items(), columns = ['Metric', str(model)]).set_index('Metric')
return model_metrics
def classification_metrics(x_train, y_train, x_test, y_test, preds ,probs ,mdl): #Evaluation Metrics
Accuracy = accuracy_score(y_test, preds)
Precision = precision_score(y_test, preds)
Recall = recall_score(y_test, preds)
#Confusion Matrix
cm = pd.DataFrame(confusion_matrix(y_test, preds, labels=[0,1]))
TN = cm[0][0]
FN = cm[0][1]
FP = cm[1][0]
TP = cm[1][1]
    TPR = TP/(TP+FN)
FPR = FP/(FP+TN)
# ROC Curve
fpr, tpr, thresholds = roc_curve(y_test, probs)
roc_auc = auc(fpr, tpr )
logit_summary = {'Accuracy': Accuracy,
'Precision': Precision,
'Recall': Recall,
'True Positive Rate': TPR,
'False Positive Rate': FPR,
}
model = 'Value'
class_metrics = pd.DataFrame(logit_summary.items(), columns = ['Metric', str(model)]).set_index('Metric')
plt.plot(fpr, tpr, label='ROC curve (area = %0.3f)' % roc_auc)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.0])
    plt.xlabel('False Positive Rate or (1 - Specificity)')
plt.ylabel('True Positive Rate or (Sensitivity)')
plt.title('Receiver Operating Characteristic (ROC)')
plt.legend(loc="lower right")
fig, ax = plt.subplots(figsize=(5, 5))
    # plot_confusion_matrix was removed from recent scikit-learn; ConfusionMatrixDisplay replaces it
    ConfusionMatrixDisplay.from_estimator(mdl, x_test, y_test, cmap=plt.cm.Blues, ax=ax)
plt.tight_layout()
plt.title('Confusion Matrix', y = 1.1)
return class_metrics
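# Example usage of classification_metrics (a minimal sketch; `clf` stands for any fitted
# binary classifier that exposes predict_proba):
# preds = clf.predict(X_test)
# probs = clf.predict_proba(X_test)[:, 1]
# classification_metrics(X_train, y_train, X_test, y_test, preds, probs, clf)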
class compute_outliers():
def __init__(self, data):
self.data = data
    def coerce_outliers_iqr(self, value):
        # Cap a single value at the IQR limits set by apply_iqr
        if value > self.upperlimit:
            return self.upperlimit
        elif value < self.lowerlimit:
            return self.lowerlimit
        return value
def apply_iqr(self, const=1.5):
for feature in self.data.columns[:-1]:
Q3 = self.data[feature].quantile(q = 0.75)
Q1 = self.data[feature].quantile(q = 0.25)
IQR = Q3 - Q1
outlier_range = IQR * const
            self.upperlimit = Q3 + outlier_range
            self.lowerlimit = Q1 - outlier_range
            self.data[feature] = self.data[feature].apply(self.coerce_outliers_iqr)
        # Report remaining outlier counts once every feature has been coerced
        return self.find_outliers_iqr()
def find_outlier_records_zscore(self, column_name: str, no_std=3):
no_outliers = 0
        values = self.data[column_name].unique()
upper_border = np.asarray(self.data[column_name]).mean() + no_std*(np.asarray(self.data[column_name]).std())
lower_border = np.asarray(self.data[column_name]).mean() - no_std*(np.asarray(self.data[column_name]).std())
for value in values:
if value>upper_border:
no_outliers+=int(self.data[column_name].value_counts()[value])
elif value<lower_border:
no_outliers+=int(self.data[column_name].value_counts()[value])
else:
continue
print(f'''Number of outlier records in {column_name} column: {no_outliers}''')
    def coerce_outliers_zscore(self, column_name: str, no_std=3):
        col = self.data[column_name].astype(float)
        upper_border = col.mean() + no_std * col.std(ddof=0)
        lower_border = col.mean() - no_std * col.std(ddof=0)
        no_of_outliers = int(((col > upper_border) | (col < lower_border)).sum())
        # Cap values beyond the z-score borders at the borders themselves
        self.data[column_name] = col.clip(lower=lower_border, upper=upper_border)
        print(f'''Number of outlier records coerced in {column_name} column: {no_of_outliers}''')
    def find_outliers_iqr(self, outlier_range_val=1.5):
for column in self.data.select_dtypes(include='number').columns:
Q3 = self.data[column].quantile(q = 0.75)
Q1 = self.data[column].quantile(q = 0.25)
IQR = Q3 - Q1
outlier_range = IQR * outlier_range_val
upperlimit = Q3 + outlier_range
lowerlimit = Q1 - outlier_range
no_outliers = self.data.loc[(self.data[column]>upperlimit) | (self.data[column]<lowerlimit)].shape[0]
print(f'''Number of outlier records in {column} column: {no_outliers}''')
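# Example usage of compute_outliers (a minimal sketch; `demo` is any numeric DataFrame,
# e.g. the synthetic frame from the Pearson example above):
# outlier_handler = compute_outliers(demo)
# outlier_handler.find_outliers_iqr()
# outlier_handler.find_outlier_records_zscore('weight')
# outlier_handler.coerce_outliers_zscore('weight')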
def plot_rf_feat_importance(rf_model, feature_names):
    # Pair importances with their feature names (previously relied on a global X)
    rf_features_importance = pd.DataFrame({
        "Features": list(rf_model.feature_importances_)
    }, index=feature_names).sort_values(by="Features", axis=0, ascending=True)
fig = plt.figure(figsize=(12,6))
ax = fig.add_axes([0,0,1,1])
ax.barh(rf_features_importance.index[-10:],rf_features_importance.Features[-10:],color="purple")
for i, v in enumerate(rf_features_importance.Features[-10:]):
ax.text(v+0.001, i,('% 0.2f' % v)+"%")
plt.show();
#Feature importance
def plot_summary(model, X_train):
explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X_train)
    # For a binary classifier, shap_values is a list; index 1 holds the positive-class values
    shap.summary_plot(shap_values[1], X_train.astype("float"))