X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;ds=sidebyside;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Ftop10%2FTop10ApplicationConfiguration.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Ftop10%2FTop10ApplicationConfiguration.java;h=6e1f93ff2bb634ba01ca958b0bf437bc6b5fd821;hb=9e0ee342b531e7cfcddfe2e34390807802725a14;hp=000db0121656b20181427bd5f2dc0b3a7b2bd880;hpb=b69e1270a308f200b2640a01d37d4636a0a549e1;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index 000db01..6e1f93f 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -3,6 +3,8 @@ package de.juplo.kafka.wordcount.top10; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; import org.springframework.boot.SpringApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; @@ -55,12 +57,14 @@ public class Top10ApplicationConfiguration public Top10StreamProcessor streamProcessor( Top10ApplicationProperties applicationProperties, Properties streamProcessorProperties, + KeyValueBytesStoreSupplier storeSupplier, ConfigurableApplicationContext context) { Top10StreamProcessor streamProcessor = new Top10StreamProcessor( applicationProperties.getInputTopic(), applicationProperties.getOutputTopic(), - streamProcessorProperties); + streamProcessorProperties, + storeSupplier); streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> { @@ -75,4 +79,10 @@ public class Top10ApplicationConfiguration return streamProcessor; } + + @Bean + public KeyValueBytesStoreSupplier storeSupplier() + { + return Stores.persistentKeyValueStore("top10"); + } }