import glob
import os

import numpy as np
import sparknlp
from pyspark.ml import Pipeline
from pyspark.sql import functions as F
from sparknlp.base import *
from sparknlp.annotator import *
# Start a GPU-backed Spark session managed by Spark NLP.
spark = sparknlp.start(gpu=True)


def _load_csv(path):
    """Read a headered, multi-line CSV with schema inference."""
    return (spark.read.format('csv')
            .options(header='true', inferSchema='true', multiLine=True)
            .load(path))


# Kaggle "nlp-getting-started" (disaster tweets) train/test splits.
trainData = _load_csv('nlp-getting-started/train.csv')
testData = _load_csv('nlp-getting-started/test.csv')
print('Number of row in Training:', trainData.count())
print('Number of row in Test: ', testData.count())
# Preprocessing pipeline: raw text -> document -> sentences ->
# BERT sentence embeddings -> plain vector column.
stages = [
    DocumentAssembler()
        .setInputCol("text")
        .setOutputCol("document"),
    SentenceDetector()
        .setInputCols(["document"])
        .setOutputCol("sentence"),
    BertSentenceEmbeddings.pretrained("sent_bert_base_cased", "en")
        .setInputCols("sentence")
        .setOutputCol("sentenceEmbeddings"),
    EmbeddingsFinisher()
        .setInputCols(["sentenceEmbeddings"])
        .setOutputCols("finishedEmbeddings")
        .setOutputAsVector(True),
]

# Fit on the training split, then embed both splits with the same model.
preprocessModel = Pipeline().setStages(stages).fit(trainData)
trainDataSnlp = preprocessModel.transform(trainData)
testDataSnlp = preprocessModel.transform(testData)
# Feed-forward classifier trained on the BERT sentence embeddings,
# predicting the binary "target" column of the training data.
classifierDL = ClassifierDLApproach()
classifierDL.setInputCols(["sentenceEmbeddings"])
classifierDL.setOutputCol("labelFull")
classifierDL.setLabelColumn("target")
classifierDL.setBatchSize(64)
classifierDL.setMaxEpochs(20)
classifierDL.setLr(5e-3)
classifierDL.setDropout(0.5)
classifierDL.setEnableOutputLogs(True)  # write per-epoch loss/acc to a log file

classifierDLModel = classifierDL.fit(trainDataSnlp)
import os
with open("ClassifierDLApproach_0da7e830db4e.log", "r") as log_file :
print(log_file.read())
# Training log (notebook output, kept for reference):
# Training started - epochs: 20 - learning_rate: 0.005 - batch_size: 64 - training_examples: 13341 - classes: 2
# Epoch 0/20 - 0.49s - loss: 125.230545 - acc: 0.7007471 - batches: 209
# Epoch 1/20 - 0.34s - loss: 115.544014 - acc: 0.7358437 - batches: 209
# Epoch 2/20 - 0.34s - loss: 114.46812 - acc: 0.7435811 - batches: 209
# Epoch 3/20 - 0.33s - loss: 113.75002 - acc: 0.74919957 - batches: 209
# Epoch 4/20 - 0.34s - loss: 113.10281 - acc: 0.7550745 - batches: 209
# Epoch 5/20 - 0.35s - loss: 112.81218 - acc: 0.760048 - batches: 209
# Epoch 6/20 - 0.33s - loss: 112.79334 - acc: 0.7641045 - batches: 209
# Epoch 7/20 - 0.37s - loss: 112.8797 - acc: 0.767244 - batches: 209
# Epoch 8/20 - 0.34s - loss: 112.86142 - acc: 0.76996386 - batches: 209
# Epoch 9/20 - 0.34s - loss: 112.65094 - acc: 0.77289355 - batches: 209
# Epoch 10/20 - 0.34s - loss: 112.2948 - acc: 0.77552277 - batches: 209
# Epoch 11/20 - 0.34s - loss: 112.11395 - acc: 0.7776261 - batches: 209
# Epoch 12/20 - 0.36s - loss: 111.9319 - acc: 0.7798797 - batches: 209
# Epoch 13/20 - 0.34s - loss: 111.74605 - acc: 0.7816075 - batches: 209
# Epoch 14/20 - 0.35s - loss: 111.56647 - acc: 0.783185 - batches: 209
# Epoch 15/20 - 0.34s - loss: 111.35628 - acc: 0.7839362 - batches: 209
# Epoch 16/20 - 0.34s - loss: 111.02795 - acc: 0.78566396 - batches: 209
# Epoch 17/20 - 0.33s - loss: 110.67951 - acc: 0.7874824 - batches: 209
# Epoch 18/20 - 0.33s - loss: 110.32417 - acc: 0.78898484 - batches: 209
# Epoch 19/20 - 0.39s - loss: 109.93248 - acc: 0.7905028 - batches: 209
# Score the test split, then collapse the per-sentence predictions into a
# single binary label per tweet.
predictionTest = classifierDLModel.transform(testDataSnlp)

# Flatten the annotation column into a plain array of string labels.
finisher = Finisher().setInputCols("labelFull").setOutputCols("labelSimple")
predictionTest = finisher.transform(predictionTest)

predictionTest = (
    predictionTest
    # Cast the string labels ("0"/"1") to integers so they can be summed.
    .withColumn("labelSimple", F.col("labelSimple").cast("array<integer>"))
    # Sum the per-sentence labels for each tweet id.
    .selectExpr('id', "AGGREGATE(labelSimple, 0, (acc, x) -> acc + x) as label")
    # Any sentence predicted 1 makes the whole tweet a 1.
    .withColumn('labelFinal', F.when(F.col("label") >= 1, 1).otherwise(0))
)

# Collect the final labels into a flat NumPy array.
predLabelTest = np.array(predictionTest.select('labelFinal').collect()).squeeze()