X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fquery%2FQueryStreamProcessor.java;h=ff7c1501c0a7cae4e2373d729bf240f18677ff47;hb=57f47fca712981116b726e437f589ac727c5d0a7;hp=efa28ca69fd18b5f7248b2c4f1a09b7c3304a455;hpb=3668a70b197b8c9f62ff735e3df031e6146bc370;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index efa28ca..ff7c150 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -11,6 +11,7 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.HostInfo; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; @@ -35,9 +36,14 @@ public class QueryStreamProcessor HostInfo applicationServer, String usersInputTopic, String rankingInputTopic, + KeyValueBytesStoreSupplier storeSupplier, ObjectMapper mapper) { - Topology topology = buildTopology(usersInputTopic, rankingInputTopic, mapper); + Topology topology = buildTopology( + usersInputTopic, + rankingInputTopic, + storeSupplier, + mapper); streams = new KafkaStreams(topology, props); hostInfo = applicationServer; storeParameters = StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore());; @@ -47,6 +53,7 @@ public class QueryStreamProcessor static Topology buildTopology( String usersInputTopic, String rankingInputTopic, + KeyValueBytesStoreSupplier storeSupplier, ObjectMapper mapper) { StreamsBuilder builder = new StreamsBuilder(); @@ -73,7 +80,7 @@ public class QueryStreamProcessor throw new RuntimeException(e); } }) - .toTable(Materialized.as(STORE_NAME)); + .toTable(Materialized.as(storeSupplier)); Topology topology = builder.build(); log.info("\n\n{}", topology.describe()); @@ -81,6 +88,11 @@ public class QueryStreamProcessor return topology; } + ReadOnlyKeyValueStore getStore() + { + return streams.store(storeParameters); + } + public Optional getRedirect(String username) { KeyQueryMetadata metadata = streams.queryMetadataForKey(STORE_NAME, username, Serdes.String().serializer()); @@ -101,7 +113,7 @@ public class QueryStreamProcessor { return Optional - .ofNullable(streams.store(storeParameters).get(username)) + .ofNullable(getStore().get(username)) .map(json -> { try