query: 1.0.3 - application.server is derived from the local address
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / query / QueryApplication.java
1 package de.juplo.kafka.wordcount.query;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import org.apache.kafka.streams.state.HostInfo;
5 import org.springframework.boot.SpringApplication;
6 import org.springframework.boot.autoconfigure.SpringBootApplication;
7 import org.springframework.boot.autoconfigure.web.ServerProperties;
8 import org.springframework.boot.context.properties.EnableConfigurationProperties;
9 import org.springframework.context.ConfigurableApplicationContext;
10 import org.springframework.context.annotation.Bean;
11
12 import java.io.IOException;
13 import java.net.InetSocketAddress;
14 import java.net.Socket;
15
16
17 @SpringBootApplication
18 @EnableConfigurationProperties(QueryApplicationProperties.class)
19 public class QueryApplication
20 {
21         @Bean
22         public QueryStreamProcessor usersStreamProcessor(
23                         ServerProperties serverProperties,
24                         QueryApplicationProperties properties,
25                         ObjectMapper mapper,
26                         ConfigurableApplicationContext context) throws IOException
27         {
28                 String host;
29                 if (serverProperties.getAddress() == null)
30                 {
31                         HostInfo bootstrapServer = HostInfo.buildFromEndpoint(properties.getBootstrapServer());
32                         Socket socket = new Socket();
33                         socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port()));
34                         host = socket.getLocalAddress().getHostAddress();
35                 }
36                 else
37                 {
38                         host = serverProperties.getAddress().getHostAddress();
39                 }
40
41                 Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort();
42
43                 return new QueryStreamProcessor(
44                                 properties.getApplicationId(),
45                                 new HostInfo(host, port),
46                                 properties.getBootstrapServer(),
47                                 properties.getUsersInputTopic(),
48                                 properties.getRankingInputTopic(),
49                                 mapper,
50                                 context);
51         }
52
53
54         public static void main(String[] args)
55         {
56                 SpringApplication.run(QueryApplication.class, args);
57         }
58 }