users:1.0.3 - application.server is derived from the local address
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / users / UsersApplication.java
index 9ae44e1..17cad12 100644 (file)
@@ -1,14 +1,21 @@
 package de.juplo.kafka.wordcount.users;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.state.HostInfo;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.web.ServerProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.util.Assert;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
 import java.util.Properties;
 
 
@@ -29,6 +36,38 @@ public class UsersApplication
                return new KafkaProducer<>(props);
        }
 
+       @Bean
+       public UsersStreamProcessor usersStreamProcessor(
+                       ServerProperties serverProperties,
+                       UsersApplicationProperties properties,
+                       ObjectMapper mapper,
+                       ConfigurableApplicationContext context) throws IOException
+       {
+               String host;
+               if (serverProperties.getAddress() == null)
+               {
+                       HostInfo bootstrapServer = HostInfo.buildFromEndpoint(properties.getBootstrapServer());
+                       Socket socket = new Socket();
+                       socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port()));
+                       host = socket.getLocalAddress().getHostAddress();
+               }
+               else
+               {
+                       host = serverProperties.getAddress().getHostAddress();
+               }
+
+               Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort();
+
+               return new UsersStreamProcessor(
+                               properties.getApplicationId(),
+                               new HostInfo(host, port),
+                               properties.getBootstrapServer(),
+                               properties.getTopic(),
+                               mapper,
+                               context);
+       }
+
+
        public static void main(String[] args)
        {
                SpringApplication.run(UsersApplication.class, args);