Further simplified the example: Removed unnecessary variables
[demos/kafka/deduplication] / src / main / java / de / juplo / demo / kafka / deduplication / DeduplicationTransformer.java
1 package de.juplo.demo.kafka.deduplication;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.kstream.ValueTransformer;
5 import org.apache.kafka.streams.processor.ProcessorContext;
6 import org.apache.kafka.streams.state.KeyValueStore;
7
8 import java.util.Arrays;
9 import java.util.Collections;
10
11
12 @Slf4j
13 public class DeduplicationTransformer implements ValueTransformer<String, Iterable<String>>
14 {
15   public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE";
16   private ProcessorContext context;
17   private KeyValueStore<Integer, Long> store;
18
19
20   @Override
21   public void init(ProcessorContext context)
22   {
23     this.context = context;
24     store = (KeyValueStore<Integer, Long>) context.getStateStore(STORE);
25   }
26
27   @Override
28   public Iterable<String> transform(String value)
29   {
30     Integer partition = context.partition();
31     long sequenceNumber = Long.parseLong(value);
32
33     Long seen = store.get(partition);
34     if (seen == null || seen < sequenceNumber)
35     {
36       store.put(partition, sequenceNumber);
37       return Arrays.asList(value);
38     }
39
40     log.info("ignoring message with sequence-number {} <= {}", sequenceNumber, seen);
41
42     // Signal, that the message has already been seen.
43     // Downstream has to filter the null-values...
44     return Collections.emptyList();
45   }
46
47   @Override
48   public void close() {}
49 }