0e30e7490c6336e8ad4903de132f61982bae7979
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / query / QueryApplicationConfiguration.java
1 package de.juplo.kafka.wordcount.query;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import org.apache.kafka.clients.consumer.ConsumerConfig;
5 import org.apache.kafka.common.serialization.Serdes;
6 import org.apache.kafka.streams.StreamsConfig;
7 import org.apache.kafka.streams.state.HostInfo;
8 import org.springframework.boot.autoconfigure.web.ServerProperties;
9 import org.springframework.boot.context.properties.EnableConfigurationProperties;
10 import org.springframework.context.ConfigurableApplicationContext;
11 import org.springframework.context.annotation.Bean;
12 import org.springframework.context.annotation.Configuration;
13
14 import java.io.IOException;
15 import java.net.InetSocketAddress;
16 import java.net.Socket;
17 import java.util.Properties;
18
19
20 @Configuration
21 @EnableConfigurationProperties(QueryApplicationProperties.class)
22 public class QueryApplicationConfiguration
23 {
24         @Bean
25         public HostInfo applicationServer(
26                         ServerProperties serverProperties,
27                         QueryApplicationProperties applicationProperties) throws IOException
28         {
29                 String host;
30                 if (serverProperties.getAddress() == null)
31                 {
32                         HostInfo bootstrapServer = HostInfo.buildFromEndpoint(applicationProperties.getBootstrapServer());
33                         Socket socket = new Socket();
34                         socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port()));
35                         host = socket.getLocalAddress().getHostAddress();
36                 }
37                 else
38                 {
39                         host = serverProperties.getAddress().getHostAddress();
40                 }
41
42                 Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort();
43
44                 return new HostInfo(host, port);
45         }
46
47         @Bean
48         public Properties streamProcessorProperties(
49                         QueryApplicationProperties applicationProperties,
50                         HostInfo applicationServer)
51         {
52                 Properties props = new Properties();
53
54                 props.putAll(serializationConfig());
55
56                 String applicationId = applicationProperties.getApplicationId();
57                 String bootstrapServer = applicationProperties.getBootstrapServer();
58
59                 props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
60                 props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port());
61                 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
62                 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
63
64                 return props;
65         }
66
67         static Properties serializationConfig()
68         {
69                 Properties props = new Properties();
70
71                 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
72                 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
73
74                 return props;
75         }
76
77         @Bean
78         public QueryStreamProcessor streamProcessor(
79                         Properties streamProcessorProperties,
80                         HostInfo applicationServer,
81                         QueryApplicationProperties applicationProperties,
82                         ObjectMapper mapper,
83                         ConfigurableApplicationContext context)
84         {
85                 return new QueryStreamProcessor(
86                                 streamProcessorProperties,
87                                 applicationServer,
88                                 applicationProperties.getUsersInputTopic(),
89                                 applicationProperties.getRankingInputTopic(),
90                                 mapper,
91                                 context);
92         }
93 }