3f866f01090491547b22f7f6b0f229afc5928004
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / query / QueryApplicationConfiguration.java
1 package de.juplo.kafka.wordcount.query;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import org.apache.kafka.streams.state.HostInfo;
5 import org.springframework.boot.autoconfigure.web.ServerProperties;
6 import org.springframework.boot.context.properties.EnableConfigurationProperties;
7 import org.springframework.context.ConfigurableApplicationContext;
8 import org.springframework.context.annotation.Bean;
9 import org.springframework.context.annotation.Configuration;
10
11 import java.io.IOException;
12 import java.net.InetSocketAddress;
13 import java.net.Socket;
14
15
16 @Configuration
17 @EnableConfigurationProperties(QueryApplicationProperties.class)
18 public class QueryApplicationConfiguration
19 {
20         @Bean
21         public HostInfo applicationServer(
22                         ServerProperties serverProperties,
23                         QueryApplicationProperties applicationProperties) throws IOException
24         {
25                 String host;
26                 if (serverProperties.getAddress() == null)
27                 {
28                         HostInfo bootstrapServer = HostInfo.buildFromEndpoint(applicationProperties.getBootstrapServer());
29                         Socket socket = new Socket();
30                         socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port()));
31                         host = socket.getLocalAddress().getHostAddress();
32                 }
33                 else
34                 {
35                         host = serverProperties.getAddress().getHostAddress();
36                 }
37
38                 Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort();
39
40                 return new HostInfo(host, port);
41         }
42
43         @Bean
44         public QueryStreamProcessor streamProcessor(
45                         QueryApplicationProperties applicationProperties,
46                         HostInfo applicationServer,
47                         ObjectMapper mapper,
48                         ConfigurableApplicationContext context)
49         {
50                 return new QueryStreamProcessor(
51                                 applicationProperties.getApplicationId(),
52                                 applicationServer,
53                                 applicationProperties.getBootstrapServer(),
54                                 applicationProperties.getUsersInputTopic(),
55                                 applicationProperties.getRankingInputTopic(),
56                                 mapper,
57                                 context);
58         }
59 }