X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fquery%2FQueryStreamProcessor.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fquery%2FQueryStreamProcessor.java;h=3e205f67beb2d1f331b9a9bde68f7118fd3b88ba;hb=030c0db664cea0ec3e4307065054c449dbca6be2;hp=efa28ca69fd18b5f7248b2c4f1a09b7c3304a455;hpb=3668a70b197b8c9f62ff735e3df031e6146bc370;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index efa28ca..3e205f6 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -11,6 +11,7 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.HostInfo; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; @@ -35,9 +36,14 @@ public class QueryStreamProcessor HostInfo applicationServer, String usersInputTopic, String rankingInputTopic, + KeyValueBytesStoreSupplier storeSupplier, ObjectMapper mapper) { - Topology topology = buildTopology(usersInputTopic, rankingInputTopic, mapper); + Topology topology = buildTopology( + usersInputTopic, + rankingInputTopic, + storeSupplier, + mapper); streams = new KafkaStreams(topology, props); hostInfo = applicationServer; storeParameters = StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore());; @@ -47,6 +53,7 @@ public class QueryStreamProcessor static Topology buildTopology( String usersInputTopic, String rankingInputTopic, + KeyValueBytesStoreSupplier storeSupplier, ObjectMapper mapper) { StreamsBuilder builder = new StreamsBuilder(); @@ -73,7 +80,7 @@ public class QueryStreamProcessor throw new RuntimeException(e); } }) - .toTable(Materialized.as(STORE_NAME)); + .toTable(Materialized.as(storeSupplier)); Topology topology = builder.build(); log.info("\n\n{}", topology.describe());