query: 2.0.0 - Defined 2 state-stores (all state in-memory in tests)
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / query / QueryStreamProcessor.java
index 7dacd4b..4749264 100644 (file)
@@ -22,7 +22,8 @@ import java.util.Properties;
 @Slf4j
 public class QueryStreamProcessor
 {
-       public static final String STORE_NAME = "rankings-by-username";
+       public static final String USER_STORE_NAME = "users";
+       public static final String RANKING_STORE_NAME = "rankings";
 
        public final KafkaStreams streams;
        public final HostInfo hostInfo;
@@ -34,27 +35,34 @@ public class QueryStreamProcessor
                        HostInfo applicationServer,
                        String usersInputTopic,
                        String rankingInputTopic,
-                       KeyValueBytesStoreSupplier storeSupplier)
+                       KeyValueBytesStoreSupplier userStoreSupplier,
+                       KeyValueBytesStoreSupplier rankingStoreSupplier)
        {
                Topology topology = buildTopology(
                                usersInputTopic,
                                rankingInputTopic,
-                               storeSupplier);
+                               userStoreSupplier,
+                               rankingStoreSupplier);
                streams = new KafkaStreams(topology, props);
                hostInfo = applicationServer;
-               storeParameters = StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore());;
+               storeParameters = StoreQueryParameters.fromNameAndType(RANKING_STORE_NAME, QueryableStoreTypes.keyValueStore());;
        }
 
        static Topology buildTopology(
                        String usersInputTopic,
                        String rankingInputTopic,
-                       KeyValueBytesStoreSupplier storeSupplier)
+                       KeyValueBytesStoreSupplier userStoreSupplier,
+                       KeyValueBytesStoreSupplier rankingStoreSupplier)
        {
                StreamsBuilder builder = new StreamsBuilder();
 
                KTable<String, User> users = builder
                                .stream(usersInputTopic)
-                               .toTable(Materialized.with(null, new JsonSerde().copyWithType(User.class)));
+                               .toTable(
+                                               Materialized
+                                                               .<String, User>as(userStoreSupplier)
+                                                               .withKeySerde(Serdes.String())
+                                                               .withValueSerde(new JsonSerde().copyWithType(User.class)));
                KStream<String, Ranking> rankings = builder.stream(rankingInputTopic);
 
                rankings
@@ -64,7 +72,7 @@ public class QueryStreamProcessor
                                                ranking.getEntries()))
                                .toTable(
                                                Materialized
-                                                               .<String, UserRanking>as(storeSupplier)
+                                                               .<String, UserRanking>as(rankingStoreSupplier)
                                                                .withValueSerde(new JsonSerde().copyWithType(UserRanking.class)));
 
                Topology topology = builder.build();
@@ -80,7 +88,7 @@ public class QueryStreamProcessor
 
        public Optional<URI> getRedirect(String username)
        {
-               KeyQueryMetadata metadata = streams.queryMetadataForKey(STORE_NAME, username, Serdes.String().serializer());
+               KeyQueryMetadata metadata = streams.queryMetadataForKey(RANKING_STORE_NAME, username, Serdes.String().serializer());
                HostInfo activeHost = metadata.activeHost();
                log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port());