query: 1.0.6 - Refined `QueryApplicationConfiguration`
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / query / QueryApplicationConfiguration.java
index b546b8c..50a5364 100644 (file)
@@ -6,6 +6,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.state.HostInfo;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.web.ServerProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -19,6 +21,7 @@ import java.net.Socket;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME;
 import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 
 
@@ -85,6 +88,7 @@ public class QueryApplicationConfiguration
                        Properties streamProcessorProperties,
                        HostInfo applicationServer,
                        QueryApplicationProperties applicationProperties,
+                       KeyValueBytesStoreSupplier storeSupplier,
                        ObjectMapper mapper,
                        ConfigurableApplicationContext context)
        {
@@ -93,6 +97,7 @@ public class QueryApplicationConfiguration
                                applicationServer,
                                applicationProperties.getUsersInputTopic(),
                                applicationProperties.getRankingInputTopic(),
+                               storeSupplier,
                                mapper);
 
                streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
@@ -108,4 +113,10 @@ public class QueryApplicationConfiguration
 
                return streamProcessor;
        }
+
+       @Bean
+       public KeyValueBytesStoreSupplier storeSupplier()
+       {
+               return Stores.persistentKeyValueStore(STORE_NAME);
+       }
 }