query: 2.0.0 - Values are serialized as JSON
authorKai Moritz <kai@juplo.de>
Tue, 11 Jun 2024 18:36:58 +0000 (20:36 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 13 Jun 2024 15:23:44 +0000 (17:23 +0200)
--
works, but is very confusing

* The default-type is specified as a consumption-parameter in the command,
  that reads the input topic into the `KTable` via ``Consumed.with(..)``.
* The resulting code is confusing, because the ``Consumed``-parameter is
  used for both, the consumption of the input topic _and_ the consumption
  of stored values, if read from the state-store.
* Because of this, one might only think of the consumption of the stored
  values from the state-store, when looking at the ``Consumed.with()``-
  statement, and argue, why the type-mappings have to be specified here.

src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java
src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java

index 7da1712..07b78e4 100644 (file)
@@ -80,7 +80,6 @@ public class QueryApplicationConfiguration
 
                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
-               props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, User.class.getName()); // << Does not work without this!
                props.put(
                                JsonDeserializer.TYPE_MAPPINGS,
                                "user:" + Key.class.getName() + "," +
index cc65fce..0692652 100644 (file)
@@ -13,9 +13,11 @@ import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerde;
 
 import java.net.URI;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 
@@ -53,7 +55,18 @@ public class QueryStreamProcessor
        {
                StreamsBuilder builder = new StreamsBuilder();
 
-               KTable<String, User> users = builder.table(usersInputTopic);
+               JsonSerde valueSerde = new JsonSerde();
+               valueSerde.configure(Map.of(
+                               JsonDeserializer.TYPE_MAPPINGS,
+                               "user:" + Key.class.getName() + "," +
+                                               "ranking:" + Ranking.class.getName() + "," +
+                                               "userdata:" + User.class.getName() + "," +
+                                               "userranking:" + UserRanking.class.getName()
+               ), false);
+               KTable<String, User> users = builder.table(
+                               usersInputTopic,
+                               Consumed.with(null, valueSerde.copyWithType(User.class))
+                               );
                KStream<String, Ranking> rankings = builder.stream(rankingInputTopic);
 
                rankings