From: Kai Moritz Date: Wed, 15 Feb 2023 06:51:46 +0000 (+0100) Subject: counter: 1.1.10 - Refactored the configuration inito a separated class X-Git-Tag: counter-1.1.10 X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fwordcount;a=commitdiff_plain;h=e1ec32bcc70e5f990ca2ddb5566f98cee7e4c2c2 counter: 1.1.10 - Refactored the configuration inito a separated class --- diff --git a/pom.xml b/pom.xml index b0ca36b..48183b5 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount counter - 1.1.9 + 1.1.10 Wordcount-Counter Word-counting stream-processor of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java index 31086ca..e6d3b1f 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java @@ -1,84 +1,12 @@ package de.juplo.kafka.wordcount.counter; -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.Stores; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.annotation.Bean; - -import java.util.Properties; -import java.util.concurrent.CompletableFuture; - -import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @SpringBootApplication -@EnableConfigurationProperties(CounterApplicationProperties.class) -@Slf4j public class CounterApplication { - @Bean - public Properties propertyMap(CounterApplicationProperties properties) - { - Properties propertyMap = new Properties(); - - propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); - propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); - propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); - propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target"); - if (properties.getCommitInterval() != null) - propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval()); - if (properties.getCacheMaxBytes() != null) - propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, properties.getCacheMaxBytes()); - propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - return propertyMap; - } - - @Bean(initMethod = "start", destroyMethod = "stop") - public CounterStreamProcessor streamProcessor( - CounterApplicationProperties properties, - Properties propertyMap, - KeyValueBytesStoreSupplier storeSupplier, - ObjectMapper objectMapper, - ConfigurableApplicationContext context) - { - CounterStreamProcessor streamProcessor = new CounterStreamProcessor( - properties.getInputTopic(), - properties.getOutputTopic(), - propertyMap, - storeSupplier, - objectMapper); - - streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> - { - log.error("Unexpected error!", e); - CompletableFuture.runAsync(() -> - { - log.info("Stopping application..."); - SpringApplication.exit(context, () -> 1); - }); - return SHUTDOWN_CLIENT; - }); - - - return streamProcessor; - } - - @Bean - public KeyValueBytesStoreSupplier storeSupplier() - { - return Stores.persistentKeyValueStore("counter"); - } - public static void main(String[] args) { SpringApplication.run(CounterApplication.class, args); diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java new file mode 100644 index 0000000..f1da407 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java @@ -0,0 +1,81 @@ +package de.juplo.kafka.wordcount.counter; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.Properties; +import java.util.concurrent.CompletableFuture; + +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + + +@Configuration +@EnableConfigurationProperties(CounterApplicationProperties.class) +@Slf4j +public class CounterApplicationConfiguriation +{ + @Bean + public Properties propertyMap(CounterApplicationProperties properties) + { + Properties propertyMap = new Properties(); + + propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); + propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); + propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target"); + if (properties.getCommitInterval() != null) + propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval()); + if (properties.getCacheMaxBytes() != null) + propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, properties.getCacheMaxBytes()); + propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + return propertyMap; + } + + @Bean(initMethod = "start", destroyMethod = "stop") + public CounterStreamProcessor streamProcessor( + CounterApplicationProperties properties, + Properties propertyMap, + KeyValueBytesStoreSupplier storeSupplier, + ObjectMapper objectMapper, + ConfigurableApplicationContext context) + { + CounterStreamProcessor streamProcessor = new CounterStreamProcessor( + properties.getInputTopic(), + properties.getOutputTopic(), + propertyMap, + storeSupplier, + objectMapper); + + streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> + { + log.error("Unexpected error!", e); + CompletableFuture.runAsync(() -> + { + log.info("Stopping application..."); + SpringApplication.exit(context, () -> 1); + }); + return SHUTDOWN_CLIENT; + }); + + + return streamProcessor; + } + + @Bean + public KeyValueBytesStoreSupplier storeSupplier() + { + return Stores.persistentKeyValueStore("counter"); + } +}