3585604d02d4032b1563edf8e06e59ae5e12cd52
[demos/kafka/deduplication] / src / main / java / de / juplo / demo / kafka / deduplication / Deduplicator.java
1 package de.juplo.demo.kafka.deduplication;
2
3
4 import org.apache.kafka.common.header.Headers;
5 import org.apache.kafka.common.serialization.Serdes;
6 import org.apache.kafka.streams.KafkaStreams;
7 import org.apache.kafka.streams.StreamsBuilder;
8 import org.apache.kafka.streams.Topology;
9 import org.apache.kafka.streams.state.KeyValueStore;
10 import org.apache.kafka.streams.state.StoreBuilder;
11 import org.apache.kafka.streams.state.Stores;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14 import org.springframework.stereotype.Component;
15
16 import javax.annotation.PostConstruct;
17 import javax.annotation.PreDestroy;
18 import java.time.Duration;
19 import java.util.Properties;
20
21
22 @Component
23 public class Deduplicator
24 {
25   final static Logger LOG = LoggerFactory.getLogger(Deduplicator.class);
26
27   public final KafkaStreams streams;
28
29
30   public Deduplicator()
31   {
32     Properties properties = new Properties();
33     properties.put("bootstrap.servers", "kafka:9092");
34     properties.put("application.id", "streams-deduplicator");
35     properties.put("default.key.serde", Serdes.StringSerde.class);
36     properties.put("default.value.serde", Serdes.StringSerde.class);
37
38     SequenceNumberExtractor<String, String> extractor =
39         new SequenceNumberExtractor<String, String>() {
40           @Override
41           public long extract(
42               String topic, int partition, long offset, Headers headers, String key, String value)
43           {
44             return Long.parseLong(value);
45           }
46         };
47
48     StreamsBuilder builder = new StreamsBuilder();
49
50     // Create state-store for sequence numbers
51     StoreBuilder<KeyValueStore<Integer,Long>> store =
52         Stores.keyValueStoreBuilder(
53             Stores.persistentKeyValueStore(DeduplicationTransformer.STORE),
54             Serdes.Integer(),
55             Serdes.Long());
56     // register store
57     builder.addStateStore(store);
58
59     builder
60         .<String, String>stream("input")
61         .flatTransformValues(
62             new DeduplicationTransformerSupplier<String, String>(extractor),
63             DeduplicationTransformer.STORE)
64         .to("output");
65
66     Topology topology = builder.build();
67     streams = new KafkaStreams(topology, properties);
68     streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
69     {
70       LOG.error("Unexpected error in thread {}: {}", t, e.toString());
71       try
72       {
73         streams.close(Duration.ofSeconds(5));
74       }
75       catch (Exception ex)
76       {
77         LOG.error("Could not close KafkaStreams!", ex);
78       }
79     });
80   }
81
82
83   @PostConstruct
84   public void start()
85   {
86     streams.start();
87   }
88
89   @PreDestroy
90   public void stop()
91   {
92     streams.close(Duration.ofSeconds(5));
93   }
94 }