Handling data that arrives outside the expected timeframe in Kafka using watermarking in Spark Structured Streaming
Clairvoyant, as a company, carries vast experience in Big Data and Cloud technologies, and Spark Structured Streaming with Kafka is one of its major implementations. With recent versions of Spark, we can handle data that arrives outside the expected time frame by implementing watermarking. This blog showcases how we handled it for one of our clients through a POC.
Spark Structured Streaming:
Spark Structured Streaming is built on top of the Spark SQL engine and processes streaming data. All the transformations that can be performed on static data can be performed the same way on streaming data, and whatever transformations we define are run incrementally and continuously as new data arrives. The Spark SQL engine takes care of executing these transformations on the streaming data.
Watermarking in Spark Structured Streaming:
Watermarking is a feature in Spark Structured Streaming that is used to handle the data that arrives late. Spark Structured Streaming can maintain the state of the data that arrives, store it in memory, and update it accurately by aggregating it with the data that arrived late.
To run such a query for days, the system needs to bound the state it accumulates in memory: the application must know when it will stop receiving late data for a given aggregate so that the corresponding state can be finalized and released.
In simple words, late data within the threshold will be aggregated, but data later than the threshold will start getting dropped.
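As a toy illustration of this rule, here is a sketch in plain Python (not Spark code; the 10-minute threshold and 5-minute tumbling windows are arbitrary choices for the demo): late events inside the threshold still update their window's count, while events older than the watermark are dropped.

```python
from datetime import datetime, timedelta

THRESHOLD = timedelta(minutes=10)   # allowed lateness (arbitrary for the demo)
WINDOW = 5                          # 5-minute tumbling windows

def process(events):
    """events: event-time datetimes in arrival order -> (window counts, dropped)."""
    counts = {}
    max_event_time = None           # highest event time seen so far
    dropped = []
    for t in events:
        # The watermark trails the highest event time seen by the threshold.
        if max_event_time is not None and t < max_event_time - THRESHOLD:
            dropped.append(t)       # later than the threshold: dropped
            continue
        max_event_time = t if max_event_time is None else max(max_event_time, t)
        # Assign the event to its 5-minute window and update the aggregate;
        # a late-but-within-threshold event still updates its window here.
        window = t.replace(minute=t.minute - t.minute % WINDOW,
                           second=0, microsecond=0)
        counts[window] = counts.get(window, 0) + 1
    return counts, dropped
```

For example, once an event timed 12:12 has been seen, the watermark sits at 12:02; a straggler timed 12:04 is still aggregated into the 12:00 window, while one timed 11:30 is dropped.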
Spark handles this internally as follows: since we are dealing with micro-batches, the watermark for every batch is calculated at the end of the previous batch, which means a watermark is in place for every batch before its execution even begins. When a query tracks watermarks on multiple inputs, the individual watermarks are calculated first, and by default the minimum of them is chosen as the global watermark used to drop events (from Spark 2.4, this can be changed to the maximum via spark.sql.streaming.multipleWatermarkPolicy).
Example of watermark calculation in case of an application reading from a single Kafka topic
Example of watermark calculation in case of application reading from two Kafka topics
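The calculation can be sketched in plain Python (a simplified model of our understanding, not Spark internals): each stream's watermark trails its maximum observed event time by the configured delay, and with multiple streams the default "min" policy takes the smallest of the per-stream watermarks as the global watermark (Spark 2.4+ can switch this to "max" via spark.sql.streaming.multipleWatermarkPolicy).

```python
from datetime import datetime, timedelta

def stream_watermark(max_event_time, delay):
    """Per-stream watermark: max event time seen so far minus the delay,
    computed at the end of each micro-batch and applied to the next one."""
    return max_event_time - delay

def global_watermark(per_stream_watermarks, policy="min"):
    """Global watermark across streams: minimum by default, maximum if the
    multipleWatermarkPolicy is set to 'max' (simplified model)."""
    pick = min if policy == "min" else max
    return pick(per_stream_watermarks)
```

For instance, if the sales stream has seen events up to 12:30 and the shipments stream only up to 12:05, both with a 10-minute delay, the global watermark under the default policy is the slower stream's 11:55.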
The above logic determines which events should be ignored and which should be buffered, and, once buffered, when they should be evicted from the buffer. If we never evict events, they will eventually occupy all the available memory! To bound the buffer, we use time-range conditions in the join.
Let us assume we are reading sales- and shipment-related data from two different streams.
We use saleTime and shipmentTime to track the watermarks, and we join the two streams on the ad-id columns as well, titled ‘shipmentAdID’ and ‘SalesAdId’.
So the idea is to keep events that fall within an hour of each other in the buffer and, as they get older than that, drop them from the buffer.
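A toy model of that buffer in plain Python (not Spark code; evicting on every incoming event is a simplification of how the watermark actually advances, and the tuple layout is invented for the demo): a shipment joins a buffered sale when the ad ids match and the two timestamps fall within one hour of each other, while sales that have aged past the hour are evicted.

```python
from datetime import datetime, timedelta

JOIN_RANGE = timedelta(hours=1)     # the one-hour time-range condition

def join_shipments(sales, shipments):
    """sales, shipments: lists of (event_time, ad_id) tuples.
    Returns matched (sale_time, shipment_time, ad_id) triples."""
    buffer = list(sales)            # buffered sale events awaiting a match
    matches = []
    for ship_time, ad_id in shipments:
        # Simplified per-event watermark: anything more than an hour older
        # than the incoming shipment can never match, so evict it.
        watermark = ship_time - JOIN_RANGE
        buffer = [s for s in buffer if s[0] >= watermark]
        for sale_time, sale_ad in buffer:
            if sale_ad == ad_id and abs(sale_time - ship_time) <= JOIN_RANGE:
                matches.append((sale_time, ship_time, ad_id))
    return matches
```

A sale from 10:00 is already out of the buffer by the time a 12:30 shipment for the same ad arrives, so only pairs within the hour survive.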
Conditions for Watermarking:
There are some limitations to implementing watermarking, so we need to make sure the following conditions are met.
Output mode must be either `append` or `update`. Spark supports a few output modes; out of these, only `append` and `update` are supported while implementing a watermark (`complete` mode requires all aggregate state to be preserved, so nothing can be dropped).
withWatermark must be called on the same column as is used in the aggregate. For example, df.withWatermark("time", "1 min").groupBy("time2").count() is invalid in the append output mode because the watermark is not defined on the aggregation column.
withWatermark must be called before the aggregation for the watermark to take effect. For example, df.groupBy("time").count().withWatermark("time", "1 min") is invalid in the append output mode.
In one of our projects, we had a use case to join two types of events coming from two different Kafka topics, where the join also had to aggregate data that arrived late. So we planned a POC to check the efficiency of watermarking.
We set up two Kafka topics and fed them the data that needed to be joined.
We used version 2.3.2 of Spark, which does not support out-of-the-box metric collection and visualization, so we attached a Spark listener class to our application that extracts stats from every micro-batch and stores them as JSON in HDFS.
Spark 3.0 will provide a Streaming Tab in Spark UI that will show streaming stats in real-time. Until then, it is important to monitor the health of the streaming app. Below are the important metrics that we captured:
triggerExecution: How long the execution of a given trigger took, i.e. offset retrieval, data processing, and WAL commit.
queryPlanning: The time taken to generate the execution plan.
getBatch: The time taken to retrieve metadata about the next micro-batch from the source (micro-batch mode only); no physical data is retrieved at this point.
getOffset: The time taken to retrieve offsets for new data to process for each defined source.
walCommit: The time taken to commit the newly available offsets.
addBatch: The time taken to send the data to the given sink.
watermark: The current watermark of the micro-batch.
stateOperators.numRowsTotal: The number of rows held in the Spark state store.
sources.numInputRows: The number of rows read from each source in a micro-batch.
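Our listener can be sketched as follows (plain Python; the field names follow the JSON emitted by StreamingQueryProgress, but the exact payload shape shown here is an assumption): it flattens the metrics above out of one progress payload into a record ready to be appended to a JSON file on HDFS.

```python
import json

def extract_metrics(progress_json):
    """Flatten one StreamingQueryProgress-style JSON payload into the
    metrics we monitored per micro-batch (assumed field layout)."""
    p = json.loads(progress_json)
    d = p.get("durationMs", {})
    return {
        "batchId": p.get("batchId"),
        "triggerExecution": d.get("triggerExecution"),
        "queryPlanning": d.get("queryPlanning"),
        "getBatch": d.get("getBatch"),
        "getOffset": d.get("getOffset"),
        "walCommit": d.get("walCommit"),
        "addBatch": d.get("addBatch"),
        "watermark": p.get("eventTime", {}).get("watermark"),
        # Total rows held in the state store across all stateful operators.
        "numRowsTotal": sum(s.get("numRowsTotal", 0)
                            for s in p.get("stateOperators", [])),
        # Rows read in this micro-batch, summed across sources.
        "numInputRows": sum(s.get("numInputRows", 0)
                            for s in p.get("sources", [])),
    }
```

In the real application, the resulting record was serialized back to JSON and written to HDFS for each micro-batch.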
Challenges we faced with the current version of Spark for watermarking:
Full outer joins are not supported. https://issues.apache.org/jira/browse/SPARK-26154
The left outer join has a data inconsistency issue (solved in Spark 3.0+). Several issues have been reported, and we also observed inconsistent results.
The Spark Streaming tab for viewing streaming health is available only in Spark 3.0+ versions, which is why we turned to the listener to get the stats.
Writing to multiple sinks from a single query is not directly supported; it requires `foreachBatch`, which is only available from Spark 2.4.
Results of POC:
Finally, we achieved a 93% data match; the remaining 7% was missed because the data arrived outside the watermark. We bumped the watermark up to 1 hour and matched another 4% of the data. We could not increase it further, as the state store was getting filled up due to the extremely high volume of data (80 million messages per day).
Reference: Handling Late Data and Watermarking (Spark Structured Streaming Programming Guide)