ae93d457a61b4b23cca64e61bc09d7d52cd84c4c
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / query / QueryApplicationConfiguration.java
1 package de.juplo.kafka.wordcount.query;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import org.apache.kafka.clients.consumer.ConsumerConfig;
5 import org.apache.kafka.common.serialization.Serdes;
6 import org.apache.kafka.streams.StreamsConfig;
7 import org.apache.kafka.streams.state.HostInfo;
8 import org.springframework.boot.autoconfigure.web.ServerProperties;
9 import org.springframework.boot.context.properties.EnableConfigurationProperties;
10 import org.springframework.context.ConfigurableApplicationContext;
11 import org.springframework.context.annotation.Bean;
12 import org.springframework.context.annotation.Configuration;
13
14 import java.io.IOException;
15 import java.net.InetSocketAddress;
16 import java.net.Socket;
17 import java.util.Properties;
18
19
20 @Configuration
21 @EnableConfigurationProperties(QueryApplicationProperties.class)
22 public class QueryApplicationConfiguration
23 {
24         @Bean
25         public HostInfo applicationServer(
26                         ServerProperties serverProperties,
27                         QueryApplicationProperties applicationProperties) throws IOException
28         {
29                 String host;
30                 if (serverProperties.getAddress() == null)
31                 {
32                         HostInfo bootstrapServer = HostInfo.buildFromEndpoint(applicationProperties.getBootstrapServer());
33                         Socket socket = new Socket();
34                         socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port()));
35                         host = socket.getLocalAddress().getHostAddress();
36                 }
37                 else
38                 {
39                         host = serverProperties.getAddress().getHostAddress();
40                 }
41
42                 Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort();
43
44                 return new HostInfo(host, port);
45         }
46
47         @Bean
48         public Properties streamProcessorProperties(
49                         QueryApplicationProperties applicationProperties,
50                         HostInfo applicationServer)
51         {
52                 Properties props = new Properties();
53
54                 String applicationId = applicationProperties.getApplicationId();
55                 String bootstrapServer = applicationProperties.getBootstrapServer();
56
57                 props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
58                 props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port());
59                 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
60                 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
61                 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
62                 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
63
64                 return props;
65         }
66
67         @Bean
68         public QueryStreamProcessor streamProcessor(
69                         Properties streamProcessorProperties,
70                         HostInfo applicationServer,
71                         QueryApplicationProperties applicationProperties,
72                         ObjectMapper mapper,
73                         ConfigurableApplicationContext context)
74         {
75                 return new QueryStreamProcessor(
76                                 streamProcessorProperties,
77                                 applicationServer,
78                                 applicationProperties.getUsersInputTopic(),
79                                 applicationProperties.getRankingInputTopic(),
80                                 mapper,
81                                 context);
82         }
83 }