1 package de.juplo.demo.kafka.deduplication;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.common.header.Headers;
5 import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
6 import org.apache.kafka.streams.processor.ProcessorContext;
7 import org.apache.kafka.streams.state.KeyValueStore;
9 import java.util.Arrays;
10 import java.util.Collections;
/**
 * Value transformer that drops records whose sequence number is not greater
 * than the last one remembered for the partition they were read from.
 *
 * The record value is parsed as a {@code long} sequence number. The highest
 * number forwarded so far is kept per partition in the state store registered
 * under {@link #STORE}. A record is forwarded as a single-element
 * {@code Iterable} when no number is stored yet for its partition or the
 * stored number is smaller; otherwise an empty {@code Iterable} is returned.
 */
14 public class DeduplicationTransformer implements ValueTransformerWithKey<String, String, Iterable<String>>
/** Name under which the state store is looked up in {@code init}: canonical class name plus "_STORE". */
16 public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE";
// Context handed in by init(); transform() reads the metadata of the current record from it.
17 private ProcessorContext context;
// Maps partition number -> last sequence number that transform() has forwarded.
18 private KeyValueStore<Integer, Long> store;
/**
 * Remembers the processor context and fetches the state store named {@link #STORE} from it.
 *
 * @param context the processor context to read record metadata and the state store from
 */
22 public void init(ProcessorContext context)
24 this.context = context;
// NOTE(review): unchecked cast -- assumes the store registered under STORE
// uses Integer keys and Long values; confirm against the topology setup.
25 store = (KeyValueStore<Integer, Long>) context.getStateStore(STORE);
/**
 * Decides whether the record is new for its partition.
 *
 * @param key   the record key; it is not used for the decision itself
 *              (the visible log format string has a placeholder for it, but
 *              the logging call is only partly visible in this view)
 * @param value the record value, expected to be the decimal text of a long sequence number
 * @return a list containing only {@code value} if the sequence number is
 *         greater than the one stored for the partition (or none is stored),
 *         an empty list otherwise
 * @throws NumberFormatException if {@code value} cannot be parsed as a long
 */
29 public Iterable<String> transform(String key, String value)
// NOTE(review): topic, offset and headers are not referenced on any line
// visible in this view -- confirm whether they are still needed.
31 String topic = context.topic();
32 Integer partition = context.partition();
33 long offset = context.offset();
34 Headers headers = context.headers();
// The value itself is the sequence number.
36 long sequenceNumber = Long.parseLong(value);
// seen == null means nothing has been stored for this partition so far.
38 Long seen = store.get(partition);
39 if (seen == null || seen < sequenceNumber)
// Remember the new high-water mark before forwarding the value.
41 store.put(partition, sequenceNumber);
42 return Arrays.asList(value);
46 "ignoring message for key {} with sequence-number {} <= {}",
51 // Signal that the message has already been seen:
52 // an empty Iterable (not null) is returned for it.
// NOTE(review): an earlier wording of this comment said that downstream has
// to filter null-values; the visible code returns an empty list instead --
// confirm how the caller consumes the result.
53 return Collections.emptyList();
/** Nothing to release: the body is empty. */
57 public void close() {}