9f6769f3d00d63125b59a57e4036535f2af3ef65
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / query / QueryApplicationConfiguration.java
1 package de.juplo.kafka.wordcount.query;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import org.apache.kafka.streams.state.HostInfo;
5 import org.springframework.boot.autoconfigure.web.ServerProperties;
6 import org.springframework.boot.context.properties.EnableConfigurationProperties;
7 import org.springframework.context.ConfigurableApplicationContext;
8 import org.springframework.context.annotation.Bean;
9 import org.springframework.context.annotation.Configuration;
10
11 import java.io.IOException;
12 import java.net.InetSocketAddress;
13 import java.net.Socket;
14
15
16 @Configuration
17 @EnableConfigurationProperties(QueryApplicationProperties.class)
18 public class QueryApplicationConfiguration
19 {
20         @Bean
21         public QueryStreamProcessor streamProcessor(
22                         ServerProperties serverProperties,
23                         QueryApplicationProperties applicationProperties,
24                         ObjectMapper mapper,
25                         ConfigurableApplicationContext context) throws IOException
26         {
27                 String host;
28                 if (serverProperties.getAddress() == null)
29                 {
30                         HostInfo bootstrapServer = HostInfo.buildFromEndpoint(applicationProperties.getBootstrapServer());
31                         Socket socket = new Socket();
32                         socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port()));
33                         host = socket.getLocalAddress().getHostAddress();
34                 }
35                 else
36                 {
37                         host = serverProperties.getAddress().getHostAddress();
38                 }
39
40                 Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort();
41
42                 return new QueryStreamProcessor(
43                                 applicationProperties.getApplicationId(),
44                                 new HostInfo(host, port),
45                                 applicationProperties.getBootstrapServer(),
46                                 applicationProperties.getUsersInputTopic(),
47                                 applicationProperties.getRankingInputTopic(),
48                                 mapper,
49                                 context);
50         }
51 }