query: 1.0.3 - application.server is derived from the local address
authorKai Moritz <kai@juplo.de>
Sat, 16 Oct 2021 14:34:54 +0000 (16:34 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 16 Oct 2021 14:34:54 +0000 (16:34 +0200)
pom.xml
src/main/java/de/juplo/kafka/wordcount/query/QueryApplication.java
src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java
src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java

diff --git a/pom.xml b/pom.xml
index 56a2b76..8164674 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
        <artifactId>query</artifactId>
-       <version>1.0.2</version>
+       <version>1.0.3</version>
        <name>Wordcount-Query</name>
        <description>Query stream-processor of the multi-user wordcount-example</description>
        <properties>
index 995c1a1..813d3b2 100644 (file)
@@ -1,14 +1,56 @@
 package de.juplo.kafka.wordcount.query;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.streams.state.HostInfo;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.web.ServerProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
 
 
 @SpringBootApplication
 @EnableConfigurationProperties(QueryApplicationProperties.class)
 public class QueryApplication
 {
+       @Bean
+       public QueryStreamProcessor usersStreamProcessor(
+                       ServerProperties serverProperties,
+                       QueryApplicationProperties properties,
+                       ObjectMapper mapper,
+                       ConfigurableApplicationContext context) throws IOException
+       {
+               String host;
+               if (serverProperties.getAddress() == null)
+               {
+                       HostInfo bootstrapServer = HostInfo.buildFromEndpoint(properties.getBootstrapServer());
+                       Socket socket = new Socket();
+                       socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port()));
+                       host = socket.getLocalAddress().getHostAddress();
+               }
+               else
+               {
+                       host = serverProperties.getAddress().getHostAddress();
+               }
+
+               Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort();
+
+               return new QueryStreamProcessor(
+                               properties.getApplicationId(),
+                               new HostInfo(host, port),
+                               properties.getBootstrapServer(),
+                               properties.getUsersInputTopic(),
+                               properties.getRankingInputTopic(),
+                               mapper,
+                               context);
+       }
+
+
        public static void main(String[] args)
        {
                SpringApplication.run(QueryApplication.class, args);
index 19265f1..df5f41e 100644 (file)
@@ -17,5 +17,4 @@ public class QueryApplicationProperties
   private String applicationId = "query";
   private String rankingInputTopic = "top10";
   private String usersInputTopic = "users";
-  private String applicationServer = "localhost:8080";
 }
index 696e088..7e7af21 100644 (file)
@@ -14,7 +14,6 @@ import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.springframework.boot.SpringApplication;
 import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
@@ -27,7 +26,6 @@ import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.St
 
 
 @Slf4j
-@Component
 public class QueryStreamProcessor
 {
        public final KafkaStreams streams;
@@ -38,14 +36,18 @@ public class QueryStreamProcessor
 
 
        public QueryStreamProcessor(
-                       QueryApplicationProperties properties,
+                       String applicationId,
+                       HostInfo applicationServer,
+                       String bootstrapServer,
+                       String usersInputTopic,
+                       String rankingInputTopic,
                        ObjectMapper mapper,
                        ConfigurableApplicationContext context)
        {
                StreamsBuilder builder = new StreamsBuilder();
 
-               KTable<String, String> users = builder.table(properties.getUsersInputTopic());
-               KStream<String, String> rankings = builder.stream(properties.getRankingInputTopic());
+               KTable<String, String> users = builder.table(usersInputTopic);
+               KStream<String, String> rankings = builder.stream(rankingInputTopic);
 
                rankings
                                .join(users, (rankingJson, userJson) ->
@@ -69,9 +71,9 @@ public class QueryStreamProcessor
                                .toTable(Materialized.as(storeName));
 
                Properties props = new Properties();
-               props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
-               props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, properties.getApplicationServer());
-               props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+               props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+               props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port());
+               props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
                props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -88,7 +90,7 @@ public class QueryStreamProcessor
                        return SHUTDOWN_CLIENT;
                });
 
-               hostInfo = HostInfo.buildFromEndpoint(properties.getApplicationServer());
+               hostInfo = applicationServer;
                storeParameters = StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore());;
                this.mapper = mapper;
        }