From 030c0db664cea0ec3e4307065054c449dbca6be2 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 9 Jun 2024 12:41:01 +0200 Subject: [PATCH] query: 1.0.6 - Refined `QueryApplicationConfiguration` -- the state-store is configurable through a store-suplier --- .../query/QueryApplicationConfiguration.java | 11 +++++++++++ .../kafka/wordcount/query/QueryStreamProcessor.java | 11 +++++++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java index b546b8c..50a5364 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java @@ -6,6 +6,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.state.HostInfo; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.web.ServerProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -19,6 +21,7 @@ import java.net.Socket; import java.util.Properties; import java.util.concurrent.CompletableFuture; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @@ -85,6 +88,7 @@ public class QueryApplicationConfiguration Properties streamProcessorProperties, HostInfo applicationServer, QueryApplicationProperties applicationProperties, + KeyValueBytesStoreSupplier storeSupplier, ObjectMapper mapper, ConfigurableApplicationContext context) { @@ -93,6 +97,7 @@ public class QueryApplicationConfiguration applicationServer, applicationProperties.getUsersInputTopic(), applicationProperties.getRankingInputTopic(), + storeSupplier, mapper); streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> @@ -108,4 +113,10 @@ public class QueryApplicationConfiguration return streamProcessor; } + + @Bean + public KeyValueBytesStoreSupplier storeSupplier() + { + return Stores.persistentKeyValueStore(STORE_NAME); + } } diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index efa28ca..3e205f6 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -11,6 +11,7 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.HostInfo; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; @@ -35,9 +36,14 @@ public class QueryStreamProcessor HostInfo applicationServer, String usersInputTopic, String rankingInputTopic, + KeyValueBytesStoreSupplier storeSupplier, ObjectMapper mapper) { - Topology topology = buildTopology(usersInputTopic, rankingInputTopic, mapper); + Topology topology = buildTopology( + usersInputTopic, + rankingInputTopic, + storeSupplier, + mapper); streams = new KafkaStreams(topology, props); hostInfo = applicationServer; storeParameters = StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore());; @@ -47,6 +53,7 @@ public class QueryStreamProcessor static Topology buildTopology( String usersInputTopic, String rankingInputTopic, + KeyValueBytesStoreSupplier storeSupplier, ObjectMapper mapper) { StreamsBuilder builder = new StreamsBuilder(); @@ -73,7 +80,7 @@ public class QueryStreamProcessor throw new RuntimeException(e); } }) - .toTable(Materialized.as(STORE_NAME)); + .toTable(Materialized.as(storeSupplier)); Topology topology = builder.build(); log.info("\n\n{}", topology.describe()); -- 2.20.1