--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.streams.state.HostInfo;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.web.ServerProperties;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(QueryApplicationProperties.class)
+public class QueryApplication
+{
+ @Bean
+ public QueryStreamProcessor usersStreamProcessor(
+ ServerProperties serverProperties,
+ QueryApplicationProperties properties,
+ ObjectMapper mapper,
+ ConfigurableApplicationContext context) throws IOException
+ {
+ String host;
+ if (serverProperties.getAddress() == null)
+ {
+ HostInfo bootstrapServer = HostInfo.buildFromEndpoint(properties.getBootstrapServer());
+ Socket socket = new Socket();
+ socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port()));
+ host = socket.getLocalAddress().getHostAddress();
+ }
+ else
+ {
+ host = serverProperties.getAddress().getHostAddress();
+ }
+
+ Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort();
+
+ return new QueryStreamProcessor(
+ properties.getApplicationId(),
+ new HostInfo(host, port),
+ properties.getBootstrapServer(),
+ properties.getUsersInputTopic(),
+ properties.getRankingInputTopic(),
+ mapper,
+ context);
+ }
+
+
+ public static void main(String[] args)
+ {
+ SpringApplication.run(QueryApplication.class, args);
+ }
+}