1 package de.juplo.demo.kafka.deduplication;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.kstream.ValueTransformer;
5 import org.apache.kafka.streams.processor.ProcessorContext;
6 import org.apache.kafka.streams.state.KeyValueStore;
8 import java.util.Arrays;
9 import java.util.Collections;
13 public class DeduplicationTransformer implements ValueTransformer<String, Iterable<String>>
15 public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE";
16 private ProcessorContext context;
17 private KeyValueStore<Integer, Long> store;
21 public void init(ProcessorContext context)
23 this.context = context;
24 store = (KeyValueStore<Integer, Long>) context.getStateStore(STORE);
28 public Iterable<String> transform(String value)
30 Integer partition = context.partition();
31 long sequenceNumber = Long.parseLong(value);
33 Long seen = store.get(partition);
34 if (seen == null || seen < sequenceNumber)
36 store.put(partition, sequenceNumber);
37 return Arrays.asList(value);
40 log.info("ignoring message with sequence-number {} <= {}", sequenceNumber, seen);
42 // Signal, that the message has already been seen.
43 // Downstream has to filter the null-values...
44 return Collections.emptyList();
48 public void close() {}