!pip install pyspark
from google.colab import drive
drive.mount('/content/datasets')
# This code creates a filterable and interactive data table like excel for pandas dataframes
from google.colab import data_table
data_table.enable_dataframe_formatter()
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('OUTLIER REMOVAL').getOrCreate()
spark
df = spark.read.csv('/content/datasets/MyDrive/Datasets/UCLA_Module2_FinalProject/customer_data.csv',header = True)
df.show()
df.count()
len(df.columns)
df.printSchema()
numeric_cols = df.columns[2:]
numeric_cols
from pyspark.sql import functions as f
from pyspark.sql.types import IntegerType
for column in numeric_cols:
df = df.withColumn(column,f.col(column).cast(IntegerType()))
df.printSchema()
numeric_columns = [column[0] for column in df.dtypes if column[1]=='int']
numeric_columns
def find_outliers(df):
# Identifying the numerical columns in a spark dataframe
numeric_columns = [column[0] for column in df.dtypes if column[1]=='int']
# Using the `for` loop to create new columns by identifying the outliers for each feature
for column in numeric_columns:
less_Q1 = 'less_Q1_{}'.format(column)
more_Q3 = 'more_Q3_{}'.format(column)
Q1 = 'Q1_{}'.format(column)
Q3 = 'Q3_{}'.format(column)
# Q1 : First Quartile ., Q3 : Third Quartile
Q1 = df.approxQuantile(column,[0.25],relativeError=0)
Q3 = df.approxQuantile(column,[0.75],relativeError=0)
# IQR : Inter Quantile Range
# We need to define the index [0], as Q1 & Q3 are a set of lists., to perform a mathematical operation
# Q1 & Q3 are defined seperately so as to have a clear indication on First Quantile & 3rd Quantile
IQR = Q3[0] - Q1[0]
#selecting the data, with -1.5*IQR to + 1.5*IQR., where param = 1.5 default value
less_Q1 = Q1[0] - 1.5*IQR
more_Q3 = Q3[0] + 1.5*IQR
isOutlierCol = 'is_outlier_{}'.format(column)
df = df.withColumn(isOutlierCol,f.when((df[column] > more_Q3) | (df[column] < less_Q1), 1).otherwise(0))
# Selecting the specific columns which we have added above, to check if there are any outliers
selected_columns = [column for column in df.columns if column.startswith("is_outlier")]
# Adding all the outlier columns into a new colum "total_outliers", to see the total number of outliers
df = df.withColumn('total_outliers',sum(df[column] for column in selected_columns))
# Dropping the extra columns created above, just to create nice dataframe., without extra columns
df = df.drop(*[column for column in df.columns if column.startswith("is_outlier")])
return df
new_df = find_outliers(df)
new_df.show()
new_df_with_no_outliers = new_df.filter(new_df['total_Outliers']<=1)
new_df_with_no_outliers = new_df_with_no_outliers.select(*df.columns)
new_df_with_no_outliers.show()
new_df_with_no_outliers.count()
data_with_outliers = new_df.filter(new_df['total_Outliers']>=2)
data_with_outliers.show()
# Selecting the numerical columns from the original dataframe and converting into pandas
numeric_columns
original_numerical_df = df.select(*numeric_columns).toPandas()
original_numerical_df.head(10)
# Plotting the box for the dataset after removing the outliers
dataset_after_removing_outliers = new_df_with_no_outliers.toPandas()
dataset_after_removing_outliers.head(10)
numeric_columns
fig,ax = plt.subplots(2,6,figsize=(15,8))
for i,df in enumerate([original_numerical_df,dataset_after_removing_outliers]):
for j, col in enumerate(numeric_columns):
sns.boxplot(data = df, y=col,ax=ax[i][j])