<img height="1" width="1" style="display:none;" alt="" src="https://px.ads.linkedin.com/collect/?pid=2877026&amp;fmt=gif">

Improving your Apache Spark Application Performance

By Robert Sanders - August 27, 2021

Simple Tips and Tricks to Improve the Performance of your Spark Applications

Apache Spark has quickly become one of the most heavily used processing engines in the Big Data space since it became a Top-Level Apache Project in February of 2014. Not only can it run in a variety of environments (locally, Standalone Spark Cluster, Apache Mesos, YARN, etc) but it can also provide a number of libraries that can help you solve just about any problem on Hadoop. This includes running SQL Queries, Streaming, and Machine Learning to name a few. All running on an optimized execution engine.

We at Clairvoyant have built many Data Pipelines with Apache Spark, including Batch and Streaming over the years. You can find out more information here. After having built so many pipelines we’ve found some simple ways to improve the performance of Spark Applications. Here are a few tips and tricks that we’ve found:

Use DataFrame’s instead of RDDs

Instead of using the RDD API

val rdd = sc.textFile("/path/to/file.txt")

Use the DataFrames API

val df = park.read.textFile("/path/to/file.txt")

By using the DataFrame API and not reverting to using RDDs you enable Spark to use the Catalyst Optimizer to improve the execution plan of your Spark Job.

Avoid using Regex’s

Java Regex is a great process for parsing data in an expected structure. Unfortunately, the Regex process is generally a slow process and when you have to process millions of rows, a little bit of increase in parsing a single row can cause the entire job to increase in processing time. If at all possible, avoid using Regex’s and try to ensure your data is loaded in a more structured format.

Optimize Joins

Put the Largest Dataset on the Left

When you’re joining together two datasets where one is smaller than the other, put the larger dataset on the “Left”:

val joinedDF = largerDF.leftJoin(smallerDF, largerDF("id") ===
smallerDF("some_id"))

When Spark shuffles data for the join, it keeps the data you specified on the left static on the executors and transfers the data you designed on the right between the executors. If the data that’s on the right, that’s being transferred, is larger, then the serialization and transfer of the data will take longer.

Utilize Broadcast Joining for joining Smaller Datasets to Larger Ones

In many cases, we will be joining smaller data sets (a couple dozen or so rows, maybe a bit more) with larger data sets. In this case, it’s more performant to use a Broadcast Join:

import org.apache.spark.sql.functions._

val joinedDF = largeDF.join(broadcast(smallDF), largeDF("id") ===
smallDF("some_id"))

Use Caching

If you find you are constantly using the same DataFrame on multiple queries, it’s recommended to implement Caching or Persistence:

valdf =spark.read.textFile("/path/to/file.txt").cache()

Note: Avoid overusing this. Due to Spark’s caching strategy (in-memory then swap to disk) the cache can end up in slightly slower storage. Also, using that storage space for caching purposes means that it’s not available for processing. In the end, caching might cost more than simply reading the DataFrame.¹

Compute Statistics of Tables Before Processing

Before querying a series of tables, it can be helpful to tell spark to Compute the Statistics of those tables so that the Catalyst Optimizer can come up with a better plan on how to process the tables.

spark.sql("ANALYZE TABLE dbName.tableName COMPUTE STATISTICS")

In some cases, Spark doesn’t get everything it needs from just the above broad COMPUTE STATISTICS call. It also helps to tell Spark to check specific columns so the Catalyst Optimizer can better check those columns. It’s recommended to COMPUTE STATISTICS for any columns that are involved in filtering and joining.

spark.sql("ANALYZE TABLE dbName.tableName COMPUTE STATISTICS FOR 
COLUMNS joinColumn, filterColumn")

Set the spark.sql.shuffle.partitions Configuration Parameter

The default value for this is 200 which can be too high for some jobs. Set this configuration to the number of cores you have available across all your executors.

spark.conf.set("spark.sql.shuffle.partitions", 10)

Reference

https://medium.com/teads-engineering/spark-performance-tuning-from-the-trenches-7cbde521cf60

Author
Robert Sanders

Director of Big Data and Cloud Engineering for Clairvoyant LLC | Marathon Runner | Triathlete | Endurance Athlete

Tags: Analytics Spark Technology Data Engineering Software Engineering