query: 1.0.6 - Separated config in `QueryApplicationConfiguration` -- COPY
authorKai Moritz <kai@juplo.de>
Sun, 9 Jun 2024 07:28:51 +0000 (09:28 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 9 Jun 2024 10:31:34 +0000 (12:31 +0200)
src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java [new file with mode: 0644]

diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java
new file mode 100644 (file)
index 0000000..813d3b2
--- /dev/null
@@ -0,0 +1,58 @@
+package de.juplo.kafka.wordcount.query;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.streams.state.HostInfo;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.web.ServerProperties;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(QueryApplicationProperties.class)
+public class QueryApplication
+{
+       @Bean
+       public QueryStreamProcessor usersStreamProcessor(
+                       ServerProperties serverProperties,
+                       QueryApplicationProperties properties,
+                       ObjectMapper mapper,
+                       ConfigurableApplicationContext context) throws IOException
+       {
+               String host;
+               if (serverProperties.getAddress() == null)
+               {
+                       HostInfo bootstrapServer = HostInfo.buildFromEndpoint(properties.getBootstrapServer());
+                       Socket socket = new Socket();
+                       socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port()));
+                       host = socket.getLocalAddress().getHostAddress();
+               }
+               else
+               {
+                       host = serverProperties.getAddress().getHostAddress();
+               }
+
+               Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort();
+
+               return new QueryStreamProcessor(
+                               properties.getApplicationId(),
+                               new HostInfo(host, port),
+                               properties.getBootstrapServer(),
+                               properties.getUsersInputTopic(),
+                               properties.getRankingInputTopic(),
+                               mapper,
+                               context);
+       }
+
+
+       public static void main(String[] args)
+       {
+               SpringApplication.run(QueryApplication.class, args);
+       }
+}