Deduplicating Partitioned Data With a Kafka Streams ValueTransformer
Inspired by a current customer project and this article about deduplicating events with Kafka Streams I want to share a simple but powerful implementation of a deduplication mechanism, that works well for partitioned data and does not suffer of memory leaks, because a countless number of message-keys has to be stored.
Yet, the presented approach does not work for all use-cases, because it presumes, that a strictly monotonically increasing sequence numbering can be established across all messages – at least concerning all messages, that are routed to the same partition.
The Problem
A source produces messages, with reliably unique ID’s. From time to time, sending these messages to Kafka may fail. The order, in which these messages are send, is crucial with respect to the incedent, they belong to. Resending the messages in correct order after a failure (or downtime) is no problem. But some of the messages may be send twice (or more often), because the producer does not know exactly, which messages were send successful.
Incident A - { id: 1, data: "ab583cc8f8" }
Incident B - { id: 2, data: "83ccc8f8f8" }
Incident C - { id: 3, data: "115tab5b58" }
Incident C - { id: 4, data: "83caac564b" }
Incident B - { id: 5, data: "a583ccc8f8" }
Incident A - { id: 6, data: "8f8bc8f890" }
Incident A - { id: 7, data: "07583ab583" }
<< DOWNTIME OR FAILURE >>
Incident C - { id: 4, data: "83caac564b" }
Incident B - { id: 5, data: "a583ccc8f8" }
Incident A - { id: 6, data: "8f8bc8f890" }
Incident A - { id: 7, data: "07583ab583" }
Incident A - { id: 8, data: "930fce58f3" }
Incident B - { id: 9, data: "7583ab93ab" }
Incident C - { id: 10, data: "7583aab583" }
Incident B - { id: 11, data: "b583075830" }
Since eache message has a unique ID, all messages are inherently idempotent: Deduplication is no problem, if the receiver keeps track of the messages, he has already seen.
Where is the problem?, you may ask. That’s trivial, I just code the deduplication into my consumer!
But this approach has several drawbacks, including:
- Implementing the trivial algorithm described above is not efficent, since the algorithm in general has to remember the IDs of all messages for an indefinit period of time.
- Implementing the algorithm over and over again for every consumer is cumbersome and errorprone.
Wouldn’t it be much nicer, if we had an efficient and bulletproof algorithm, that we can simply plug into our Kafka-pipelines?
The Idea
In his blog-article Jaroslaw Kijanowski describes three deduplication algorithms. The first does not scale well, because it does only work for single-partition topics. The third aims at a slightly different problem and might fail deduplicating some messages, if the timing is not tuned correctly. The looks like a robust solution. But it also looks a bit hacky and is unnecessary complex in my opinion.
Playing around with his ideas, i have come up with the following algorithm, that combines elements of all three solutions:
- All messages are keyed by an ID that represents the incident – not the message. This guarantees, that all messages concerning a specific incident will be stored in the same partition, so that their ordering is retained.
- We generate unique strictly monotonically increasing sequence numbers, that are assigned to each message. If the IDs of the messages fullfill these requirements and are stored in the value (like above), they can be reused as sequence numbers
- We keep track of the sequence number last seen for each partition.
- We drop all messages with sequnce numbers, that are not greater than the last sequence number, that we saw on that partition.
The algorithm uses the well known approach, that TCP/IP uses to detect and drop duplicate packages.
It is efficient, since we never have to store more sequence numbers, than partitions, that we are handling.
The algorithm can be implemented easily based on a ValueTransformer
, because Kafka Streams provides the ability to store state locally.
A simplified example-implementation
To clearify the idea, I further simplified the problem for the example implementation:
-
Key and value of the messages are of type
String
, for easy scripting. - In the example implementation, person-names take the part of the ID of the incident, that acts out as message-key.
- The value of the message solely consists of the sequence number. In a real-world use-case, the sequence number would be stored in the message-value and would have to be extracted from there. Or it would be stored as a message-header.
That is, our message stream is simply a mapping from names to unique sequence numbers and we want to be able to separate out the contained sequence for a single person, without duplicate entries and without jeopardizing the order of that sequence.
In this simplified setup, the implementation effectively boils down to the following method-override:
@Override
public Iterable<String> transform(String value)
{
Integer partition = context.partition();
long sequenceNumber = Long.parseLong(value);
Long seen = store.get(partition);
if (seen == null || seen < sequenceNumber)
{
store.put(partition, sequenceNumber);
return Arrays.asList(value);
}
return Collections.emptyList();
}
- We can get the active partition from the
ProcessorContext
, that is handed to our Instance in the constructor, which is not shown here for brevity. - Parsing the
String
-value of the message aslong
corresponds to the extraction of the sequence number from the value of the message in our simplified setup. - We then check the local state, if a sequence-number was already seen for the active partition. Kafka Streams takes care of the initialization and resurection of the local state. Take a look at the full source-code see, how we instruct Kafka Streams to do so.
- If this is the first sequence-number, that we see for this partition, or if the sequence-number is greater (that is: newer) than the stored one, we store it in our local state and return the value of the message, because it was seen for the first time.
- Otherwise, we instruct Kafka Streams to drop the current (duplicate!) value, by returning an empty array.
We can use our ValueTransformer
with flatTransformValues()
,
to let Kafka Streams drop the detected duplicate values:
streamsBuilder
.stream("input")
.flatTransformValues(
new ValueTransformerSupplier()
{
@Override
public ValueTransformer get()
{
return new DeduplicationTransformer();
}
},
"SequenceNumbers")
.to("output");
One has to register an appropriate store to the StreamsBuilder
under the referenced name.
The full source is available on github.com
Recapping Our Assumptions…
The presented deduplication algorithm presumes some assumptions, that may not fit your use-case. It is crucial, that these prerequisites are not violated. Therefor, I will spell them out once more:
- We can generate unique strictly monotonically increasing sequence numbers for all messages (of a partition).
- We have a strict ordering of all messages (per partition).
- And hence, since we want to handle more than one partition: The data is partitioned by key. That is, all messages for a specific key must always be routed to the same partition.
As a conclusion of this assumptions, we have to note: We can only deduplicate messages, that are routed to the same partition. This follows, because we can only guarantee message-order per partition. But it should not be a problem for the same reason: We assume a use-case, where all messages concerning a specific incident are captured in the same partition.
What is not needed – but also does not hurt
Since we are only deduplicating messages, that are routed to the same partition, we do not need globally unique sequence numbers. Our sequence numbers only have to be unique per partition, to enable us to detect, that we have seen a specific message before on that partition. Golbally unique sequence numbers clearly are a stronger condition: It does not hurt, if the sequence numbers are globally unique, because they are always unique per partition, if they are also globally unique.
We detect unseen messages, by the fact that their sequence number is greater than the last stored hight watermark for the partition, they are routed to. Hence, we do not rely on a seamless numbering without gaps. It does not hurt, if the series of sequence numbers does not have any gaps, as long as two different messages on the same partition never are assigned to the same sequence number.
That said, it should be clear, that a globally unique seamless numbering of all messages across all partitions – as in our simple example-implementation – does fit well with our approach, because the numbering is still unique, if one only considers the messages in one partition, and the gaps in the numbering, that are introduced by focusing only on the messages of a single partition, are not violating our assumptions.
Pointless / Contradictorily Usage Of The Presented Approach
Last but not least, I want to point out, that this approach silently assumes, that the sequence number of the message is not identically to the key of the message. On the contrary: The sequence number is expected to be different from the key of the message!
If one would use the key of the message as its sequence number (provided that it is unique and represents a strictly increasing sequence of numbers), one would indeed assure, that all duplicates can be detected, but he would at once force the implementation to be indifferent, concerning the order of the messages.
That is, because subsequent messages are forced to have different keys, because all messages are required to have unique sequence numbers. But messages with different keys may be routed to different partitions – and Kafka can only guarantee message ordering for messages, that live on the same partition. Hence, one has to assume, that the order in which the messages are send is not retained, if he uses the message-keys as sequence numbers – unless, only one partition is utilized, which is contradictory to our primary goal here: enabling scalability through data-sharding.
This is also true, if the key of a message contains an invariant ID and only embeds the changing sequence number. Because, the default partitioning algorithm always considers the key as a whole, and if any part of it changes, the outcome of the algorithm might change.
In a production-ready implementation of the presented approach, I would advice, to store the sequence number in a message header, or provide a configurable extractor, that can derive the sequence number from the contents of the value of the message. It would be perfectly o.k., if the IDs of the messages are used as sequence numbers, as long as they are unique and monotonically increasing and are stored in the value of the message – not in / as the key!
Leave a Reply