From 807393b1cebc93250d596dd96b3ee2a582993a53 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 9 Jun 2024 10:15:10 +0200 Subject: [PATCH] query: 1.0.6 - Refined `QueryAppilcationConfiguration` -- moved the creation of the streams-config into the config-class --- .../query/QueryApplicationConfiguration.java | 33 ++++++++++++++++--- .../wordcount/query/QueryStreamProcessor.java | 17 +++------- 2 files changed, 34 insertions(+), 16 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java index 88b14f0..a0ae095 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java @@ -1,6 +1,9 @@ package de.juplo.kafka.wordcount.query; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.state.HostInfo; import org.springframework.boot.autoconfigure.web.ServerProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -11,6 +14,7 @@ import org.springframework.context.annotation.Configuration; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; +import java.util.Properties; @Configuration @@ -40,16 +44,37 @@ public class QueryApplicationConfiguration return new HostInfo(host, port); } - public QueryStreamProcessor usersStreamProcessor( + @Bean + Properties streamProcessorProperties( QueryApplicationProperties applicationProperties, + HostInfo applicationServer) + { + Properties props = new Properties(); + + String applicationId = applicationProperties.getApplicationId(); + String bootstrapServer = applicationProperties.getBootstrapServer(); + + props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port()); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + return props; + } + + @Bean + public QueryStreamProcessor usersStreamProcessor( + Properties streamProcessorProperties, HostInfo applicationServer, + QueryApplicationProperties applicationProperties, ObjectMapper mapper, - ConfigurableApplicationContext context) throws IOException + ConfigurableApplicationContext context) { return new QueryStreamProcessor( - applicationProperties.getApplicationId(), + streamProcessorProperties, applicationServer, - applicationProperties.getBootstrapServer(), applicationProperties.getUsersInputTopic(), applicationProperties.getRankingInputTopic(), mapper, diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index fed75b3..886c8cf 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -5,9 +5,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.*; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyQueryMetadata; +import org.apache.kafka.streams.StoreQueryParameters; +import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -36,9 +38,8 @@ public class QueryStreamProcessor public QueryStreamProcessor( - String applicationId, + Properties props, HostInfo applicationServer, - String bootstrapServer, String usersInputTopic, String rankingInputTopic, ObjectMapper mapper, @@ -70,14 +71,6 @@ public class QueryStreamProcessor }) .toTable(Materialized.as(storeName)); - Properties props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); - props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port()); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streams = new KafkaStreams(builder.build(), props); streams.setUncaughtExceptionHandler((Throwable e) -> { -- 2.20.1