1 package de.juplo.kafka.wordcount.query;
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import org.apache.kafka.streams.state.HostInfo;
5 import org.springframework.boot.autoconfigure.web.ServerProperties;
6 import org.springframework.boot.context.properties.EnableConfigurationProperties;
7 import org.springframework.context.ConfigurableApplicationContext;
8 import org.springframework.context.annotation.Bean;
9 import org.springframework.context.annotation.Configuration;
11 import java.io.IOException;
12 import java.net.InetSocketAddress;
13 import java.net.Socket;
17 @EnableConfigurationProperties(QueryApplicationProperties.class)
18 public class QueryApplicationConfiguration
21 public HostInfo applicationServer(
22 ServerProperties serverProperties,
23 QueryApplicationProperties applicationProperties) throws IOException
26 if (serverProperties.getAddress() == null)
28 HostInfo bootstrapServer = HostInfo.buildFromEndpoint(applicationProperties.getBootstrapServer());
29 Socket socket = new Socket();
30 socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port()));
31 host = socket.getLocalAddress().getHostAddress();
35 host = serverProperties.getAddress().getHostAddress();
38 Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort();
40 return new HostInfo(host, port);
44 public QueryStreamProcessor streamProcessor(
45 QueryApplicationProperties applicationProperties,
46 HostInfo applicationServer,
48 ConfigurableApplicationContext context)
50 return new QueryStreamProcessor(
51 applicationProperties.getApplicationId(),
53 applicationProperties.getBootstrapServer(),
54 applicationProperties.getUsersInputTopic(),
55 applicationProperties.getRankingInputTopic(),