
Managing Apache Kafka Programmatically

By Shalini Goutam - July 14, 2021

How to list, create, describe, and delete topics using the AdminClient in Kafka

Clairvoyant has extensive experience in Big Data and Cloud technologies. We explore the core concepts of Apache Kafka and other Big Data technologies to provide well-optimized solutions to clients. With the introduction of the AdminClient in Kafka, we have used it for essential topic management, helping clients manage Kafka programmatically.

Introduction

Apache Kafka added the AdminClient in version 0.11 to provide a programmatic API for administrative functionality that was previously done on the command line: listing, creating, and deleting topics; describing the cluster; managing ACLs; and modifying configurations.

Here’s one example: Your application is going to produce events to a specific topic. This means that the topic has to exist before producing the first event. Before Apache Kafka added the admin client, there were few options, and none of them were particularly user-friendly:

  1. You could capture UNKNOWN_TOPIC_OR_PARTITION exceptions from the producer.send() method and let your user know that they need to create the topic.

  2. You could hope that the Kafka cluster you are writing to has automatic topic creation enabled.

  3. You could try to rely on internal APIs and deal with the consequences of no compatibility guarantees.

Now that Apache Kafka provides AdminClient, there is a much better solution: Use AdminClient to check whether the topic exists, and if it does not, create it on the spot.

AdminClient Lifecycle: Creating, Configuring, and Closing

In order to use Kafka’s AdminClient, the first thing you have to do is construct an instance of the AdminClient class. This is quite straightforward:

Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient adminClient = AdminClient.create(properties);
// ... use adminClient here ...
adminClient.close(Duration.ofSeconds(30));

The static create method takes a Properties object with the configuration. The only mandatory configuration is the URI for your cluster: a comma-separated list of brokers to connect to.

If you start an AdminClient, eventually you will want to close it. It is important to remember that when you call close(), there could still be AdminClient operations in progress. Therefore, the close() method accepts a timeout parameter. Once you call close(), you can't call any other methods or send any more requests, but the client will wait for responses until the timeout expires. After the timeout expires, the client aborts all ongoing operations with a TimeoutException and releases all resources. Calling close() without a timeout implies that you'll wait as long as it takes for all ongoing operations to complete.
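Since AdminClient implements AutoCloseable (in recent client versions), a try-with-resources block is a convenient way to guarantee the client is closed even if an admin operation throws. A minimal sketch:

```java
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(props)) {
    // issue admin requests here
} // close() is called automatically and waits for in-flight requests
```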

Essential Topic Management

Now that we created and configured an AdminClient, it is time to see what we can do with it. The most common use case for Kafka’s AdminClient is topic management. This includes listing topics, describing them, creating topics, and deleting them.

  • Listing all Topics in the Cluster

    ListTopicsResult topics = adminClient.listTopics();
    topics.names().get().forEach(System.out::println);

    Note that adminClient.listTopics() returns a ListTopicsResult object, which is a thin wrapper over a collection of Futures. topics.names() returns a future set of names. When we call get() on this future, the executing thread waits until the server responds with a set of topic names, or we get a timeout exception. Once we get the set, we iterate over it to print all the topic names.

  • Describing Topics in the Cluster

    DescribeTopicsResult demoTopic =
        adminClient.describeTopics(TOPIC_LIST);

    TopicDescription topicDescription =
        demoTopic.values().get(TOPIC_NAME).get();

    1. To check that the topic exists with the correct configuration, we call describeTopics() with a list of topic names that we want to validate. This returns a DescribeTopicsResult object, which wraps a map of topic names to future descriptions.

    2. If the topic does exist, the future completes by returning a TopicDescription object, which contains a list of all the partitions of the topic and for each partition which broker is the leader, a list of replicas, and a list of in-sync replicas. Note: this does not include the configuration of the topic.

    3. If the topic does not exist, the server can’t respond with its description. In this case, the server will send an error and the future will complete by throwing an ExecutionException. The actual error sent by the server will be the cause of the exception. Since we want to handle the case where the topic doesn’t exist, we handle these exceptions by creating a topic on the spot.

    4. Note that all AdminClient result objects throw ExecutionException when Kafka responds with an error. This is because AdminClient results are wrapped Future objects and those wrap exceptions. You always need to examine the cause of an ExecutionException to get the error that Kafka returned.
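    The "create it on the spot" fallback described above can be sketched as follows. This is a minimal sketch: adminClient is the client constructed earlier, and TOPIC_NAME, NUM_PARTITIONS, and REPLICATION_FACTOR are the placeholder constants from the surrounding examples.

```java
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

try {
    // Ask the server to describe the topic; get() blocks until it responds.
    TopicDescription description = adminClient
            .describeTopics(Collections.singletonList(TOPIC_NAME))
            .values().get(TOPIC_NAME).get();
    System.out.println("Topic exists with "
            + description.partitions().size() + " partitions");
} catch (ExecutionException e) {
    // The actual error Kafka returned is the cause of the ExecutionException.
    if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
        throw e; // some other error: rethrow
    }
    // The topic is missing, so create it on the spot.
    adminClient.createTopics(Collections.singletonList(
            new NewTopic(TOPIC_NAME, NUM_PARTITIONS, REPLICATION_FACTOR)))
        .all().get();
}
```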

  • Creating Topics in the Cluster

    CreateTopicsResult newTopic = adminClient.createTopics(
        Collections.singletonList(new NewTopic(TOPIC_NAME,
            NUM_PARTITIONS, REPLICATION_FACTOR)));

    1. If the topic does not exist, we create a new topic. When creating a topic, you can specify just the name and use default values for all the details. You can also specify the number of partitions, the replication factor, and the topic configuration.

    2. Finally, you want to wait for topic creation to return and perhaps validate the result. Checking the result is more common if you relied on broker defaults when creating the topic.

    3. If a topic with the same name already exists, checking the results of createTopics() (for example, by calling get() on the futures it returns) can throw a TopicExistsException, wrapped in an ExecutionException. It can be handled by describing the existing topic to check that it has the correct configuration.
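    The create-and-verify path in step 3 can be sketched as follows (a minimal sketch; adminClient, TOPIC_NAME, NUM_PARTITIONS, and REPLICATION_FACTOR are carried over from the examples above):

```java
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.errors.TopicExistsException;

try {
    adminClient.createTopics(Collections.singletonList(
            new NewTopic(TOPIC_NAME, NUM_PARTITIONS, REPLICATION_FACTOR)))
        .all().get();
} catch (ExecutionException e) {
    // Kafka reports "topic already exists" as the cause of the ExecutionException.
    if (!(e.getCause() instanceof TopicExistsException)) {
        throw e; // a different error: rethrow
    }
    // The topic already exists: describe it to verify its configuration.
    TopicDescription existing = adminClient
            .describeTopics(Collections.singletonList(TOPIC_NAME))
            .values().get(TOPIC_NAME).get();
    System.out.println("Topic already exists with "
            + existing.partitions().size() + " partitions");
}
```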

  • Deleting Topics in the Cluster

    adminClient.deleteTopics(TOPIC_LIST).all().get();

    1. We call deleteTopics() with a list of topic names to delete, and we use get() to wait for the operation to complete.

    2. In Kafka, deletion of topics is final: there is no "recycle bin" or "trash can" to help you rescue a deleted topic, and no checks to validate that the topic is empty and that you really meant to delete it. Deleting the wrong topic could mean an unrecoverable loss of data, so handle this method with extra care.


Conclusion

  • AdminClient is useful when you want to execute administrative commands from within your client application rather than using CLI and GUI tools to manage Kafka.

  • Creating new topics on demand based on user input or data is an especially common use case.

  • IoT apps often receive events from user devices and write events to topics based on the device type. If the manufacturer produces a new type of device, you either have to remember, via some process, to also create a topic, or the application can dynamically create a new topic when it receives events with an unrecognized device type.

  • Automatic topic creation using AdminClient has downsides, but avoiding the dependency on an additional process to create topics is an attractive feature in the right scenarios.

    Read this blog to learn more about Apache Spark out-of-memory issues. To get the best data engineering solutions, reach out to us at Clairvoyant.

References

https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/admin/KafkaAdminClient.html

https://www.tabnine.com/code/java/methods/org.apache.kafka.clients.admin.CreateTopicsResult/values

https://www.oreilly.com/library/view/kafka-the-definitive/9781492043072/
