e07fb372f6586f6570d51327557aec32a9bbf704
[demos/kafka/deduplication] / src / main / java / de / juplo / demo / kafka / deduplication / DeduplicationTransformer.java
1 package de.juplo.demo.kafka.deduplication;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.common.header.Headers;
5 import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
6 import org.apache.kafka.streams.processor.ProcessorContext;
7 import org.apache.kafka.streams.state.KeyValueStore;
8
9 import java.util.Arrays;
10 import java.util.Collections;
11
12
13 @Slf4j
14 public class DeduplicationTransformer<K, V> implements ValueTransformerWithKey<K, V, Iterable<V>>
15 {
16   final SequenceNumberExtractor<K, V> extractor;
17
18   public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE";
19   private ProcessorContext context;
20   private KeyValueStore<Integer, Long> store;
21
22
23   public DeduplicationTransformer(SequenceNumberExtractor<K, V> extractor)
24   {
25     this.extractor = extractor;
26   }
27
28
29   @Override
30   public void init(ProcessorContext context)
31   {
32     this.context = context;
33     store = (KeyValueStore<Integer, Long>) context.getStateStore(STORE);
34   }
35
36   @Override
37   public Iterable<V> transform(K key, V value)
38   {
39     String topic = context.topic();
40     Integer partition = context.partition();
41     long offset = context.offset();
42     Headers headers = context.headers();
43
44     long sequenceNumber = extractor.extract(topic, partition, offset, headers, key, value);
45
46     Long seen = store.get(partition);
47     if (seen == null || seen < sequenceNumber)
48     {
49       store.put(partition, sequenceNumber);
50       return Arrays.asList(value);
51     }
52
53     log.info(
54         "ignoring message for key {} with sequence-number {} <= {} found at offset {} of partition {}",
55         key,
56         sequenceNumber,
57         seen,
58         offset,
59         partition);
60
61     // Signal, that the message has already been seen.
62     // Downstream has to filter the null-values...
63     return Collections.emptyList();
64   }
65
66   @Override
67   public void close() {}
68 }