query: 1.0.6 - Refined `QueryAppilcationConfiguration`
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / query / QueryApplicationConfiguration.java
index 3f866f0..ae93d45 100644 (file)
@@ -1,6 +1,9 @@
 package de.juplo.kafka.wordcount.query;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.state.HostInfo;
 import org.springframework.boot.autoconfigure.web.ServerProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -11,6 +14,7 @@ import org.springframework.context.annotation.Configuration;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.util.Properties;
 
 
 @Configuration
@@ -41,16 +45,36 @@ public class QueryApplicationConfiguration
        }
 
        @Bean
-       public QueryStreamProcessor streamProcessor(
+       public Properties streamProcessorProperties(
                        QueryApplicationProperties applicationProperties,
+                       HostInfo applicationServer)
+       {
+               Properties props = new Properties();
+
+               String applicationId = applicationProperties.getApplicationId();
+               String bootstrapServer = applicationProperties.getBootstrapServer();
+
+               props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+               props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port());
+               props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
+               props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+               props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+               props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+               return props;
+       }
+
+       @Bean
+       public QueryStreamProcessor streamProcessor(
+                       Properties streamProcessorProperties,
                        HostInfo applicationServer,
+                       QueryApplicationProperties applicationProperties,
                        ObjectMapper mapper,
                        ConfigurableApplicationContext context)
        {
                return new QueryStreamProcessor(
-                               applicationProperties.getApplicationId(),
+                               streamProcessorProperties,
                                applicationServer,
-                               applicationProperties.getBootstrapServer(),
                                applicationProperties.getUsersInputTopic(),
                                applicationProperties.getRankingInputTopic(),
                                mapper,