X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterApplication.java;h=31086ca9b5b0fffca94323e2d10892b24a1aee25;hb=b8f6728e1957d3cfdd34efebd430e33797b67b58;hp=20cb4d22e8ea6bbcbb27aa329480669fab05b47e;hpb=3fe874718db7a8955299fdf9eacb7d7a535ff946;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java index 20cb4d2..31086ca 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java @@ -24,12 +24,8 @@ import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.St @Slf4j public class CounterApplication { - @Bean(initMethod = "start", destroyMethod = "stop") - public CounterStreamProcessor streamProcessor( - CounterApplicationProperties properties, - KeyValueBytesStoreSupplier storeSupplier, - ObjectMapper objectMapper, - ConfigurableApplicationContext context) + @Bean + public Properties propertyMap(CounterApplicationProperties properties) { Properties propertyMap = new Properties(); @@ -44,6 +40,17 @@ public class CounterApplication propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, properties.getCacheMaxBytes()); propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return propertyMap; + } + + @Bean(initMethod = "start", destroyMethod = "stop") + public CounterStreamProcessor streamProcessor( + CounterApplicationProperties properties, + Properties propertyMap, + KeyValueBytesStoreSupplier storeSupplier, + ObjectMapper objectMapper, + ConfigurableApplicationContext context) + { CounterStreamProcessor streamProcessor = new CounterStreamProcessor( properties.getInputTopic(), properties.getOutputTopic(),