@Slf4j
-public class DeduplicationTransformer<K, V> implements ValueTransformerWithKey<K, V, Iterable<V>>
+public class DeduplicationTransformer implements ValueTransformerWithKey<String, String, Iterable<String>>
{
- final SequenceNumberExtractor<K, V> extractor;
-
public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE";
private ProcessorContext context;
private KeyValueStore<Integer, Long> store;
- public DeduplicationTransformer(SequenceNumberExtractor<K, V> extractor)
- {
- this.extractor = extractor;
- }
-
-
@Override
public void init(ProcessorContext context)
{
}
@Override
- public Iterable<V> transform(K key, V value)
+ public Iterable<String> transform(String key, String value)
{
String topic = context.topic();
Integer partition = context.partition();
long offset = context.offset();
Headers headers = context.headers();
- long sequenceNumber = extractor.extract(topic, partition, offset, headers, key, value);
+ long sequenceNumber = Long.parseLong(value);
Long seen = store.get(partition);
if (seen == null || seen < sequenceNumber)
}
log.info(
- "ignoring message for key {} with sequence-number {} <= {} found at offset {} of partition {}",
+ "ignoring message for key {} with sequence-number {} <= {}",
key,
sequenceNumber,
- seen,
- offset,
- partition);
+ seen);
// Signal, that the message has already been seen.
// Downstream has to filter the null-values...
+++ /dev/null
-package de.juplo.demo.kafka.deduplication;
-
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
-
-
-public class DeduplicationTransformerSupplier<K, V> implements ValueTransformerWithKeySupplier<K, V, Iterable<V>>
-{
- SequenceNumberExtractor<K, V> extractor;
-
-
- public DeduplicationTransformerSupplier(SequenceNumberExtractor<K, V> extractor)
- {
- this.extractor = extractor;
- }
-
-
- @Override
- public ValueTransformerWithKey<K, V, Iterable<V>> get()
- {
- return new DeduplicationTransformer<K, V>(extractor);
- }
-}
package de.juplo.demo.kafka.deduplication;
-import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
properties.put("default.key.serde", Serdes.StringSerde.class);
properties.put("default.value.serde", Serdes.StringSerde.class);
- SequenceNumberExtractor<String, String> extractor =
- new SequenceNumberExtractor<String, String>() {
- @Override
- public long extract(
- String topic, int partition, long offset, Headers headers, String key, String value)
- {
- return Long.parseLong(value);
- }
- };
-
StreamsBuilder builder = new StreamsBuilder();
// Create state-store for sequence numbers
builder
.<String, String>stream("input")
.flatTransformValues(
- new DeduplicationTransformerSupplier<String, String>(extractor),
+ new ValueTransformerWithKeySupplier<String, String, Iterable<String>>()
+ {
+ @Override
+ public ValueTransformerWithKey<String, String, Iterable<String>> get()
+ {
+ return new DeduplicationTransformer();
+ }
+ },
DeduplicationTransformer.STORE)
.to("output");
+++ /dev/null
-package de.juplo.demo.kafka.deduplication;
-
-import org.apache.kafka.common.header.Headers;
-
-
-public interface SequenceNumberExtractor<K,V>
-{
- /**
- * Extracts a sequence number from the given value.
- *
- * The sequence number must be represented as a {@link Long} value.
- *
- * @param topic The topic, the message was issued on
- * @param partition The partition, the message was written to
- * @param offset The offset of the message in the partition
- * @param key The key of the message
- * @param value The value of the message
- * @return a unique ID
- */
- public long extract(String topic, int partition, long offset, Headers headers, K key, V value);
-}