users:1.0.3 - application.server is derived from the local address
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / users / UsersApplication.java
1 package de.juplo.kafka.wordcount.users;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import org.apache.kafka.clients.producer.KafkaProducer;
5 import org.apache.kafka.clients.producer.ProducerConfig;
6 import org.apache.kafka.common.serialization.StringSerializer;
7 import org.apache.kafka.streams.state.HostInfo;
8 import org.springframework.boot.SpringApplication;
9 import org.springframework.boot.autoconfigure.SpringBootApplication;
10 import org.springframework.boot.autoconfigure.web.ServerProperties;
11 import org.springframework.boot.context.properties.EnableConfigurationProperties;
12 import org.springframework.context.ConfigurableApplicationContext;
13 import org.springframework.context.annotation.Bean;
14 import org.springframework.util.Assert;
15
16 import java.io.IOException;
17 import java.net.InetSocketAddress;
18 import java.net.Socket;
19 import java.util.Properties;
20
21
22 @SpringBootApplication
23 @EnableConfigurationProperties(UsersApplicationProperties.class)
24 public class UsersApplication
25 {
26         @Bean(destroyMethod = "close")
27         KafkaProducer<String, String> producer(UsersApplicationProperties properties)
28         {
29                 Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set");
30
31                 Properties props = new Properties();
32                 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
33                 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
34                 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
35
36                 return new KafkaProducer<>(props);
37         }
38
39         @Bean
40         public UsersStreamProcessor usersStreamProcessor(
41                         ServerProperties serverProperties,
42                         UsersApplicationProperties properties,
43                         ObjectMapper mapper,
44                         ConfigurableApplicationContext context) throws IOException
45         {
46                 String host;
47                 if (serverProperties.getAddress() == null)
48                 {
49                         HostInfo bootstrapServer = HostInfo.buildFromEndpoint(properties.getBootstrapServer());
50                         Socket socket = new Socket();
51                         socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port()));
52                         host = socket.getLocalAddress().getHostAddress();
53                 }
54                 else
55                 {
56                         host = serverProperties.getAddress().getHostAddress();
57                 }
58
59                 Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort();
60
61                 return new UsersStreamProcessor(
62                                 properties.getApplicationId(),
63                                 new HostInfo(host, port),
64                                 properties.getBootstrapServer(),
65                                 properties.getTopic(),
66                                 mapper,
67                                 context);
68         }
69
70
71         public static void main(String[] args)
72         {
73                 SpringApplication.run(UsersApplication.class, args);
74         }
75 }