1 package de.juplo.kafka.wordcount.query;
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import org.apache.kafka.streams.state.HostInfo;
5 import org.springframework.boot.SpringApplication;
6 import org.springframework.boot.autoconfigure.SpringBootApplication;
7 import org.springframework.boot.autoconfigure.web.ServerProperties;
8 import org.springframework.boot.context.properties.EnableConfigurationProperties;
9 import org.springframework.context.ConfigurableApplicationContext;
10 import org.springframework.context.annotation.Bean;
12 import java.io.IOException;
13 import java.net.InetSocketAddress;
14 import java.net.Socket;
17 @SpringBootApplication
18 @EnableConfigurationProperties(QueryApplicationProperties.class)
19 public class QueryApplication
22 public QueryStreamProcessor usersStreamProcessor(
23 ServerProperties serverProperties,
24 QueryApplicationProperties properties,
26 ConfigurableApplicationContext context) throws IOException
29 if (serverProperties.getAddress() == null)
31 HostInfo bootstrapServer = HostInfo.buildFromEndpoint(properties.getBootstrapServer());
32 Socket socket = new Socket();
33 socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port()));
34 host = socket.getLocalAddress().getHostAddress();
38 host = serverProperties.getAddress().getHostAddress();
41 Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort();
43 return new QueryStreamProcessor(
44 properties.getApplicationId(),
45 new HostInfo(host, port),
46 properties.getBootstrapServer(),
47 properties.getUsersInputTopic(),
48 properties.getRankingInputTopic(),
54 public static void main(String[] args)
56 SpringApplication.run(QueryApplication.class, args);