X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fusers%2FUsersApplication.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fusers%2FUsersApplication.java;h=17cad12e801bde733f68ec6785b030a6babdf607;hb=4e2fc001e9d7041b875e4a5e62eff250b1a8ce60;hp=9ae44e1fdefde706b2bc4d6519af00842174cc69;hpb=aaf3b3248937ffd1677a1eeb6a575c539b6ec84a;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java index 9ae44e1..17cad12 100644 --- a/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java @@ -1,14 +1,21 @@ package de.juplo.kafka.wordcount.users; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.state.HostInfo; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.web.ServerProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.util.Assert; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; import java.util.Properties; @@ -29,6 +36,38 @@ public class UsersApplication return new KafkaProducer<>(props); } + @Bean + public UsersStreamProcessor usersStreamProcessor( + ServerProperties serverProperties, + UsersApplicationProperties properties, + ObjectMapper mapper, + ConfigurableApplicationContext context) throws IOException + { + String host; + if (serverProperties.getAddress() == null) + { + HostInfo bootstrapServer = HostInfo.buildFromEndpoint(properties.getBootstrapServer()); + Socket socket = new Socket(); + socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port())); + host = socket.getLocalAddress().getHostAddress(); + } + else + { + host = serverProperties.getAddress().getHostAddress(); + } + + Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort(); + + return new UsersStreamProcessor( + properties.getApplicationId(), + new HostInfo(host, port), + properties.getBootstrapServer(), + properties.getTopic(), + mapper, + context); + } + + public static void main(String[] args) { SpringApplication.run(UsersApplication.class, args);