1 package de.juplo.demo.kafka.deduplication;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.common.header.Headers;
5 import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
6 import org.apache.kafka.streams.processor.ProcessorContext;
7 import org.apache.kafka.streams.state.KeyValueStore;
9 import java.util.Arrays;
10 import java.util.Collections;
/**
 * Value transformer that suppresses records whose sequence number is not
 * greater than the last one recorded for the record's partition.
 *
 * For each record a sequence number is obtained from the configured
 * SequenceNumberExtractor. The last accepted sequence number is kept in a
 * key-value store keyed by partition number (not by record key). An accepted
 * value is returned wrapped in a list; a rejected one yields an empty list.
 *
 * NOTE(review): this view of the file omits some lines (braces, annotations,
 * parts of the logging call) - confirm details against the complete file.
 */
14 public class DeduplicationTransformer<K, V> implements ValueTransformerWithKey<K, V, Iterable<V>>
// Strategy used to derive the sequence number of a record.
16 final SequenceNumberExtractor<K, V> extractor;
// Name under which the state store is looked up in init():
// the canonical name of this class followed by "_STORE".
18 public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE";
// Processor context handed in by init(); read in transform() for record metadata.
19 private ProcessorContext context;
// Maps partition number -> last accepted sequence number for that partition.
20 private KeyValueStore<Integer, Long> store;
/**
 * Creates a transformer that uses the given extractor.
 *
 * @param extractor source of the sequence number for each record; stored as-is
 */
23 public DeduplicationTransformer(SequenceNumberExtractor<K, V> extractor)
25 this.extractor = extractor;
/**
 * Remembers the processor context and fetches the state store named STORE from it.
 *
 * @param context the processor context this transformer runs in
 */
30 public void init(ProcessorContext context)
32 this.context = context;
// Unchecked cast: a store registered under STORE with other key/value types
// is not detected here. NOTE(review): store registration is not visible in this file.
33 store = (KeyValueStore<Integer, Long>) context.getStateStore(STORE);
/**
 * Decides whether the given value is passed on or dropped as a duplicate.
 *
 * @param key   key of the current record; handed to the extractor
 * @param value value of the current record; handed to the extractor
 * @return a list wrapping the value if no sequence number is stored yet for the
 *         partition or the stored one is smaller; otherwise an empty list
 */
37 public Iterable<V> transform(K key, V value)
// Metadata of the record currently being processed, read from the context.
39 String topic = context.topic();
40 Integer partition = context.partition();
41 long offset = context.offset();
42 Headers headers = context.headers();
// Delegate the computation of the sequence number to the extractor.
44 long sequenceNumber = extractor.extract(topic, partition, offset, headers, key, value);
// Last accepted sequence number for this partition; null if none was stored yet.
46 Long seen = store.get(partition);
// Accept only strictly increasing sequence numbers (an equal number counts as a duplicate).
47 if (seen == null || seen < sequenceNumber)
// Record the new high-water mark before passing the value on.
49 store.put(partition, sequenceNumber);
50 return Arrays.asList(value);
// Message template for reporting a skipped record. The call it belongs to and its
// arguments are not visible in this view - presumably a logger call; confirm.
54 "ignoring message for key {} with sequence-number {} <= {} found at offset {} of partition {}",
61 // Signal that the message has already been seen by returning an empty Iterable.
62 // No null is returned here; how downstream treats the empty result is not visible in this file.
63 return Collections.emptyList();
/**
 * Does nothing; no resources are released here.
 */
67 public void close() {}