3bc92d14c3ef6c6c2b035f28609e76186c1f62ab
[demos/kafka/deduplication] / src / main / java / de / juplo / demo / kafka / deduplication / DeduplicationTransformer.java
1 package de.juplo.demo.kafka.deduplication;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.common.header.Headers;
5 import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
6 import org.apache.kafka.streams.processor.ProcessorContext;
7 import org.apache.kafka.streams.state.KeyValueStore;
8
9 import java.util.Arrays;
10 import java.util.Collections;
11
12
13 @Slf4j
14 public class DeduplicationTransformer implements ValueTransformerWithKey<String, String, Iterable<String>>
15 {
16   public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE";
17   private ProcessorContext context;
18   private KeyValueStore<Integer, Long> store;
19
20
21   @Override
22   public void init(ProcessorContext context)
23   {
24     this.context = context;
25     store = (KeyValueStore<Integer, Long>) context.getStateStore(STORE);
26   }
27
28   @Override
29   public Iterable<String> transform(String key, String value)
30   {
31     String topic = context.topic();
32     Integer partition = context.partition();
33     long offset = context.offset();
34     Headers headers = context.headers();
35
36     long sequenceNumber = Long.parseLong(value);
37
38     Long seen = store.get(partition);
39     if (seen == null || seen < sequenceNumber)
40     {
41       store.put(partition, sequenceNumber);
42       return Arrays.asList(value);
43     }
44
45     log.info(
46         "ignoring message for key {} with sequence-number {} <= {}",
47         key,
48         sequenceNumber,
49         seen);
50
51     // Signal, that the message has already been seen.
52     // Downstream has to filter the null-values...
53     return Collections.emptyList();
54   }
55
56   @Override
57   public void close() {}
58 }