From 4e2fc001e9d7041b875e4a5e62eff250b1a8ce60 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 16 Oct 2021 15:13:16 +0200 Subject: [PATCH] users:1.0.3 - application.server is derived from the local address --- pom.xml | 2 +- .../wordcount/users/UsersApplication.java | 39 +++++++++++++++++++ .../users/UsersApplicationProperties.java | 1 - .../wordcount/users/UsersStreamProcessor.java | 17 ++++---- 4 files changed, 49 insertions(+), 10 deletions(-) diff --git a/pom.xml b/pom.xml index e46706e..c98538a 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount users - 1.0.2 + 1.0.3 Wordcount-Users Users-service of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java index 9ae44e1..17cad12 100644 --- a/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java @@ -1,14 +1,21 @@ package de.juplo.kafka.wordcount.users; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.state.HostInfo; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.web.ServerProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.util.Assert; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; import java.util.Properties; @@ -29,6 +36,38 @@ public class UsersApplication return new KafkaProducer<>(props); } + @Bean + public UsersStreamProcessor usersStreamProcessor( + ServerProperties serverProperties, + UsersApplicationProperties properties, + ObjectMapper mapper, + ConfigurableApplicationContext context) throws IOException + { + String host; + if (serverProperties.getAddress() == null) + { + HostInfo bootstrapServer = HostInfo.buildFromEndpoint(properties.getBootstrapServer()); + Socket socket = new Socket(); + socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port())); + host = socket.getLocalAddress().getHostAddress(); + } + else + { + host = serverProperties.getAddress().getHostAddress(); + } + + Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort(); + + return new UsersStreamProcessor( + properties.getApplicationId(), + new HostInfo(host, port), + properties.getBootstrapServer(), + properties.getTopic(), + mapper, + context); + } + + public static void main(String[] args) { SpringApplication.run(UsersApplication.class, args); diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplicationProperties.java index 2c50208..cf88a95 100644 --- a/src/main/java/de/juplo/kafka/wordcount/users/UsersApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplicationProperties.java @@ -16,5 +16,4 @@ public class UsersApplicationProperties private String bootstrapServer = "localhost:9092"; private String applicationId = "users"; private String topic = "users"; - private String applicationServer = "localhost:8080"; } diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersStreamProcessor.java index ef39e89..1c7572e 100644 --- a/src/main/java/de/juplo/kafka/wordcount/users/UsersStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/users/UsersStreamProcessor.java @@ -12,7 +12,6 @@ import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.springframework.boot.SpringApplication; import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -25,7 +24,6 @@ import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.St @Slf4j -@Component public class UsersStreamProcessor { public final KafkaStreams streams; @@ -36,17 +34,20 @@ public class UsersStreamProcessor public UsersStreamProcessor( - UsersApplicationProperties properties, + String applicationId, + HostInfo applicationServer, + String bootstrapServer, + String topic, ObjectMapper mapper, ConfigurableApplicationContext context) { StreamsBuilder builder = new StreamsBuilder(); - builder.table(properties.getTopic(), Materialized.as(storeName)); + builder.table(topic, Materialized.as(storeName)); Properties props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); - props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, properties.getApplicationServer()); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port()); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -63,7 +64,7 @@ public class UsersStreamProcessor return SHUTDOWN_CLIENT; }); - hostInfo = HostInfo.buildFromEndpoint(properties.getApplicationServer()); + hostInfo = applicationServer; storeParameters = StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore());; this.mapper = mapper; } -- 2.20.1