<img height="1" width="1" style="display:none;" alt="" src="https://px.ads.linkedin.com/collect/?pid=2877026&amp;fmt=gif">

Writing Custom Partitioner for Apache Kafka

By Imteyaz Ahmad - April 18, 2019

Topic and Partitions

In Kafka, a topic is a log identified by the topic name. The Kafka producers send the messages/data to a particular topic, and consumers read the data from that topic. A topic can further be sub-divided into partitions. A record is stored in a partition. The strategies for sending the record to a partition are explained in the following section.

How DefaultPartitioner works?

The below explains the default partitioning strategy:

  • If a partition is specified in the record, use it - this can be used when we already know the partition

  • If no partition is specified but a key is present, choose a partition based on a hash of the key - It can be used when we want to distribute the data based on a key. The following formula is used to determine the partition:
    hashCode(key) % noOfPartitions

  • If no partition or key is present, choose a partition in a round-robin fashion - If we are not bothered about which partition our data is going to, this strategy can be used.

Default strategies work well in the beginning. The problem surfaces when we want the data for the same customers to go to the same partition, and we have used a composite key to partition the data. For instance, let’s consider the case of using a key that contains a customer ID and a date. Now, although the customer ID is fixed, the date can change, and we will end up with a different hash code, which will mean that the data for the same customer will go to different partitions.

Writing custom Partitioner

Apache Kafka provides an interface called Partitioner. This interface has three methods as shown in the code below. Kafka also provides a default implementation of this interface called DefaultPartitioner. We need to override these methods for writing our custom partitioner. We can also extend the DefaultPartitioner and override the required method(s) only.

/**
 * Configure this class with the given key-value pairs
 */
void configure(Map<String, ?> configs); // this is inherited 
from another interface Configurable
/**
 * Compute the partition for the given record.
 *
 * @param topic The topic name
 * @param key The key to partition on (or null if no key)
 * @param keyBytes The serialized key to partition on( or null if no 
key) * @param value The value to partition on or null * @param valueBytes The serialized value to partition on or null * @param cluster The current cluster metadata */ int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster); /** * This is called when partitioner is closed. */ void close();

Let’s take the example of a ticket booking app. Here, we want all the booking-related data of a particular customer to go to a particular partition, and we have a key that is comprised of the customer ID and booking date.

/**
 * created by imteyaza-1lm on 2019-04-18
 **/
public class BookingKey {

  private String customerId;
  private LocalDate bookingDate;

  public BookingKey(String customerId, LocalDate bookingDate) {
    this.customerId = customerId;
    this.bookingDate = bookingDate;
  }

  public String getCustomerId() {
    return customerId;
  }

  public LocalDate getBookingDate() {
    return bookingDate;
  }

  @Override
  public String toString() {
    return "BookingKey{" +
        "customerId='" + customerId + '\'' +
        ", bookingDate=" + bookingDate +
        '}';
  }
}  

Here, the customer can book the tickets on different dates, which means the hash code will be different for each key. If we include the booking date along with the customerId, the data for the same customer can go to different partitions.

To solve this problem, we need to override the partition logic, which will mean it will include only the customerId to make sure that data for a particular customer always goes to the same partition.

Here, we have extended the DefaultPartitioner and overrode the logic to get the bytes for the key. Rest of the things are delegated to the parent class.

import java.util.Objects;
import 
org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.common.Cluster; /** * created by imteyaza-1lm on 2019-04-16 **/ public class CustomPartitioner extends DefaultPartitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { String partitionKey = null; if (Objects.nonNull(key)) { BookingKey bookingKey = (BookingKey) key; partitionKey = bookingKey.getCustomerId(); //Ignore bookingKey.getBookingDate() keyBytes = partitionKey.getBytes(); } return super.partition(topic, partitionKey, keyBytes, value, valueBytes, cluster); } }

Using the custom partitioner

Once we have created our custom partitioner, we need to define it in producer properties as follows:

package com.clairvoyant.producer;

import java.time.LocalDate;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * created by imteyaza-1lm on 2019-04-18
 **/
public class BookingProducer {

  public static void main(String[] args) {
    String topicName = "consumer";

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
// we need to define the key serializer here props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer"); // here we are using the partitioner we created props.put("partitioner.class",
"com.clairvoyant.producer.CustomPartitioner
"); Producer<BookingKey, String> producer = new KafkaProducer<>(props); for (int i = 0; i <10; i++) { producer.send( new ProducerRecord<>(topicName, new BookingKey(UUID.randomUUID().toString(),
LocalDate.now()), "value" + i)); } producer.close(); } }

Learn about Apache Kafka Serde here. To get the best data engineering solutions for your business, reach out to us at Clairvoyant.

References and Copyrights

Author
Imteyaz Ahmad

Tags: Data Engineering

Fill in your Details