import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline
# Explicit imports come after the wildcard imports so that nothing pulled in
# by `import *` can shadow them.
from pyspark.sql import functions as F
import numpy as np
# Start the Spark NLP session, requesting the GPU variant.
spark = sparknlp.start(gpu=True)

# Locations of the training and test CSV files.
trainPath = 'nlp-getting-started/train.csv'
testPath = 'nlp-getting-started/test.csv'

# Both files are read with identical reader settings: first row is a header,
# column types are inferred, and a record may span several lines.
csvOptions = dict(header='true', inferSchema='true', multiLine=True)
trainData = spark.read.format('csv').options(**csvOptions).load(trainPath)
testData = spark.read.format('csv').options(**csvOptions).load(testPath)

# Report the size of each data set, training first.
for caption, frame in (
    ('Number of row in Training:', trainData),
    ('Number of row in Test: ', testData),
):
    print(caption, frame.count())
# Stage 1: turn the raw "text" column into a "document" annotation column.
documentAssembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

# Stage 2: derive the "sentence" column from "document".
sentenceDetector = (
    SentenceDetector()
    .setInputCols(["document"])
    .setOutputCol("sentence")
)

# Stage 3: embed each sentence with the pretrained English
# "sent_bert_base_cased" model.
sentenceEmbeddings = (
    BertSentenceEmbeddings.pretrained("sent_bert_base_cased", "en")
    .setInputCols("sentence")
    .setOutputCol("sentenceEmbeddings")
)

# Stage 4: additionally expose the embeddings as vectors in
# "finishedEmbeddings".
embeddingsFinisher = (
    EmbeddingsFinisher()
    .setInputCols(["sentenceEmbeddings"])
    .setOutputCols("finishedEmbeddings")
    .setOutputAsVector(True)
)

# Assemble the stages in execution order, fit on the training data, and apply
# the fitted pipeline to both data sets.
featureStages = [
    documentAssembler,
    sentenceDetector,
    sentenceEmbeddings,
    embeddingsFinisher,
]
pipeline = Pipeline().setStages(featureStages)
model = pipeline.fit(trainData)
trainDataSnlp = model.transform(trainData)
testDataSnlp = model.transform(testData)
# Configure the classifier: it reads the "sentenceEmbeddings" column, learns
# from the "target" label column and writes its predictions to "labelFull".
classifierDL = (
    ClassifierDLApproach()
    .setInputCols(["sentenceEmbeddings"])
    .setOutputCol("labelFull")
    .setLabelColumn("target")
    .setBatchSize(64)
    .setMaxEpochs(20)
    .setLr(5e-3)
    .setDropout(0.5)
    .setEnableOutputLogs(True)  # training log is printed further down
)

# Train on the embedded training set.
classifierDLModel = classifierDL.fit(trainDataSnlp)
import os

# Print the training log produced by the classifier fit above.
#
# The original hard-coded "ClassifierDLApproach_0da7e830db4e.log". That name
# has the shape "<estimator class>_<hex id>", i.e. it presumably is the uid of
# one particular ClassifierDLApproach instance, so it only exists for the run
# that created it. Derive the name from the estimator used in this run.
# NOTE(review): assumes the log file is named "<uid>.log" -- confirm against
# the Spark NLP version in use.
logFileName = classifierDL.uid + ".log"

# Try the working directory first (where the original code looked), then
# ~/annotator_logs.
# NOTE(review): ~/annotator_logs is presumably Spark NLP's default log folder;
# adjust if a custom output-logs path is configured.
logCandidates = [
    logFileName,
    os.path.join(os.path.expanduser("~"), "annotator_logs", logFileName),
]
logPath = next((path for path in logCandidates if os.path.isfile(path)), None)
if logPath is None:
    raise FileNotFoundError(
        "Training log not found; looked in: " + ", ".join(logCandidates)
    )

with open(logPath, "r") as log_file:
    print(log_file.read())
# Run the trained classifier on the embedded test set; predictions land in
# the "labelFull" column.
predictionTest = classifierDLModel.transform(testDataSnlp)

# Convert "labelFull" into the plain column "labelSimple", then cast it to an
# array of integers so the predicted labels can be summed.
# NOTE(review): presumably one predicted label per detected sentence -- confirm.
predictionTest = Finisher().setInputCols("labelFull").setOutputCols("labelSimple").transform(predictionTest)
predictionTest = predictionTest.withColumn("labelSimple", F.col("labelSimple").cast("array<integer>"))

# Sum the labels of each row (starting from 0); keep only the id and the sum.
predictionTest = predictionTest.selectExpr('id', "AGGREGATE(labelSimple, 0, (acc, x) -> acc + x) as label")

# A row is labelled 1 when the summed labels reach at least 1, otherwise 0.
predictionTest = predictionTest.withColumn('labelFinal', F.when(F.col("label") >= 1, 1).otherwise(0))

# Collect the final labels into a 1-D integer array. Building it from the
# scalar values keeps the shape (n,) for every n; the previous
# np.array(rows).squeeze() collapsed a single-row result into a 0-d array.
predLabelTest = np.array(
    [row[0] for row in predictionTest.select('labelFinal').collect()],
    dtype=int,
)