Moved the building of the topology into a static method
[demos/kafka/deduplication] / src / main / java / de / juplo / demo / kafka / deduplication / Deduplicator.java
1 package de.juplo.demo.kafka.deduplication;
2
3
4 import org.apache.kafka.common.serialization.Serdes;
5 import org.apache.kafka.streams.KafkaStreams;
6 import org.apache.kafka.streams.StreamsBuilder;
7 import org.apache.kafka.streams.Topology;
8 import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
9 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
10 import org.apache.kafka.streams.state.KeyValueStore;
11 import org.apache.kafka.streams.state.StoreBuilder;
12 import org.apache.kafka.streams.state.Stores;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
15 import org.springframework.stereotype.Component;
16
17 import javax.annotation.PostConstruct;
18 import javax.annotation.PreDestroy;
19 import java.time.Duration;
20 import java.util.Properties;
21
22
23 @Component
24 public class Deduplicator
25 {
26   final static Logger LOG = LoggerFactory.getLogger(Deduplicator.class);
27
28   public final KafkaStreams streams;
29
30
31   public Deduplicator()
32   {
33     Properties properties = new Properties();
34     properties.put("bootstrap.servers", "kafka:9092");
35     properties.put("application.id", "streams-deduplicator");
36     properties.put("default.key.serde", Serdes.StringSerde.class);
37     properties.put("default.value.serde", Serdes.StringSerde.class);
38
39     streams = new KafkaStreams(Deduplicator.buildTopology(), properties);
40     streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
41     {
42       LOG.error("Unexpected error in thread {}: {}", t, e.toString());
43       try
44       {
45         streams.close(Duration.ofSeconds(5));
46       }
47       catch (Exception ex)
48       {
49         LOG.error("Could not close KafkaStreams!", ex);
50       }
51     });
52   }
53
54   static Topology buildTopology()
55   {
56     StreamsBuilder builder = new StreamsBuilder();
57
58     // Create state-store for sequence numbers
59     StoreBuilder<KeyValueStore<Integer,Long>> store =
60         Stores.keyValueStoreBuilder(
61             Stores.persistentKeyValueStore(DeduplicationTransformer.STORE),
62             Serdes.Integer(),
63             Serdes.Long());
64     // register store
65     builder.addStateStore(store);
66
67     builder
68         .<String, String>stream("input")
69         .flatTransformValues(
70             new ValueTransformerWithKeySupplier<String, String, Iterable<String>>()
71             {
72               @Override
73               public ValueTransformerWithKey<String, String, Iterable<String>> get()
74               {
75                 return new DeduplicationTransformer();
76               }
77             },
78             DeduplicationTransformer.STORE)
79         .to("output");
80
81     return builder.build();
82   }
83
84   @PostConstruct
85   public void start()
86   {
87     streams.start();
88   }
89
90   @PreDestroy
91   public void stop()
92   {
93     streams.close(Duration.ofSeconds(5));
94   }
95 }