top10: 1.2.1 - The name of the state-store is an internal detail
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / top10 / Top10ApplicationConfiguration.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerConfig;
5 import org.apache.kafka.streams.StreamsConfig;
6 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
7 import org.apache.kafka.streams.state.Stores;
8 import org.springframework.boot.SpringApplication;
9 import org.springframework.boot.context.properties.EnableConfigurationProperties;
10 import org.springframework.context.ConfigurableApplicationContext;
11 import org.springframework.context.annotation.Bean;
12 import org.springframework.context.annotation.Configuration;
13 import org.springframework.kafka.support.serializer.JsonDeserializer;
14 import org.springframework.kafka.support.serializer.JsonSerde;
15
16 import java.util.HashMap;
17 import java.util.Map;
18 import java.util.Properties;
19 import java.util.concurrent.CompletableFuture;
20
21 import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
22 import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
23
24
25 @Configuration
26 @EnableConfigurationProperties(Top10ApplicationProperties.class)
27 @Slf4j
28 public class Top10ApplicationConfiguration
29 {
30         @Bean
31         public Properties streamProcessorProperties(Top10ApplicationProperties properties)
32         {
33                 Properties props = new Properties();
34
35                 props.putAll(serializationConfig());
36
37                 props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
38                 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
39                 if (properties.getCommitInterval() != null)
40                         props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval());
41                 if (properties.getCacheMaxBytes() != null)
42                         props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, properties.getCacheMaxBytes());
43
44                 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
45
46                 return props;
47         }
48
49         static Map<String, Object> serializationConfig()
50         {
51                 Map<String, Object> props = new HashMap<>();
52
53                 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
54                 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
55                 props.put(JsonDeserializer.TRUSTED_PACKAGES, Top10Application.class.getPackageName());
56                 props.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName());
57                 props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName());
58                 props.put(
59                                 JsonDeserializer.TYPE_MAPPINGS,
60                                 "word:" + Key.class.getName() + "," +
61                                 "counter:" + Entry.class.getName() + "," +
62                                 "user:" + User.class.getName() + "," +
63                                 "ranking:" + Ranking.class.getName());
64
65                 return props;
66         }
67
68         @Bean(initMethod = "start", destroyMethod = "stop")
69         public Top10StreamProcessor streamProcessor(
70                         Top10ApplicationProperties applicationProperties,
71                         Properties streamProcessorProperties,
72                         KeyValueBytesStoreSupplier storeSupplier,
73                         ConfigurableApplicationContext context)
74         {
75                 Top10StreamProcessor streamProcessor = new Top10StreamProcessor(
76                                 applicationProperties.getInputTopic(),
77                                 applicationProperties.getOutputTopic(),
78                                 streamProcessorProperties,
79                                 storeSupplier);
80
81                 streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
82                 {
83                         log.error("Unexpected error!", e);
84                         CompletableFuture.runAsync(() ->
85                         {
86                                 log.info("Stopping application...");
87                                 SpringApplication.exit(context, () -> 1);
88                         });
89                         return SHUTDOWN_CLIENT;
90                 });
91
92                 return streamProcessor;
93         }
94
95         @Bean
96         public KeyValueBytesStoreSupplier storeSupplier()
97         {
98                 return Stores.persistentKeyValueStore(STORE_NAME);
99         }
100 }