X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterApplication.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterApplication.java;h=20cb4d22e8ea6bbcbb27aa329480669fab05b47e;hb=f8f9b6397ba0096bffa463e09a2db93277a3f9cf;hp=5fc183ccebbb9b0ed685c9086b7d001c45e45a5e;hpb=db4b181f863add3e78c01d110c1b4feabd8ecb89;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java index 5fc183c..20cb4d2 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java @@ -5,6 +5,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -25,6 +27,7 @@ public class CounterApplication @Bean(initMethod = "start", destroyMethod = "stop") public CounterStreamProcessor streamProcessor( CounterApplicationProperties properties, + KeyValueBytesStoreSupplier storeSupplier, ObjectMapper objectMapper, ConfigurableApplicationContext context) { @@ -45,6 +48,7 @@ public class CounterApplication properties.getInputTopic(), properties.getOutputTopic(), propertyMap, + storeSupplier, objectMapper); streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> @@ -62,6 +66,12 @@ public class CounterApplication return streamProcessor; } + @Bean + public KeyValueBytesStoreSupplier storeSupplier() + { + return Stores.persistentKeyValueStore("counter"); + } + public static void main(String[] args) { SpringApplication.run(CounterApplication.class, args);