import time
import numpy as np
# The config below is for https://microsoft.github.io/SynapseML/
import pyspark
spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
.config("spark.jars.packages", "com.microsoft.azure:synapseml:0.9.1") \
.config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven") \
.getOrCreate()
print(spark)
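# Load the train/test CSVs from the Kaggle "NLP Getting Started" (disaster tweets) competition.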
trainPath = 'nlp-getting-started/train.csv'
testPath = 'nlp-getting-started/test.csv'
trainData = spark.read.format('csv').options(header='true', inferSchema='true', multiLine=True).load(trainPath)
testData = spark.read.format('csv').options(header='true', inferSchema='true', multiLine=True).load(testPath)
print('Number of rows in training:', trainData.count())
print('Number of rows in test:    ', testData.count())
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, HasInputCols, HasOutputCols, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.ml import Pipeline
import pyspark.sql.functions as F
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, Word2Vec, StringIndexer,OneHotEncoder, VectorAssembler, RobustScaler
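# FillNanTransformer: a custom Transformer that replaces nulls in the given columns with a fixed string (DataFrame.na.fill).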
class FillNanTransformer(Transformer, HasInputCols, DefaultParamsReadable, DefaultParamsWritable):
nanReplacement = Param(Params._dummy(), "nanReplacement", "nanReplacement", typeConverter=TypeConverters.toString)
@keyword_only
def __init__(self, inputCols=None, nanReplacement=None):
super(FillNanTransformer, self).__init__()
self._setDefault(nanReplacement="")
kwargs = self._input_kwargs
self.setParams(**kwargs)
@keyword_only
def setParams(self, inputCols=None, nanReplacement=None):
kwargs = self._input_kwargs
return self._set(**kwargs)
def getNanReplacement(self):
return self.getOrDefault(self.nanReplacement)
def _transform(self, dataset):
nanReplacement = self.getNanReplacement()
dataset = dataset.na.fill(value=nanReplacement,subset=self.getInputCols())
return dataset
fillNanTransformer = FillNanTransformer(inputCols=["keyword", "location"], nanReplacement="$")
textFillNanTransformer = FillNanTransformer(inputCols=["text"], nanReplacement="")
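# RemovePatternTransformer: deletes every regex match of `pattern` from inputCol and writes the cleaned text to outputCol.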
class RemovePatternTransformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
pattern = Param(Params._dummy(), "pattern", "pattern", typeConverter=TypeConverters.toString)
@keyword_only
def __init__(self, inputCol=None, outputCol=None, pattern=None):
super(RemovePatternTransformer, self).__init__()
self._setDefault(pattern="")
kwargs = self._input_kwargs
self.setParams(**kwargs)
@keyword_only
def setParams(self, inputCol=None, outputCol=None, pattern=None):
kwargs = self._input_kwargs
return self._set(**kwargs)
def getPattern(self):
return self.getOrDefault(self.pattern)
def _transform(self, dataset):
pattern = self.getPattern()
dataset = dataset.withColumn(self.getOutputCol(), F.regexp_replace(F.col(self.getInputCol()), pattern, ""))
return dataset
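# CheckPatternTransformer: flags rows whose inputCol matches `pattern` with 1.0, all other rows with 0.0.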
class CheckPatternTransformer(Transformer, HasInputCol, HasOutputCol):
pattern = Param(Params._dummy(), "pattern", "pattern", typeConverter=TypeConverters.toString)
@keyword_only
def __init__(self, inputCol=None, outputCol=None, pattern=None):
super(CheckPatternTransformer, self).__init__()
self._setDefault(pattern="")
kwargs = self._input_kwargs
self.setParams(**kwargs)
@keyword_only
def setParams(self, inputCol=None, outputCol=None, pattern=None):
kwargs = self._input_kwargs
return self._set(**kwargs)
def getPattern(self):
return self.getOrDefault(self.pattern)
def _transform(self, dataset):
pattern = self.getPattern()
dataset = dataset.withColumn(self.getOutputCol(), F.when(F.col(self.getInputCol()).rlike(pattern),1.).otherwise(0.))
return dataset
removeUrlTransformer = RemovePatternTransformer(inputCol="text", outputCol="textNoUrl", pattern=r"(https?://\S+)")
checkUrlTransformer = CheckPatternTransformer(inputCol="text", outputCol="textIsContainedUrl", pattern=r"(https?://\S+)")
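# GetLengthTransformer: adds a character-length column for each input column.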
class GetLengthTransformer(Transformer, HasInputCols, HasOutputCols):
@keyword_only
def __init__(self, inputCols=None, outputCols=None):
super(GetLengthTransformer, self).__init__()
kwargs = self._input_kwargs
self.setParams(**kwargs)
@keyword_only
def setParams(self, inputCols=None, outputCols=None):
kwargs = self._input_kwargs
return self._set(**kwargs)
def _transform(self, dataset):
for inputCol, outputCol in zip(self.getInputCols(), self.getOutputCols()):
dataset = dataset.withColumn(outputCol, F.length(inputCol))
return dataset
getLengthTransformer = GetLengthTransformer(inputCols=["keyword","textNoUrl"], outputCols=["keywordLen", "textNoUrlLen"])
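# Standard encoders: index keyword/location, tokenize the URL-free text, drop stop words, and embed the tokens with a 50-dimensional Word2Vec.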
keywordIndexer = StringIndexer(inputCol="keyword", outputCol="keywordIndex", handleInvalid="keep")
locationIndexer = StringIndexer(inputCol="location", outputCol="locationIndex", handleInvalid="keep")
regexTokenizer = RegexTokenizer(inputCol="textNoUrl", outputCol="textArrayWord", pattern="\\W")
stopWordsRemover = StopWordsRemover(inputCol="textArrayWord", outputCol="textNoSW")
word2Vec = Word2Vec(vectorSize=50, windowSize=10, minCount=0, inputCol="textNoSW", outputCol="textVec")
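# ConcatenateTransformer: joins several string columns into a single '@'-separated string.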
class ConcatenateTransformer(Transformer, HasInputCols, HasOutputCol):
@keyword_only
def __init__(self, inputCols=None, outputCol=None):
super(ConcatenateTransformer, self).__init__()
kwargs = self._input_kwargs
self.setParams(**kwargs)
@keyword_only
def setParams(self, inputCols=None, outputCol=None):
kwargs = self._input_kwargs
return self._set(**kwargs)
def _transform(self, dataset):
dataset = dataset.withColumn(self.getOutputCol(), F.col(self.getInputCols()[0]))
for colName in self.getInputCols()[1:]:
dataset = dataset.withColumn(self.getOutputCol(),
F.concat_ws('@', F.col(self.getOutputCol()), F.col(colName)))
return dataset
concatStringTransformer = ConcatenateTransformer(inputCols=["keyword", "location", "textNoUrl"], outputCol="concatString")
concatStringRegexTokenizer = RegexTokenizer(inputCol="concatString", outputCol="concatStringArrayWord", pattern="\\W")
concatStringStopWordsRemover = StopWordsRemover(inputCol="concatStringArrayWord", outputCol="concatStringArrayWordNoSW")
concatStringWord2Vec = Word2Vec(vectorSize=50, windowSize=10, minCount=0, inputCol="concatStringArrayWord", outputCol="concatStringVec")
concatStringNoSWWord2Vec = Word2Vec(vectorSize=50, windowSize=10, minCount=0, inputCol="concatStringArrayWordNoSW", outputCol="concatStringNoSWVec")
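# Assemble the candidate feature sets (with and without one-hot encoding and text vectors) and scale each one with a RobustScaler (median/IQR based).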
discreteFeaturesAssembler = VectorAssembler(inputCols=["keywordIndex", "locationIndex", "textIsContainedUrl",
"keywordLen", "textNoUrlLen"],
outputCol="discreteFeatures")
discreteFeaturesRobustScaler = RobustScaler(inputCol="discreteFeatures", outputCol="discreteFeaturesScale",
withScaling=True, withCentering=True, lower=0.25, upper=0.75)
oneHotEncoder = OneHotEncoder(inputCols=["keywordIndex", "locationIndex", "textIsContainedUrl"],
outputCols=["keywordVec", "locationVec", "textIsContainedUrlVec"],
handleInvalid="keep")
discreteOneHotEncoderFeaturesAssembler = VectorAssembler(inputCols=["keywordVec", "locationVec", "textIsContainedUrlVec",
"keywordLen", "textNoUrlLen"],
outputCol="discreteOneHotEncoderFeatures")
discreteOneHotEncoderFeaturesRobustScaler = RobustScaler(inputCol="discreteOneHotEncoderFeatures", outputCol="discreteOneHotEncoderFeaturesScale",
withScaling=True, withCentering=True, lower=0.25, upper=0.75)
discreteAndTextFeaturesAssembler = VectorAssembler(inputCols=["discreteFeatures", "textVec"],
outputCol="discreteAndTextFeatures")
discreteAndTextFeaturesRobustScaler = RobustScaler(inputCol="discreteAndTextFeatures", outputCol="discreteAndTextFeaturesScale",
withScaling=True, withCentering=True, lower=0.25, upper=0.75)
discreteOneHotEncoderAndTextFeaturesAssembler = VectorAssembler(inputCols=["discreteOneHotEncoderFeatures", "textVec"],
outputCol="discreteOneHotEncoderAndTextFeatures")
discreteOneHotEncoderAndTextFeaturesRobustScaler = RobustScaler(inputCol="discreteOneHotEncoderAndTextFeatures", outputCol="discreteOneHotEncoderAndTextFeaturesScale",
withScaling=True, withCentering=True, lower=0.25, upper=0.75)
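# Full preprocessing pipeline: NaN filling, URL handling, tokenization, embeddings, indexing/encoding, assembling, and scaling.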
preprocessingPipeline = Pipeline(stages=[fillNanTransformer, textFillNanTransformer,
removeUrlTransformer, regexTokenizer, stopWordsRemover, word2Vec,
keywordIndexer, locationIndexer, checkUrlTransformer, getLengthTransformer,
oneHotEncoder,
concatStringTransformer, concatStringRegexTokenizer, concatStringStopWordsRemover,
concatStringWord2Vec, concatStringNoSWWord2Vec,
discreteFeaturesAssembler,
discreteOneHotEncoderFeaturesAssembler,
discreteAndTextFeaturesAssembler,
discreteOneHotEncoderAndTextFeaturesAssembler,
discreteFeaturesRobustScaler,
discreteOneHotEncoderFeaturesRobustScaler,
discreteAndTextFeaturesRobustScaler,
discreteOneHotEncoderAndTextFeaturesRobustScaler
])
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier, MultilayerPerceptronClassifier, LinearSVC
from synapse.ml.lightgbm import LightGBMClassifier
labelCol = "target"
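# Note: rawPredictionCol points at the hard "prediction" column, so areaUnderROC is computed from 0/1 predictions (a single-threshold ROC) rather than from raw scores.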
evaluator = BinaryClassificationEvaluator(labelCol=labelCol, rawPredictionCol="prediction", metricName="areaUnderROC")
preprocessingModel = preprocessingPipeline.fit(trainData)
trainDataPreprocessed = preprocessingModel.transform(trainData)
testDataPreprocessed = preprocessingModel.transform(testData)
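# Quick feature-importance check for the discrete features using a RandomForest.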
featuresCol = "discreteFeaturesScale"
featuresImportanceModel = RandomForestClassifier(featuresCol=featuresCol, labelCol=labelCol).fit(trainDataPreprocessed)
for colName, importance in zip(discreteFeaturesAssembler.getInputCols(), list(featuresImportanceModel.featureImportances)):
    print(f"{colName:20}: {importance}")
featuresCols = ["discreteFeaturesScale", "discreteOneHotEncoderFeaturesScale", "textVec", "discreteAndTextFeaturesScale",
"discreteOneHotEncoderAndTextFeaturesScale", "concatStringVec", "concatStringNoSWVec"]
algorithmList = {"LR": LogisticRegression(labelCol=labelCol),
"DTC": DecisionTreeClassifier(labelCol=labelCol),
"RFC": RandomForestClassifier(labelCol=labelCol),
"GBTC": GBTClassifier(labelCol=labelCol),
"LSVC": LinearSVC(labelCol=labelCol),
"LGBMC":LightGBMClassifier(labelCol=labelCol)
}
from random import randint
for featuresColName in featuresCols:
    scores = []
    for name, algorithm in algorithmList.items():
        # A fresh random split per run; scores are averaged over the classifiers for each feature set.
        trainSet, validSet = trainDataPreprocessed.randomSplit([0.9, 0.1], seed=randint(11, 2021))
        algorithm.setFeaturesCol(featuresColName)
        prediction = algorithm.fit(trainSet).transform(validSet)
        scores.append(evaluator.evaluate(prediction))
    print(f'Features {featuresColName:50}: {np.round(np.mean(scores), 5)}')
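# Benchmark all classifiers (now including the MLP) on one fixed 90/10 split using the selected feature set.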
trainSet, validSet = trainDataPreprocessed.randomSplit([0.9, 0.1], seed=2021)
featuresCol = "discreteAndTextFeaturesScale"
algorithmList = {"LR": LogisticRegression(featuresCol=featuresCol, labelCol=labelCol),
"DTC": DecisionTreeClassifier(featuresCol=featuresCol, labelCol=labelCol),
"RFC": RandomForestClassifier(featuresCol=featuresCol, labelCol=labelCol),
"GBTC": GBTClassifier(featuresCol=featuresCol, labelCol=labelCol),
"MPC": MultilayerPerceptronClassifier(featuresCol=featuresCol, labelCol=labelCol, layers=[55,2]),
"LSVC": LinearSVC(featuresCol=featuresCol, labelCol=labelCol,),
"LGBMC":LightGBMClassifier(featuresCol=featuresCol, labelCol=labelCol)
}
for name, algorithm in algorithmList.items():
startTime = time.time()
model = algorithm.fit(trainSet)
prediction = model.transform(validSet)
score = evaluator.evaluate(prediction)
print(f'{name:4}: {np.round(score,5)} in {np.round(time.time() - startTime, 3)}s')
from synapse.ml.automl import *
from synapse.ml.train import *
from pyspark.ml.functions import vector_to_array
import pyspark.sql.functions as F
import re
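# Expand the 55-dimensional "discreteAndTextFeaturesScale" vector (5 discrete features + 50 Word2Vec dimensions) into one scalar column per entry for the hyperparameter-tuning step below.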
featuresCol = "discreteAndTextFeaturesScale"
trainSetAllHP = (trainDataPreprocessed.withColumn("feature", vector_to_array(featuresCol)))\
.select([labelCol]+ [F.col("feature")[i] for i in range(55)])
trainSetHP = (trainSet.withColumn("feature", vector_to_array(featuresCol)))\
.select([labelCol]+ [F.col("feature")[i] for i in range(55)])
validSetHP = (validSet.withColumn("feature", vector_to_array(featuresCol)))\
.select([labelCol]+ [F.col("feature")[i] for i in range(55)])
testSetHP = (testDataPreprocessed.withColumn("feature", vector_to_array(featuresCol)))\
.select([F.col("feature")[i] for i in range(55)])
# We remove "[]" in the column names.
trainSetAllHP = trainSetAllHP.select([F.col(col).alias(re.sub("[^0-9a-zA-Z$]+","",col)) for col in trainSetAllHP.columns])
trainSetHP = trainSetHP.select([F.col(col).alias(re.sub("[^0-9a-zA-Z$]+","",col)) for col in trainSetHP.columns])
validSetHP = validSetHP.select([F.col(col).alias(re.sub("[^0-9a-zA-Z$]+","",col)) for col in validSetHP.columns])
testSetHP = testSetHP.select([F.col(col).alias(re.sub("[^0-9a-zA-Z$]+","",col)) for col in testSetHP.columns])
trainSetHP.select("target", "feature0", "feature1").printSchema()
import sklearn.metrics as metrics
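# Base LightGBM configuration (DART boosting, binary objective, class-imbalance handling), tuned below with a random search over the declared hyperparameter space.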
lgbmc = LightGBMClassifier(boostingType='dart',
                           objective='binary',
                           metric='auc',
                           isUnbalance=True,
                           numIterations=300)
smlmodels = [lgbmc]
mmlmodels = [TrainClassifier(model=model, labelCol=labelCol) for model in smlmodels]
paramBuilder = (HyperparamBuilder()
.addHyperparam(lgbmc, lgbmc.learningRate, RangeHyperParam(0.01, 0.5))
.addHyperparam(lgbmc, lgbmc.maxDepth, DiscreteHyperParam([1,30]))
.addHyperparam(lgbmc, lgbmc.numLeaves, DiscreteHyperParam([10,200]))
.addHyperparam(lgbmc, lgbmc.featureFraction, RangeHyperParam(0.1, 1.0))
.addHyperparam(lgbmc, lgbmc.baggingFraction, RangeHyperParam(0.1, 1.0))
.addHyperparam(lgbmc, lgbmc.baggingFreq, RangeHyperParam(0, 3))
)
searchSpace = paramBuilder.build()
randomSpace = RandomSpace(searchSpace)
bestModel = TuneHyperparameters(evaluationMetric="AUC", models=mmlmodels, numFolds=2,
numRuns=len(mmlmodels) * 2, parallelism=1,
paramSpace=randomSpace.space(), seed=0).fit(trainSetHP)
prediction = bestModel.transform(validSetHP)
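# "scored_labels" holds hard 0/1 predictions, so this ROC AUC is effectively computed from a single threshold.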
predLabel = np.array(prediction.select('scored_labels').collect()).squeeze()
trueLabel = np.array(prediction.select('target').collect()).squeeze()
print(metrics.roc_auc_score(trueLabel, predLabel))