<img height="1" width="1" style="display:none;" alt="" src="https://px.ads.linkedin.com/collect/?pid=2877026&amp;fmt=gif">

Machine Learning with Spark Streaming

By Aravind Boppana - September 14, 2019

Running Machine Learning algorithms using Spark Streaming

MLlib is Apache Spark’s scalable machine learning library consisting of common learning algorithms and utilities.

To demonstrate how we can run ML algorithms using Spark, I have taken a simple use case in which our Spark Streaming application reads data from Kafka and stores a copy as parquet file in HDFS. In this data ingestion pipeline, we run ML on the data that is coming in from Kafka.

I will be using the flower dataset in this example.

I tested four Classifiers:

 1. Decision Tree
 2. LogisticRegression
 3. NaiveBayes
 4. RandomForest


Assuming all your data has been loaded into HDFS, I trained the models on this ingested data. This has the benefit of allowing us to train our model on larger datasets. Once the training is done, I saved the models to HDFS so it can be imported whenever needed.

Create Dataframe by reading data from HDFS:

Casting input features to double datatypes and assembling them as Vectors.

Indexing Labels and Features

// Indesing Label
val labelIndexer = new StringIndexer()
// Automatically identify categorical features, and index them.
val featureIndexer = new VectorIndexer()
// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
// Declaring ML Pipeline 
val pipeline = new Pipeline()

I trained the model with each classifier and saved them separately as we can load any model of our choice.

Declaring to use the Decision Tree Classifier

Declaring to use the Logistic Regression Classifier

Declaring to use the Naive Bayes Classifier

Declaring to use the Random Forest Classifier

Train and Save the Model

Running Predictions using Spark Streaming Application

Once the model is trained, I launched a Spark Streaming application that reads data from Kafka and runs the designated ML classifier on the incoming data.

Create a Spark DStream with Kafka as the source

 // Creating Spark Conf and Streaming context 
val sparkConf = new SparkConf()
val sparkStreamingContext = new StreamingContext(sparkConf,  
// Creating Spark Sessionval spark = SparkSession
import spark.implicits._
// Setting Kafka Params
val kafkaParams = Map[String, Object]("bootstrap.servers" 
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> KAFKA_GROUP_ID,
    "auto.offset.reset" -> KAFKA_OFFSET_RESET,
    "enable.auto.commit" -> (false: java.lang.Boolean))
val topicsSet = KAFKA_TOPICS.split(",").toSet
// Creating DStream 
val kafkaStream = KafkaUtils
       .createDirectStream[String, String](sparkStreamingContext, 
       PreferConsistent, ConsumerStrategies.Subscribe[String, 
       String](topicsSet, kafkaParams)) 

Load the Model from HDFS

Process the Kafka Stream and run the model on the Data. Like in Training here also we cast the Features to Double data types and assemble them as Vectors. We will run the evaluator as well to know the accuracy.

 kafkaStream.map(message => {
}).foreachRDD(rdd => {
    if (!rdd.isEmpty()) {
// creating Dataframe
val data = spark.read.json(rdd.toDS())
// Casting Features to Double data types
val df = data.withColumn("PetalLength",               $"PetalLength"
                .withColumn("PetalWidth", $"PetalWidth".cast(sql.
                .withColumn("SepalLength", $"SepalLength".cast(sql
                .withColumn("SepalWidth", $"SepalWidth".cast(sql.
// Assembling Features as Vectors
val assembler = new VectorAssembler()
                .setInputCols(Array("PetalLength", "PetalWidth",
                "SepalLength", "SepalWidth"))
// Dropping Unwanted Columns
val transformed_data = assembler.transform(df).drop("PetalLength",
"PetalWidth", "SepalLength", "SepalWidth")
// Making predictions
val predictions = model.transform(transformed_data)
// Running the Evaluator
val evaluator = new MulticlassClassificationEvaluator()
// Calcuating Accuracy
val accuracy = evaluator.evaluate(predictions)
println("Accuracy: " + accuracy)
println("Test Error = " + (1.0 - accuracy))

At an average, there were ~700k records coming per 30 secs. I ran the Spark Streaming application with 4 executors and 2 Gigs memory per executor. Here are the running stats:


The above stats conclude that all 4 classifiers were close in their processing times. But accuracy wise, Logistic regression gave a lower accuracy. The other three were almost the same.

We can run ML models at different stages of a Big Data pipeline very successfully using Spark ML. As new versions of Spark release, support for custom pipeline components improve.

To get the best data engineering solutions for your business, reach out to us at Clairvoyant.


MLlib: Main Guide — Spark 2.4.3 Documentation https://spark.apache.org/docs/latest/ml-guide.html

Yurong Fan, Kushal Chandra, Nitya L, Aditya Aghi., (January 2017): Introduction of a big data machine learning tool -SparkML https://yurongfan.wordpress.com/2017/01/10/introduction-of-a-big-data-machine-learning-tool-sparkml/

Victoria McRitchie.,(September 2018): Project 1 — Classifying Iris Flowers.

http://rstudio-pubs static.s3.amazonaws.com/420656_c17c8444d32548eba6f894bcbdffcaab.html

Aravind Boppana

Tags: Data Engineering

Fill in your Details