Running Machine Learning algorithms using Spark Streaming
MLlib is Apache Spark’s scalable machine learning library consisting of common learning algorithms and utilities.
To demonstrate how we can run ML algorithms using Spark, I have taken a simple use case in which our Spark Streaming application reads data from Kafka and stores a copy as Parquet files in HDFS. In this data ingestion pipeline, we run ML on the data coming in from Kafka.
I will be using the Iris flower dataset in this example.
I tested four Classifiers:
1. Decision Tree
2. Logistic Regression
3. Naive Bayes
4. Random Forest
Training
Assuming all the data has been loaded into HDFS, I trained the models on this ingested data, which has the benefit of allowing us to train on larger datasets. Once training was done, I saved the models to HDFS so they can be loaded whenever needed.
Create a DataFrame by reading the data from HDFS:
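A minimal sketch of this step, assuming a SparkSession named spark is already in scope and the ingested copy was written as Parquet to a hypothetical HDFS path:

// Reading the ingested flower data back from HDFS
// ("hdfs:///data/flowers" is a hypothetical location)
val data = spark.read.parquet("hdfs:///data/flowers")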
Cast the input features to Double data types and assemble them as Vectors:
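This mirrors what the streaming job does later in this post; a sketch using the same Iris column names:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql
import spark.implicits._ // assumes the SparkSession named spark from above

// Casting the four measurements to Double
val df = data
  .withColumn("PetalLength", $"PetalLength".cast(sql.types.DoubleType))
  .withColumn("PetalWidth", $"PetalWidth".cast(sql.types.DoubleType))
  .withColumn("SepalLength", $"SepalLength".cast(sql.types.DoubleType))
  .withColumn("SepalWidth", $"SepalWidth".cast(sql.types.DoubleType))

// Assembling the features into a single Vector column and dropping the originals
val assembler = new VectorAssembler()
  .setInputCols(Array("PetalLength", "PetalWidth", "SepalLength", "SepalWidth"))
  .setOutputCol("features")
val transformed_data = assembler.transform(df)
  .drop("PetalLength", "PetalWidth", "SepalLength", "SepalWidth")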
Indexing Labels and Features
// Indexing Label
val labelIndexer = new StringIndexer()
  .setInputCol("flower")
  .setOutputCol("indexedFlower")
  .fit(transformed_data)

// Automatically identify categorical features, and index them.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(transformed_data)

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)

// Declaring the ML Pipeline (its stages are set once a classifier is chosen)
val pipeline = new Pipeline()
I trained a model with each classifier and saved each one separately, so that any model of our choice can be loaded later.
Declaring the classifier to use: Decision Tree, Logistic Regression, Naive Bayes, or Random Forest. A consolidated sketch of all four declarations is below.
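All four declarations follow the same pattern; a sketch using the label and feature columns produced by the indexers above. Wiring a classifier into the pipeline stages is shown for the Decision Tree, and the others swap in the same way:

import org.apache.spark.ml.classification.{DecisionTreeClassifier, LogisticRegression, NaiveBayes, RandomForestClassifier}

// Decision Tree
val dt = new DecisionTreeClassifier()
  .setLabelCol("indexedFlower")
  .setFeaturesCol("indexedFeatures")

// Logistic Regression
val lr = new LogisticRegression()
  .setLabelCol("indexedFlower")
  .setFeaturesCol("indexedFeatures")

// Naive Bayes
val nb = new NaiveBayes()
  .setLabelCol("indexedFlower")
  .setFeaturesCol("indexedFeatures")

// Random Forest
val rf = new RandomForestClassifier()
  .setLabelCol("indexedFlower")
  .setFeaturesCol("indexedFeatures")

// Setting the pipeline stages for the chosen classifier (here the Decision Tree)
pipeline.setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))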
Train and Save the Model
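A sketch of fitting and persisting the model, where MODEL_PATH stands for a hypothetical per-classifier location in HDFS:

// Fitting the full pipeline on the ingested data
val model = pipeline.fit(transformed_data)

// Saving the fitted PipelineModel to HDFS for later use
model.write.overwrite().save(MODEL_PATH)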
Running Predictions Using a Spark Streaming Application
Once the model is trained, I launched a Spark Streaming application that reads data from Kafka and runs the designated ML classifier on the incoming data.
Create a Spark DStream with Kafka as the source
// Creating Spark Conf and Streaming Context
val sparkConf = new SparkConf()
  .setAppName(SPARK_APP_NAME)
  .setMaster(SPARK_MASTER)
val sparkStreamingContext = new StreamingContext(sparkConf, Seconds(SPARK_BATCH_DURATION))

// Creating Spark Session
val spark = SparkSession
  .builder
  .config(sparkConf)
  .getOrCreate()
import spark.implicits._

// Setting Kafka Params
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> KAFKA_BROKERS,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> KAFKA_GROUP_ID,
  "auto.offset.reset" -> KAFKA_OFFSET_RESET,
  "enable.auto.commit" -> (false: java.lang.Boolean))
val topicsSet = KAFKA_TOPICS.split(",").toSet

// Creating DStream
val kafkaStream = KafkaUtils.createDirectStream[String, String](
  sparkStreamingContext,
  PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
Load the Model from HDFS
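A minimal sketch, assuming the model was saved as a PipelineModel at the same hypothetical MODEL_PATH used during training:

import org.apache.spark.ml.PipelineModel

// Loading the previously trained model back from HDFS
val model = PipelineModel.load(MODEL_PATH)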
Process the Kafka stream and run the model on the data. As in training, we cast the features to Double data types and assemble them as Vectors. We also run an evaluator to measure the accuracy.
kafkaStream.map(message => message.value().toString)
  .foreachRDD(rdd => {
    if (!rdd.isEmpty()) {
      // Creating DataFrame
      val data = spark.read.json(rdd.toDS())

      // Casting Features to Double data types
      val df = data
        .withColumn("PetalLength", $"PetalLength".cast(sql.types.DoubleType))
        .withColumn("PetalWidth", $"PetalWidth".cast(sql.types.DoubleType))
        .withColumn("SepalLength", $"SepalLength".cast(sql.types.DoubleType))
        .withColumn("SepalWidth", $"SepalWidth".cast(sql.types.DoubleType))

      // Assembling Features as Vectors
      val assembler = new VectorAssembler()
        .setInputCols(Array("PetalLength", "PetalWidth", "SepalLength", "SepalWidth"))
        .setOutputCol("features")

      // Dropping Unwanted Columns
      val transformed_data = assembler.transform(df)
        .drop("PetalLength", "PetalWidth", "SepalLength", "SepalWidth")

      // Making predictions
      val predictions = model.transform(transformed_data)

      // Running the Evaluator
      val evaluator = new MulticlassClassificationEvaluator()
        .setLabelCol("indexedFlower")
        .setPredictionCol("prediction")
        .setMetricName("accuracy")

      // Calculating Accuracy
      val accuracy = evaluator.evaluate(predictions)
      println("Accuracy: " + accuracy)
      println("Test Error = " + (1.0 - accuracy))
    }
  })

sparkStreamingContext.start()
sparkStreamingContext.awaitTermination()
On average, ~700k records were coming in every 30 seconds. I ran the Spark Streaming application with 4 executors and 2 GB of memory per executor. Here are the running stats:
Conclusion
The above stats show that all four classifiers were close in their processing times. Accuracy-wise, Logistic Regression scored lower, while the other three were almost identical.
We can run ML models at different stages of a Big Data pipeline very successfully using Spark ML. As new versions of Spark are released, support for custom pipeline components continues to improve.
To get the best data engineering solutions for your business, reach out to us at Clairvoyant.
References
MLlib: Main Guide - Spark 2.4.3 Documentation: https://spark.apache.org/docs/latest/ml-guide.html
Yurong Fan, Kushal Chandra, Nitya L, Aditya Aghi (January 2017): Introduction of a big data machine learning tool - SparkML: https://yurongfan.wordpress.com/2017/01/10/introduction-of-a-big-data-machine-learning-tool-sparkml/
Victoria McRitchie (September 2018): Project 1 - Classifying Iris Flowers: http://rstudio-pubs-static.s3.amazonaws.com/420656_c17c8444d32548eba6f894bcbdffcaab.html