query: 2.0.1 - (GREEN) Adjusted implementation to corrected expectations
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / query / QueryStreamProcessor.java
index 0692652..e075eb7 100644 (file)
@@ -5,19 +5,14 @@ import jakarta.annotation.PreDestroy;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.*;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerde;
 
 import java.net.URI;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 
@@ -25,7 +20,8 @@ import java.util.Properties;
 @Slf4j
 public class QueryStreamProcessor
 {
-       public static final String STORE_NAME = "rankings-by-username";
+       public static final String USER_STORE_NAME = "users";
+       public static final String RANKING_STORE_NAME = "rankings";
 
        public final KafkaStreams streams;
        public final HostInfo hostInfo;
@@ -37,46 +33,50 @@ public class QueryStreamProcessor
                        HostInfo applicationServer,
                        String usersInputTopic,
                        String rankingInputTopic,
-                       KeyValueBytesStoreSupplier storeSupplier)
+                       KeyValueBytesStoreSupplier userStoreSupplier,
+                       KeyValueBytesStoreSupplier rankingStoreSupplier)
        {
                Topology topology = buildTopology(
                                usersInputTopic,
                                rankingInputTopic,
-                               storeSupplier);
+                               userStoreSupplier,
+                               rankingStoreSupplier);
                streams = new KafkaStreams(topology, props);
                hostInfo = applicationServer;
-               storeParameters = StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore());;
+               storeParameters = StoreQueryParameters.fromNameAndType(RANKING_STORE_NAME, QueryableStoreTypes.keyValueStore());;
        }
 
        static Topology buildTopology(
                        String usersInputTopic,
                        String rankingInputTopic,
-                       KeyValueBytesStoreSupplier storeSupplier)
+                       KeyValueBytesStoreSupplier userStoreSupplier,
+                       KeyValueBytesStoreSupplier rankingStoreSupplier)
        {
                StreamsBuilder builder = new StreamsBuilder();
 
-               JsonSerde valueSerde = new JsonSerde();
-               valueSerde.configure(Map.of(
-                               JsonDeserializer.TYPE_MAPPINGS,
-                               "user:" + Key.class.getName() + "," +
-                                               "ranking:" + Ranking.class.getName() + "," +
-                                               "userdata:" + User.class.getName() + "," +
-                                               "userranking:" + UserRanking.class.getName()
-               ), false);
-               KTable<String, User> users = builder.table(
-                               usersInputTopic,
-                               Consumed.with(null, valueSerde.copyWithType(User.class))
-                               );
-               KStream<String, Ranking> rankings = builder.stream(rankingInputTopic);
+               KTable<String, User> users = builder
+                               .stream(
+                                               usersInputTopic,
+                                               Consumed.with(Serdes.String(), new JsonSerde().copyWithType(User.class)))
+                               .toTable(
+                                               Materialized
+                                                               .<String, User>as(userStoreSupplier)
+                                                               .withKeySerde(Serdes.String())
+                                                               .withValueSerde(new JsonSerde().copyWithType(User.class)));
+               KStream<String, Ranking> rankings = builder
+                               .<Key, Ranking>stream(rankingInputTopic)
+                               .map((key, value) -> new KeyValue<>(key.getUser(), value));
 
                rankings
                                .join(users, (ranking, user) -> UserRanking.of(
                                                user.getFirstName(),
                                                user.getLastName(),
-                                               ranking.getEntries()))
+                                               ranking.getEntries()),
+                                               Joined.keySerde(Serdes.String()))
                                .toTable(
                                                Materialized
-                                                               .<String, UserRanking>as(storeSupplier)
+                                                               .<String, UserRanking>as(rankingStoreSupplier)
+                                                               .withKeySerde(Serdes.String())
                                                                .withValueSerde(new JsonSerde().copyWithType(UserRanking.class)));
 
                Topology topology = builder.build();
@@ -92,7 +92,7 @@ public class QueryStreamProcessor
 
        public Optional<URI> getRedirect(String username)
        {
-               KeyQueryMetadata metadata = streams.queryMetadataForKey(STORE_NAME, username, Serdes.String().serializer());
+               KeyQueryMetadata metadata = streams.queryMetadataForKey(RANKING_STORE_NAME, username, Serdes.String().serializer());
                HostInfo activeHost = metadata.activeHost();
                log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port());