
Custom Partitioning an Apache Spark DataSet

By Naveen Chitti - August 27, 2021

Custom partitioning for better distribution of your data when creating partitions in Spark jobs

Improving Performance using Custom Partitions

Optimizing the performance of your Spark application, and the way it retrieves data from DataSets, calls for improvements in two areas:

  1. the data storage mechanism, and
  2. the method by which execution engines read or search data in DataSets.

Clairvoyant has managed the data of several enterprises. Over the years, we have developed a methodology to implement custom partitions within DataSets. This has enabled several of our clients to optimize the speed of their search function execution and reduce the cost associated with managing multiple partitions.

Apache Spark, as we all know, is generally used to process large data sets in a distributed manner. However, the performance of Spark jobs really depends on whether the data that's being processed is properly (evenly) distributed across its executors or not.

The problem with uneven data distribution is that tasks for partitions with less data finish first, while partitions holding more data take much longer. Because a stage completes only when its slowest task finishes, this reduces the performance of the whole Spark job.

How data is ultimately distributed across RDD partitions depends on the partitioner in use. Spark's default partitioner is the Hash Partitioner. For this reason, no matter how many times the repartition() method is applied to a DataSet, the data might still not be evenly distributed among all the partitions. In other words, the default partitioner does not always guarantee an even distribution of data in a Spark RDD.

To avoid such situations, it is critical to be equipped with the ability to apply a Custom Partition. Spark provides an option to create a “custom partitioner” where one can apply the logic of data partitioning on RDDs based on custom conditions.

Below, we will see in detail an example of:

  1. how the Default Partitioner works for a given DataSet,
  2. how the application of the repartition method will not be sufficient to evenly distribute the data set, and
  3. how the Custom Partitioner can help resolve the problem of uneven distribution for the same DataSet.

Spark Default Partitioner

Spark splits data into partitions and processes them in parallel. By default, it uses a Hash Partitioner to assign records to partitions. The Hash Partitioner relies on the hashCode() method, whose contract is that equal objects have the same hash code. Keys with the same hash code therefore always land in the same partition, and the partition index is derived from the hash code and the number of partitions.
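The hashing rule described above can be sketched in plain Java, without a Spark dependency. This is a minimal illustration rather than Spark's own source: it assumes the partition index is the key's hash code modulo the partition count, kept non-negative, with null keys sent to partition 0. The class and method names (HashPartitionSketch, partitionFor, distribute) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only; not Spark's source code.
final class HashPartitionSketch {

    // Maps a key to a partition index in [0, numPartitions).
    // Math.floorMod keeps the result non-negative even when hashCode() is negative.
    static int partitionFor(Object key, int numPartitions) {
        if (key == null) {
            return 0; // assumption: null keys all go to partition 0
        }
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Groups the keys by target partition so the distribution can be inspected.
    static List<List<String>> distribute(List<String> keys, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String key : keys) {
            partitions.get(partitionFor(key, numPartitions)).add(key);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> birds = Arrays.asList("Owl", "Parrot", "Crow", "Ibis", "WoodPecker",
                "Bulbul", "Falcon", "Eagle", "Kite", "Toucan");
        List<List<String>> partitions = distribute(birds, 4);
        for (int i = 0; i < partitions.size(); i++) {
            System.out.println("Partition " + i + ": " + String.join(",", partitions.get(i)));
        }
    }
}
```

Nothing in this rule looks at how many keys a partition already holds, which is why a hash-based assignment can leave some partitions crowded and others empty.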

Example of Default Spark Partitioner

Let’s take a DataSet of strings to understand how the data is partitioned.

SparkSession sparkSession = SparkSession.builder().master("local[*]").getOrCreate();
final List<String> strings = Arrays.asList("Owl", "Parrot", "Crow", "Ibis", "WoodPecker",
        "Bulbul", "Falcon", "Eagle", "Kite", "Toucan");
final Dataset<String> stringDataset = sparkSession.createDataset(strings, Encoders.STRING());

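In this run, stringDataset is split into 4 partitions (with local[*], the default depends on the number of available cores):

int noOfPartitions = stringDataset.rdd().partitions().length; // => 4

Each partition is written as a separate CSV file when we write the DataSet to disk: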

stringDataset.write().csv("src\\main\\test_output");

Here is how data is partitioned on different partitions:

Partition 0: Owl,Parrot
Partition 1: Crow,Ibis,WoodPecker
Partition 2: Bulbul,Falcon
Partition 3: Eagle,Kite,Toucan

Although the DataSet in the above example is small, imagine that each word stands for a substantial chunk of data that takes significant time to process. Rather than processing the data in 4 partitions, one may prefer more partitions with fewer elements in each, thus increasing parallelism. That is where increasing the number of partitions with the repartition() method can help. A small example follows:

Repartition

The repartition() method in Spark is used either to increase or decrease the partitions in a Dataset.

Let’s apply repartition on the previous DataSet and see how data is distributed among partitions:

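Dataset<String> repartitionedDS = stringDataset.repartition(8);
// Overwrite the earlier output; by default the write fails if the path already exists.
repartitionedDS.write().mode("overwrite").csv("src\\main\\test_output");
int noOfPartitions = repartitionedDS.rdd().partitions().length; // => 8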

Here is how data is partitioned on different partitions:

Partition 0: Crow,Toucan
Partition 1:
Partition 2:
Partition 3:
Partition 4:
Partition 5:
Partition 6: Owl,Ibis,Bulbul,Eagle
Partition 7: Parrot,WoodPecker,Falcon,Kite

The above data is not partitioned evenly: five of the eight partitions are empty, while two hold four elements each. Although we have increased the number of partitions by applying the repartition() method, the data is still not evenly distributed. This is a consequence of how the default partitioning scheme assigns records to partitions, without regard to how full each partition already is. Hence, applying repartition() alone may not solve the problem.
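To put a number on the unevenness, the partition sizes from the two listings above can be compared with a simple skew ratio (largest partition divided by the mean partition size). The sketch below is plain Java with hard-coded sizes read off those listings; the class and method names (PartitionSkew, skewRatio, emptyPartitions) are hypothetical, and the ratio is just one convenient measure, not something Spark reports.

```java
import java.util.Arrays;

// Illustrative helper; the class and method names are hypothetical.
final class PartitionSkew {

    // Ratio of the largest partition to the mean partition size.
    // 1.0 means perfectly even; larger values mean one task does more work than average.
    static double skewRatio(int[] partitionSizes) {
        int max = Arrays.stream(partitionSizes).max().orElse(0);
        double mean = Arrays.stream(partitionSizes).average().orElse(0.0);
        return mean == 0.0 ? 0.0 : max / mean;
    }

    // Number of partitions that received no records at all.
    static long emptyPartitions(int[] partitionSizes) {
        return Arrays.stream(partitionSizes).filter(size -> size == 0).count();
    }

    public static void main(String[] args) {
        int[] original = {2, 3, 2, 3};                    // sizes from the 4-partition listing
        int[] repartitioned = {2, 0, 0, 0, 0, 0, 4, 4};   // sizes from the repartition(8) listing
        System.out.println("original:      skew = " + skewRatio(original)
                + ", empty = " + emptyPartitions(original));
        System.out.println("repartitioned: skew = " + skewRatio(repartitioned)
                + ", empty = " + emptyPartitions(repartitioned));
    }
}
```

For these sizes the ratio rises from 1.2 (3 divided by a mean of 2.5) to 3.2 (4 divided by a mean of 1.25), so doubling the partition count made the distribution less even, not more.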

To deal with such situations, Spark provides the Custom Partitioner option. Let’s try to implement it with an example and see how it helps:

Custom Partitioner

Please note that a Custom Partitioner can be applied only to pair RDDs (RDDs of key-value tuples). One therefore has to create a pair RDD from the original RDD and then apply the Custom Partitioner on top of it.

To use a Spark Custom Partitioner, extend the org.apache.spark.Partitioner class and override the following methods:

  1. numPartitions: returns the number of partitions to be created for an RDD.
  2. getPartition(key: Any): returns the index of the partition the given key should go to (ranging from 0 to numPartitions - 1).

Custom partitioning lets you alter the size and number of partitions as per your application’s needs, because you define which key goes to which partition. The partitioner is supplied explicitly by calling the partitionBy method on a pair RDD.

Main Class

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public class SparkPartitionExample implements Serializable {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().master("local[*]").getOrCreate();

        final List<String> strings = Arrays.asList("Owl", "Parrot", "Crow", "Ibis", "WoodPecker", "Bulbul", "Falcon", "Eagle", "Kite", "Toucan");
        // Creating a data set.
        final Dataset<String> stringDataset = sparkSession.createDataset(strings, Encoders.STRING());
        // A pair RDD is required, so convert the DataSet to an RDD and then to a pair RDD.
        final JavaRDD<String> stringJavaRDD = stringDataset.toJavaRDD();
        final JavaPairRDD<String, Integer> pairRdd = stringJavaRDD.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, s.length()));
        // Applying the custom partitioner.
        final JavaPairRDD<String, Integer> outputPairedRDD = pairRdd.partitionBy(new CustomPartition(8));
        final int numPartitions = outputPairedRDD.getNumPartitions(); // => 8
        // Converting the pair RDD back into an RDD of keys and then into a data set again.
        final JavaRDD<String> outputRdd = outputPairedRDD.map(x -> x._1());
        final Dataset<String> outputDataSet = sparkSession.createDataset(outputRdd.rdd(), Encoders.STRING());

        outputDataSet.write().csv("src\\main\\test_output");
    }
}

Custom Partitioner Class

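import org.apache.spark.Partitioner;

public class CustomPartition extends Partitioner {

    private final int numParts;

    public CustomPartition(int numParts) {
        this.numParts = numParts;
    }

    // Total number of partitions the RDD will be split into.
    @Override
    public int numPartitions() {
        return numParts;
    }

    // Decides the target partition for a key. This example uses the character code of the
    // key's first letter modulo the partition count. You can have your own logic.
    @Override
    public int getPartition(Object key) {
        return ((String) key).charAt(0) % numParts;
    }
}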

Here is how the data is partitioned on different partitions:

Partition 0: Parrot
Partition 1: Ibis
Partition 2: Bulbul
Partition 3: Crow,Kite
Partition 4: Toucan
Partition 5: Eagle
Partition 6: Falcon
Partition 7: Owl,WoodPecker

Now you can see that the data in the RDD is distributed far more evenly, following the logic provided in the custom Partitioner: no partition is empty and none holds more than two elements. Hence you get better control over how your RDD is partitioned during job execution.
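The assignment in the listing above can be checked without running Spark, because the rule in getPartition depends only on the key. The following plain-Java sketch applies the same first-character rule to the same ten strings; the class and method names (FirstLetterPartitionDemo, partitionFor, assign) are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch; reproduces the partitioning rule outside Spark.
final class FirstLetterPartitionDemo {

    // Same rule as CustomPartition.getPartition: character code of the
    // first letter modulo the number of partitions.
    static int partitionFor(String key, int numPartitions) {
        return key.charAt(0) % numPartitions;
    }

    // Places each key into the bucket chosen by partitionFor, preserving input order.
    static List<List<String>> assign(List<String> keys, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String key : keys) {
            partitions.get(partitionFor(key, numPartitions)).add(key);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> birds = Arrays.asList("Owl", "Parrot", "Crow", "Ibis", "WoodPecker",
                "Bulbul", "Falcon", "Eagle", "Kite", "Toucan");
        List<List<String>> partitions = assign(birds, 8);
        for (int i = 0; i < partitions.size(); i++) {
            System.out.println("Partition " + i + ": " + String.join(",", partitions.get(i)));
        }
    }
}
```

For example, 'O' has character code 79 and 79 % 8 = 7, so Owl goes to partition 7, as does WoodPecker ('W' is 87). The even spread here is a property of this particular data: a DataSet whose keys mostly begin with the same letter would need a different rule.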

I hope this blog has helped you familiarize yourself with the basic problem of uneven data distribution and one of the options Spark provides to deal with it. Please share your constructive comments below.

Happy reading and happy Spark Coding!!

Author
Naveen Chitti

Enthusiastic Data Engineer
