query: 2.0.0 - Values are serialized as JSON
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / query / QueryStreamProcessor.java
index cc65fce..0692652 100644 (file)
@@ -13,9 +13,11 @@ import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerde;
 
 import java.net.URI;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 
@@ -53,7 +55,18 @@ public class QueryStreamProcessor
        {
                StreamsBuilder builder = new StreamsBuilder();
 
-               KTable<String, User> users = builder.table(usersInputTopic);
+               JsonSerde valueSerde = new JsonSerde();
+               valueSerde.configure(Map.of(
+                               JsonDeserializer.TYPE_MAPPINGS,
+                               "user:" + Key.class.getName() + "," +
+                                               "ranking:" + Ranking.class.getName() + "," +
+                                               "userdata:" + User.class.getName() + "," +
+                                               "userranking:" + UserRanking.class.getName()
+               ), false);
+               KTable<String, User> users = builder.table(
+                               usersInputTopic,
+                               Consumed.with(null, valueSerde.copyWithType(User.class))
+                               );
                KStream<String, Ranking> rankings = builder.stream(rankingInputTopic);
 
                rankings