dc888bcc81026b2c95a9b979f14facf5ee4ada77
[demos/kafka/deduplication] / src / main / java / de / juplo / demo / kafka / deduplication / DeduplicationTransformer.java
1 package de.juplo.demo.kafka.deduplication;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.common.header.Headers;
5 import org.apache.kafka.streams.kstream.ValueTransformer;
6 import org.apache.kafka.streams.processor.ProcessorContext;
7 import org.apache.kafka.streams.state.KeyValueStore;
8
9 import java.util.Arrays;
10 import java.util.Collections;
11
12
13 @Slf4j
14 public class DeduplicationTransformer implements ValueTransformer<String, Iterable<String>>
15 {
16   public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE";
17   private ProcessorContext context;
18   private KeyValueStore<Integer, Long> store;
19
20
21   @Override
22   public void init(ProcessorContext context)
23   {
24     this.context = context;
25     store = (KeyValueStore<Integer, Long>) context.getStateStore(STORE);
26   }
27
28   @Override
29   public Iterable<String> transform(String value)
30   {
31     String topic = context.topic();
32     Integer partition = context.partition();
33     long offset = context.offset();
34     Headers headers = context.headers();
35
36     long sequenceNumber = Long.parseLong(value);
37
38     Long seen = store.get(partition);
39     if (seen == null || seen < sequenceNumber)
40     {
41       store.put(partition, sequenceNumber);
42       return Arrays.asList(value);
43     }
44
45     log.info("ignoring message with sequence-number {} <= {}", sequenceNumber, seen);
46
47     // Signal, that the message has already been seen.
48     // Downstream has to filter the null-values...
49     return Collections.emptyList();
50   }
51
52   @Override
53   public void close() {}
54 }