query: 2.0.0 - (GREEN) Adjusted the implementation to the new expectations
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / query / QueryStreamProcessor.java
index 0692652..dcb1234 100644 (file)
@@ -13,11 +13,9 @@ import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerde;
 
 import java.net.URI;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 
@@ -25,7 +23,8 @@ import java.util.Properties;
 @Slf4j
 public class QueryStreamProcessor
 {
-       public static final String STORE_NAME = "rankings-by-username";
+       public static final String USER_STORE_NAME = "users";
+       public static final String RANKING_STORE_NAME = "rankings";
 
        public final KafkaStreams streams;
        public final HostInfo hostInfo;
@@ -37,36 +36,36 @@ public class QueryStreamProcessor
                        HostInfo applicationServer,
                        String usersInputTopic,
                        String rankingInputTopic,
-                       KeyValueBytesStoreSupplier storeSupplier)
+                       KeyValueBytesStoreSupplier userStoreSupplier,
+                       KeyValueBytesStoreSupplier rankingStoreSupplier)
        {
                Topology topology = buildTopology(
                                usersInputTopic,
                                rankingInputTopic,
-                               storeSupplier);
+                               userStoreSupplier,
+                               rankingStoreSupplier);
                streams = new KafkaStreams(topology, props);
                hostInfo = applicationServer;
-               storeParameters = StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore());;
+               storeParameters = StoreQueryParameters.fromNameAndType(RANKING_STORE_NAME, QueryableStoreTypes.keyValueStore());;
        }
 
        static Topology buildTopology(
                        String usersInputTopic,
                        String rankingInputTopic,
-                       KeyValueBytesStoreSupplier storeSupplier)
+                       KeyValueBytesStoreSupplier userStoreSupplier,
+                       KeyValueBytesStoreSupplier rankingStoreSupplier)
        {
                StreamsBuilder builder = new StreamsBuilder();
 
-               JsonSerde valueSerde = new JsonSerde();
-               valueSerde.configure(Map.of(
-                               JsonDeserializer.TYPE_MAPPINGS,
-                               "user:" + Key.class.getName() + "," +
-                                               "ranking:" + Ranking.class.getName() + "," +
-                                               "userdata:" + User.class.getName() + "," +
-                                               "userranking:" + UserRanking.class.getName()
-               ), false);
-               KTable<String, User> users = builder.table(
-                               usersInputTopic,
-                               Consumed.with(null, valueSerde.copyWithType(User.class))
-                               );
+               KTable<String, User> users = builder
+                               .stream(
+                                               usersInputTopic,
+                                               Consumed.with(null, new JsonSerde().copyWithType(User.class)))
+                               .toTable(
+                                               Materialized
+                                                               .<String, User>as(userStoreSupplier)
+                                                               .withKeySerde(Serdes.String())
+                                                               .withValueSerde(new JsonSerde().copyWithType(User.class)));
                KStream<String, Ranking> rankings = builder.stream(rankingInputTopic);
 
                rankings
@@ -76,7 +75,7 @@ public class QueryStreamProcessor
                                                ranking.getEntries()))
                                .toTable(
                                                Materialized
-                                                               .<String, UserRanking>as(storeSupplier)
+                                                               .<String, UserRanking>as(rankingStoreSupplier)
                                                                .withValueSerde(new JsonSerde().copyWithType(UserRanking.class)));
 
                Topology topology = builder.build();
@@ -92,7 +91,7 @@ public class QueryStreamProcessor
 
        public Optional<URI> getRedirect(String username)
        {
-               KeyQueryMetadata metadata = streams.queryMetadataForKey(STORE_NAME, username, Serdes.String().serializer());
+               KeyQueryMetadata metadata = streams.queryMetadataForKey(RANKING_STORE_NAME, username, Serdes.String().serializer());
                HostInfo activeHost = metadata.activeHost();
                log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port());