query: 1.0.6 - Refined `QueryApplicationConfiguration`
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / query / QueryApplicationConfiguration.java
index 813d3b2..0e30e74 100644 (file)
@@ -1,34 +1,35 @@
 package de.juplo.kafka.wordcount.query;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.state.HostInfo;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.web.ServerProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.util.Properties;
 
 
-@SpringBootApplication
+@Configuration
 @EnableConfigurationProperties(QueryApplicationProperties.class)
-public class QueryApplication
+public class QueryApplicationConfiguration
 {
        @Bean
-       public QueryStreamProcessor usersStreamProcessor(
+       public HostInfo applicationServer(
                        ServerProperties serverProperties,
-                       QueryApplicationProperties properties,
-                       ObjectMapper mapper,
-                       ConfigurableApplicationContext context) throws IOException
+                       QueryApplicationProperties applicationProperties) throws IOException
        {
                String host;
                if (serverProperties.getAddress() == null)
                {
-                       HostInfo bootstrapServer = HostInfo.buildFromEndpoint(properties.getBootstrapServer());
+                       HostInfo bootstrapServer = HostInfo.buildFromEndpoint(applicationProperties.getBootstrapServer());
                        Socket socket = new Socket();
                        socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port()));
                        host = socket.getLocalAddress().getHostAddress();
@@ -40,19 +41,53 @@ public class QueryApplication
 
                Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort();
 
-               return new QueryStreamProcessor(
-                               properties.getApplicationId(),
-                               new HostInfo(host, port),
-                               properties.getBootstrapServer(),
-                               properties.getUsersInputTopic(),
-                               properties.getRankingInputTopic(),
-                               mapper,
-                               context);
+               return new HostInfo(host, port);
        }
 
+       @Bean
+       public Properties streamProcessorProperties(
+                       QueryApplicationProperties applicationProperties,
+                       HostInfo applicationServer)
+       {
+               Properties props = new Properties();
+
+               props.putAll(serializationConfig());
+
+               String applicationId = applicationProperties.getApplicationId();
+               String bootstrapServer = applicationProperties.getBootstrapServer();
+
+               props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+               props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port());
+               props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
+               props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+               return props;
+       }
 
-       public static void main(String[] args)
+       static Properties serializationConfig()
        {
-               SpringApplication.run(QueryApplication.class, args);
+               Properties props = new Properties();
+
+               props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+               props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+
+               return props;
+       }
+
+       @Bean
+       public QueryStreamProcessor streamProcessor(
+                       Properties streamProcessorProperties,
+                       HostInfo applicationServer,
+                       QueryApplicationProperties applicationProperties,
+                       ObjectMapper mapper,
+                       ConfigurableApplicationContext context)
+       {
+               return new QueryStreamProcessor(
+                               streamProcessorProperties,
+                               applicationServer,
+                               applicationProperties.getUsersInputTopic(),
+                               applicationProperties.getRankingInputTopic(),
+                               mapper,
+                               context);
        }
 }