+ @Bean
+ public QueryStreamProcessor usersStreamProcessor(
+ ServerProperties serverProperties,
+ QueryApplicationProperties properties,
+ ObjectMapper mapper,
+ ConfigurableApplicationContext context) throws IOException
+ {
+ String host;
+ if (serverProperties.getAddress() == null)
+ {
+ HostInfo bootstrapServer = HostInfo.buildFromEndpoint(properties.getBootstrapServer());
+ Socket socket = new Socket();
+ socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port()));
+ host = socket.getLocalAddress().getHostAddress();
+ }
+ else
+ {
+ host = serverProperties.getAddress().getHostAddress();
+ }
+
+ Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort();
+
+ return new QueryStreamProcessor(
+ properties.getApplicationId(),
+ new HostInfo(host, port),
+ properties.getBootstrapServer(),
+ properties.getUsersInputTopic(),
+ properties.getRankingInputTopic(),
+ mapper,
+ context);
+ }
+
+