From 8b1fd41f91be906ac0c691b4df7c528cad9ff583 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 16 Oct 2021 16:34:54 +0200 Subject: [PATCH] query: 1.0.3 - application.server is derived from the local address --- pom.xml | 2 +- .../wordcount/query/QueryApplication.java | 42 +++++++++++++++++++ .../query/QueryApplicationProperties.java | 1 - .../wordcount/query/QueryStreamProcessor.java | 20 +++++---- 4 files changed, 54 insertions(+), 11 deletions(-) diff --git a/pom.xml b/pom.xml index 56a2b76..8164674 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount query - 1.0.2 + 1.0.3 Wordcount-Query Query stream-processor of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplication.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplication.java index 995c1a1..813d3b2 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplication.java @@ -1,14 +1,56 @@ package de.juplo.kafka.wordcount.query; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.streams.state.HostInfo; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.web.ServerProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; @SpringBootApplication @EnableConfigurationProperties(QueryApplicationProperties.class) public class QueryApplication { + @Bean + public QueryStreamProcessor usersStreamProcessor( + ServerProperties serverProperties, + QueryApplicationProperties properties, + ObjectMapper mapper, + ConfigurableApplicationContext context) throws IOException + { + String host; + if (serverProperties.getAddress() == null) + { + HostInfo bootstrapServer = HostInfo.buildFromEndpoint(properties.getBootstrapServer()); + Socket socket = new Socket(); + socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port())); + host = socket.getLocalAddress().getHostAddress(); + } + else + { + host = serverProperties.getAddress().getHostAddress(); + } + + Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort(); + + return new QueryStreamProcessor( + properties.getApplicationId(), + new HostInfo(host, port), + properties.getBootstrapServer(), + properties.getUsersInputTopic(), + properties.getRankingInputTopic(), + mapper, + context); + } + + public static void main(String[] args) { SpringApplication.run(QueryApplication.class, args); diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java index 19265f1..df5f41e 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java @@ -17,5 +17,4 @@ public class QueryApplicationProperties private String applicationId = "query"; private String rankingInputTopic = "top10"; private String usersInputTopic = "users"; - private String applicationServer = "localhost:8080"; } diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index 696e088..7e7af21 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -14,7 +14,6 @@ import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.springframework.boot.SpringApplication; import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -27,7 +26,6 @@ import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.St @Slf4j -@Component public class QueryStreamProcessor { public final KafkaStreams streams; @@ -38,14 +36,18 @@ public class QueryStreamProcessor public QueryStreamProcessor( - QueryApplicationProperties properties, + String applicationId, + HostInfo applicationServer, + String bootstrapServer, + String usersInputTopic, + String rankingInputTopic, ObjectMapper mapper, ConfigurableApplicationContext context) { StreamsBuilder builder = new StreamsBuilder(); - KTable users = builder.table(properties.getUsersInputTopic()); - KStream rankings = builder.stream(properties.getRankingInputTopic()); + KTable users = builder.table(usersInputTopic); + KStream rankings = builder.stream(rankingInputTopic); rankings .join(users, (rankingJson, userJson) -> @@ -69,9 +71,9 @@ public class QueryStreamProcessor .toTable(Materialized.as(storeName)); Properties props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); - props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, properties.getApplicationServer()); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port()); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -88,7 +90,7 @@ public class QueryStreamProcessor return SHUTDOWN_CLIENT; }); - hostInfo = HostInfo.buildFromEndpoint(properties.getApplicationServer()); + hostInfo = applicationServer; storeParameters = StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore());; this.mapper = mapper; } -- 2.20.1