import inspect

import numpy as np
import pandas as pd
import requests
import matplotlib.pyplot as plt
import seaborn as sns
import pingouin as pg
from bs4 import BeautifulSoup as bs
from scipy.stats import shapiro, levene
from sklearn import base
from sklearn.model_selection import KFold
from statsmodels.formula.api import ols
from statsmodels.graphics.gofplots import qqplot
from statsmodels.stats.anova import anova_lm
from statsmodels.stats.multicomp import pairwise_tukeyhsd
from statsmodels.stats.outliers_influence import variance_inflation_factor


class check_pearson_corr:

    def __init__(self, data, column1: str, column2: str):
        self.data = data
        self.column1 = column1
        self.column2 = column2
        self.data_exp = self.data.copy()
        self.cols = [self.column1, self.column2]
        # Both columns must exist and be numeric (non-object) for a Pearson correlation.
        assert self.column1 in self.data.columns and self.column2 in self.data.columns
        assert self.data[self.column1].dtype != 'O' and self.data[self.column2].dtype != 'O'
    def normality_visual(self):
        print('Checking for Gaussian distribution:\n')
        for column in self.cols:
            fig = qqplot(self.data[column], line='45', fit=True)
            ax = plt.gca()
            fig.set_size_inches(15, 8)
            ax.set_xlabel('Theoretical Quantiles', fontsize=13)
            ax.set_ylabel(f'Sample Quantiles of the {column} column', fontsize=13)
            plt.show()

    def normality_test(self):
        print('Shapiro-Wilk test for normality:\n')
        for column in self.cols:
            print(f'P-value for {column} column: {shapiro(self.data[column])[1]}\n')

    def outlier_sensitivity(self):
        print('Checking outlier sensitivity:\n')
        for column in self.cols:
            find_outlier_records(self.data, column_name=column)

    def linearity_corr(self):
        print('Checking for Linearity:\n')
        sns.regplot(x=self.column1, y=self.column2, data=self.data, color='b')
        plt.show()

    def pearson_corr_coef(self):
        print(f'Pearson correlation coefficient without outlier handling: '
              f'{self.data[self.column1].corr(self.data[self.column2])}')
        for column in self.cols:
            coerce_outliers_zscore(self.data_exp, column_name=column)
        print(f'Pearson correlation coefficient with outlier handling: '
              f'{self.data_exp[self.column1].corr(self.data_exp[self.column2])}')
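# Example usage (a minimal sketch; `df`, 'age' and 'weight' are hypothetical
# column names, not part of this module):
#     pearson_checker = check_pearson_corr(df, column1='age', column2='weight')
#     pearson_checker.normality_visual()
#     pearson_checker.normality_test()
#     pearson_checker.linearity_corr()
#     pearson_checker.pearson_corr_coef()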
class check_anova_corr:

    def __init__(self, data, cat_col: str, con_col: str):
        self.data = data
        self.cat_col = cat_col
        self.con_col = con_col
        assert self.cat_col in self.data.columns and self.con_col in self.data.columns
        assert self.data[self.cat_col].dtype == 'O' and self.data[self.con_col].dtype != 'O'

    def check_mean_cat(self):
        mean_cat = self.data.groupby(self.cat_col)[self.con_col].agg(['count', 'mean'])
        return mean_cat

    def compare_mean_visual(self):
        plt.figure(figsize=(15, 8))
        sns.set_palette("Reds", 4)
        sns.boxplot(x=self.cat_col, y=self.con_col, data=self.data)
        sns.stripplot(x=self.cat_col, y=self.con_col, data=self.data, jitter=0.4, color="0.3")
        plt.xlabel("")
        plt.show()

    def check_gaussian_visual(self):
        for cat in self.data[self.cat_col].unique():
            fig = qqplot(self.data[self.data[self.cat_col] == cat][self.con_col], line='45', fit=True)
            ax = plt.gca()
            fig.set_size_inches(15, 8)
            ax.set_xlabel('Theoretical Quantiles', fontsize=13)
            ax.set_ylabel(f'Sample Quantiles of the {cat} category', fontsize=13)
            ax.set_title("QQ Plot of Categories", fontsize=16)
            plt.show()
    def check_gaussian_stat(self):
        for cat in self.data[self.cat_col].unique():
            # Report only the p-value (index 1) of the Shapiro-Wilk result.
            p_value = shapiro(self.data[self.data[self.cat_col] == cat][self.con_col])[1]
            print(f'P-value for {cat} category: {p_value}')
    def check_residual_sum_normality(self):
        # Build the formula from the actual column names, not the literal attribute strings.
        st = ols(f"{self.con_col} ~ C({self.cat_col})", data=self.data).fit()
        residuals = st.resid
        fig = qqplot(residuals, line='45', fit=True)
        ax = plt.gca()
        fig.set_size_inches(15, 8)
        ax.set_xlabel("Theoretical Quantiles", fontsize=13)
        ax.set_ylabel("Sample Quantiles", fontsize=13)
        ax.set_title("QQPlot of the Residuals", fontsize=16)
        plt.show()
    def check_equal_variances(self):
        # Show the per-category standard deviations of the continuous column.
        print(self.data.groupby(self.cat_col)[self.con_col].std().to_frame())
        # Levene's test across all category groups, without hard-coding category names.
        samples = [group[self.con_col].dropna() for _, group in self.data.groupby(self.cat_col)]
        self.homoscedasticity_test = levene(*samples)
        print(f"Levene's test p-value: {self.homoscedasticity_test[1]}")
    def anova_test(self):
        lm = ols(f'{self.con_col} ~ C({self.cat_col})', data=self.data).fit()
        table = anova_lm(lm)
        mc = pairwise_tukeyhsd(self.data[self.con_col], self.data[self.cat_col])
        result = mc._results_table
        return (table, result, mc.groupsunique)
    def welch_test(self):
        games_howell = pg.pairwise_gameshowell(dv=self.con_col, between=self.cat_col, data=self.data)
        return games_howell
    def conduct_anova_or_welch(self, p_value=0.05):
        self.p_value = p_value
        self.check_equal_variances()
        # Equal variances -> classic ANOVA with Tukey HSD; otherwise Games-Howell (Welch-style).
        if self.homoscedasticity_test[1] >= p_value:
            return self.anova_test()
        else:
            return self.welch_test()
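# Example usage (a minimal sketch; `df`, 'stance' and 'SLpM' echo the column names
# that were previously hard-coded in check_equal_variances and are hypothetical here):
#     anova_checker = check_anova_corr(df, cat_col='stance', con_col='SLpM')
#     anova_checker.check_mean_cat()
#     anova_checker.check_residual_sum_normality()
#     anova_checker.conduct_anova_or_welch(p_value=0.05)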
class handle_vif:

    def __init__(self, data):
        self.data = data

    def compute_vif(self, considered_features: list):
        self.considered_features = considered_features
        # Work on a copy so the added intercept column does not leak into the original frame.
        X = self.data[self.considered_features].copy()
        X['intercept'] = 1
        self.vif = pd.DataFrame()
        self.vif["Variable"] = X.columns
        self.vif["VIF"] = [variance_inflation_factor(X.values, i) for i in range(X.shape[1])]
        self.vif = self.vif[self.vif['Variable'] != 'intercept']
        return self.vif
    def drop_high_vif(self):
        self.vif_table = self.vif.sort_values(by='VIF', ascending=False).reset_index(drop=True)
        # Iteratively drop the feature with the highest VIF until every VIF is at most 5.
        while self.vif_table['VIF'].iloc[0] > 5:
            self.data.drop(self.vif_table['Variable'].iloc[0], axis=1, inplace=True)
            computed = self.compute_vif(list(self.data.columns))
            self.vif_table = computed.sort_values(by='VIF', ascending=False).reset_index(drop=True)
        return self.data.head()
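# Example usage (a minimal sketch; `df` and the feature names are hypothetical):
#     vif_handler = handle_vif(df)
#     vif_handler.compute_vif(['height', 'reach', 'weight'])
#     vif_handler.drop_high_vif()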
def run_all_funcs(class_name):
    # Call every bound method of the given instance; methods that need extra arguments are skipped.
    attrs = (getattr(class_name, func) for func in dir(class_name) if callable(getattr(class_name, func)))
    methods = filter(inspect.ismethod, attrs)
    for method in methods:
        try:
            method()
        except TypeError:
            pass
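# Example usage (a minimal sketch; runs every zero-argument method of an instance):
#     pearson_checker = check_pearson_corr(df, column1='age', column2='weight')
#     run_all_funcs(pearson_checker)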
class DummyTransformer(base.BaseEstimator, base.TransformerMixin):
    """No-op transformer used as a base class for the target encoders below."""

    def fit(self, X=None, y=None):
        return self

    def transform(self, X=None):
        # A pass-through transformer should return the data, not the estimator itself.
        return X
class KFoldTargetEncoderTrain(DummyTransformer):

    def __init__(self, colnames: str, targetName: str,
                 n_fold=5, verbosity=False,
                 discardOriginal_col=False):
        self.colnames = colnames
        self.targetName = targetName
        self.n_fold = n_fold
        self.verbosity = verbosity
        self.discardOriginal_col = discardOriginal_col

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        assert self.colnames in X.columns
        assert self.targetName in X.columns
        mean_of_target = X[self.targetName].mean()
        kf = KFold(n_splits=self.n_fold, shuffle=True)
        col_mean_name = self.colnames + '_' + 'Kfold_Target_Enc'
        X[col_mean_name] = np.nan
        for tr_ind, val_ind in kf.split(X):
            X_tr, X_val = X.iloc[tr_ind], X.iloc[val_ind]
            # Encode each validation fold with category means computed on the training folds only.
            X.loc[X.index[val_ind], col_mean_name] = X_val[self.colnames].map(
                X_tr.groupby(self.colnames)[self.targetName].mean())
        # Fill categories that never appeared in the training folds with the global mean.
        X[col_mean_name] = X[col_mean_name].fillna(mean_of_target)
        if self.verbosity:
            encoded_feature = X[col_mean_name].values
            print('Correlation between the new feature, {} and, {} is {}.'.format(
                col_mean_name, self.targetName,
                np.corrcoef(X[self.targetName].values, encoded_feature)[0][1]))
        if self.discardOriginal_col:
            X.drop(self.colnames, axis=1, inplace=True)
        return X
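# Example usage (a minimal sketch; `train_df`, 'stance' and 'target' are hypothetical):
#     train_encoder = KFoldTargetEncoderTrain(colnames='stance', targetName='target',
#                                             n_fold=5, verbosity=True)
#     train_df = train_encoder.fit_transform(train_df)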
class KFoldTargetEncoderTest(DummyTransformer):

    def __init__(self, train, colNames, encodedName):
        self.train = train
        self.colNames = colNames
        self.encodedName = encodedName

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        mean = self.train[[self.colNames, self.encodedName]].groupby(self.colNames).mean().reset_index()
        dd = {}
        for index, row in mean.iterrows():
            dd[row[self.colNames]] = row[self.encodedName]
        X[self.encodedName] = X[self.colNames]
        X.replace({self.encodedName: dd}, inplace=True)
        return X
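# Example usage (a minimal sketch; assumes the train encoder above has already been applied):
#     test_encoder = KFoldTargetEncoderTest(train_df, colNames='stance',
#                                           encodedName='stance_Kfold_Target_Enc')
#     test_df = test_encoder.fit_transform(test_df)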
def fillna_numeric(data, impute_method='mean'):
    assert impute_method == 'mean' or impute_method == 'median'
    numeric_features = [column for column in data.columns if 'int' in str(data[column].dtype)
                        or 'float' in str(data[column].dtype)]
    print(f'Null values per numeric column before: {data[numeric_features].isna().sum()}')
    for column in numeric_features:
        # Only impute continuous-looking columns; low-cardinality numerics are left untouched.
        if data[column].nunique() > 15:
            if impute_method == 'mean':
                data[column].fillna(data[column].mean(), inplace=True)
            else:
                data[column].fillna(data[column].median(), inplace=True)
    print(f'Null values per numeric column now: {data[numeric_features].isna().sum()}')
def fillna_via_cat(data, category, column, method='mean'):
    assert category in data.columns and column in data.columns
    assert data[category].dtype == 'O'
    assert method == 'mean' or method == 'median'
    assert 'int' in str(data[column].dtype) or 'float' in str(data[column].dtype)
    print(f'Number of missing values before: {data[column].isna().sum()}')
    data[column] = data[column].fillna(data.groupby(category)[column].transform(method))
    print(f'Number of missing values now: {data[column].isna().sum()}')
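# Example usage (a minimal sketch; `df`, 'stance' and 'reach' are hypothetical names):
#     fillna_numeric(df, impute_method='median')
#     fillna_via_cat(df, category='stance', column='reach', method='mean')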
def duplicate_check_remove(data):
    num_duplicates = data.duplicated().sum()
    if num_duplicates > 0:
        print(f'Number of duplicate rows before: {num_duplicates}')
        data.drop_duplicates(inplace=True)
        print(f'Number of duplicate rows now: {data.duplicated().sum()}')
    else:
        print('There are no duplicate rows in the dataset.')


def values_stripper(data):
    for f in data.columns:
        if data[f].dtype == 'O':
            data[f] = data[f].str.strip()
    print("Categorical features' values are stripped")


def modify_colnames(data):
    data.rename(columns=lambda x: x.lower().replace(' ', '_').replace('-', '_').replace(',', ''), inplace=True)
    print(f'Column names cleaned: {data.columns}')
def drop_null_cols(data, null_ratio_threshold):
    total_dropped = 0
    dropped_columns = []
    for f in data.columns:
        number_of_missing = data[f].isna().sum()
        ratio = number_of_missing / data.shape[0]
        # null_ratio_threshold is given as a percentage, e.g. 40 means 40%.
        if ratio > null_ratio_threshold / 100:
            total_dropped += 1
            dropped_columns.append(f)
            data.drop(f, axis=1, inplace=True)
    if total_dropped != 0:
        print(f'Dropped columns are: {dropped_columns}')
    else:
        print('No column is dropped')
def lowercase_categorical(data):
    for column in data.columns:
        if 'object' in str(data[column].dtype):
            vals = []
            for value in data[column].values:
                # Keep missing values as they are; lowercase everything else.
                if pd.isna(value):
                    vals.append(value)
                else:
                    vals.append(value.lower())
            data[column] = pd.Series(vals, index=data.index)
    print('Values lowercased: ')
    return data.head()
def reduce_memory_usage(data, pct_threshold=0.4):
    '''Downcast numeric columns and convert low-cardinality object columns to category.
    Can be reapplied after outlier handling and scaling.'''
    start_mem = data.memory_usage().sum() / 1024**2
    print('Memory usage before: {:.2f} MB'.format(start_mem))
    for col in data.columns:
        col_type = data[col].dtype
        if col_type != 'object':
            c_min = data[col].min()
            c_max = data[col].max()
            if 'int' in str(col_type):
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    data[col] = data[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    data[col] = data[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    data[col] = data[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    data[col] = data[col].astype(np.int64)
            else:
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    data[col] = data[col].astype(np.float16)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    data[col] = data[col].astype(np.float32)
                else:
                    data[col] = data[col].astype(np.float64)
        else:
            # Convert object columns to 'category' when the share of unique values is low.
            if data[col].nunique() / data[col].shape[0] < pct_threshold:
                data[col] = data[col].astype('category')
    end_mem = data.memory_usage().sum() / 1024**2
    print('Memory usage now : {:.2f} MB'.format(end_mem))
    print('Memory usage decreased by {:.1f}%'.format(100 * (start_mem - end_mem) / start_mem))
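# Example usage (a minimal sketch; `df` is a hypothetical DataFrame):
#     reduce_memory_usage(df, pct_threshold=0.4)   # downcasts dtypes in place and prints the savings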
def coerce_outliers_zscore(data, column_name: str, no_std=3):
    assert column_name in data.columns
    values = data[column_name].unique()
    upper_border = np.asarray(data[column_name]).mean() + no_std * np.asarray(data[column_name]).std()
    lower_border = np.asarray(data[column_name]).mean() - no_std * np.asarray(data[column_name]).std()
    no_of_outliers_before = 0
    # Cast to float so coerced border values can be stored in the column.
    data[column_name] = data[column_name].astype(float)
    for value in values:
        if value > upper_border:
            no_of_outliers_before += 1
            data[column_name] = data[column_name].replace({value: upper_border})
        elif value < lower_border:
            no_of_outliers_before += 1
            data[column_name] = data[column_name].replace({value: lower_border})
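# Example usage (a minimal sketch; `df` and 'reach' are hypothetical names):
#     find_outlier_records(df, column_name='reach', no_std=3)
#     coerce_outliers_zscore(df, column_name='reach', no_std=3)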
def coerce_outliers_iqr(data, column_name: str, outlier_range_val=1.7):
    assert column_name in data.columns
    Q3 = data[column_name].quantile(q=0.75)
    Q1 = data[column_name].quantile(q=0.25)
    IQR = Q3 - Q1
    outlier_range = IQR * outlier_range_val
    upperlimit = Q3 + outlier_range
    lowerlimit = Q1 - outlier_range
    # Clip values outside the IQR fences back to the nearest limit, in place.
    data[column_name] = data[column_name].clip(lower=lowerlimit, upper=upperlimit)
def find_outlier_records(data, column_name: str, no_std=3):
    assert column_name in data.columns
    no_outliers = 0
    values = data[column_name].unique()
    upper_border = np.asarray(data[column_name]).mean() + no_std * np.asarray(data[column_name]).std()
    lower_border = np.asarray(data[column_name]).mean() - no_std * np.asarray(data[column_name]).std()
    for value in values:
        if value > upper_border or value < lower_border:
            no_outliers += int(data[column_name].value_counts()[value])
    print(f'Number of outlier records in {column_name} column: {no_outliers}')
def request_and_parse(url: str, var_name, search,
                      find_all=False, find=False):
    request = requests.get(url)
    soup = bs(request.text, 'lxml')
    if find_all:
        var_name = soup.find_all(search)
    elif find:
        var_name = soup.find(search)
    return var_name
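# Example usage (a minimal sketch; the URL and tag name are hypothetical placeholders):
#     tables = request_and_parse('http://example.com/stats', None, 'table', find_all=True)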
def visualise_missing(data):
    sns.set(rc={'figure.figsize': (10, 7)})
    sns.heatmap(data=data.isna(), yticklabels=False, cbar=False, cmap=plt.cm.magma)
    plt.title(label='Heatmap for Missing Values', fontsize=16, color='red')
    plt.xlabel(xlabel='Features', fontsize=16, color='red')
    plt.show()
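# Minimal smoke test (an illustrative sketch with synthetic data, not part of the
# original utilities; run this file directly to try a few of the cleaning helpers).
if __name__ == '__main__':
    demo = pd.DataFrame({
        'Stance ': [' Orthodox', 'Southpaw ', None, ' Orthodox'],
        'SLpM': [3.2, 4.1, np.nan, 2.7],
    })
    modify_colnames(demo)        # 'Stance ' -> 'stance_', 'SLpM' -> 'slpm'
    values_stripper(demo)        # trims whitespace in object columns
    duplicate_check_remove(demo)
    visualise_missing(demo)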