query: 2.0.0 - (RED) The keys of the top10-topic are deserialized as JSON
authorKai Moritz <kai@juplo.de>
Fri, 14 Jun 2024 18:39:41 +0000 (20:39 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 16 Jun 2024 19:33:50 +0000 (21:33 +0200)
* The seemingly straightforward change leds to a very strange and
  inconsisten error-situation.
* Only the integration-test fails, while the topology-test works as
  originally expected.
* The cause of the error is a missing serde-config for the key of the
  ``rankings``-``KStream``, which defies easy explanation.
* The best explanation is, that the ``map()``-operation - despite
  possibly changing the type of the key and/or value - does not by itself
  define a parameter for specifing a corresponding serialization-config.
* The reason for this is, that the operation does not define the complete
  operation by itself.
* In order to take effect, it has to be combined with a second operation,
  that actually creates the outgoing topic.
* Without that second DSL-operation, `map()` simply would yield no action.
* And that is, why the serialization has to be defined on that second
  operation and cannot be defined on `map()` itself.
* But the really strange thing about the error is, that it _only_ shows up
  in `QueryApplicationIT`.
* It does not show in `QueryStreamProcessorTopologyTest` _and_ it does
  _not_ show up, if the application is compiled and started in the
  docker-setup.
* One possible explanation for this wired behaviour might be a bug or
  misconception in the interpretation of the beforehand build topology,
  that leads to a non-deterministic behaviour.
* Another possible explanation might be subtle differences in the internal
  caching behaviour -- but that seems unlikely, because tests, that are
  based on the `TopologyTestDriver` do not cache and are very (on the oposit)
  very handy if one wants to reveal bugs concerning the serialization and
  and running the application with the caching settings from the IT does
  not show the error.

src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java
src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java
src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java
src/test/java/de/juplo/kafka/wordcount/query/TestData.java
src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java [new file with mode: 0644]

index 6c7844d..440d5c4 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka.wordcount.query;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
@@ -86,7 +85,7 @@ public class QueryApplicationConfiguration
        {
                Properties props = new Properties();
 
-               props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+               props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
                props.put(
                                JsonDeserializer.TYPE_MAPPINGS,
index dcb1234..bf27e2d 100644 (file)
@@ -60,13 +60,15 @@ public class QueryStreamProcessor
                KTable<String, User> users = builder
                                .stream(
                                                usersInputTopic,
-                                               Consumed.with(null, new JsonSerde().copyWithType(User.class)))
+                                               Consumed.with(Serdes.String(), new JsonSerde().copyWithType(User.class)))
                                .toTable(
                                                Materialized
                                                                .<String, User>as(userStoreSupplier)
                                                                .withKeySerde(Serdes.String())
                                                                .withValueSerde(new JsonSerde().copyWithType(User.class)));
-               KStream<String, Ranking> rankings = builder.stream(rankingInputTopic);
+               KStream<String, Ranking> rankings = builder
+                               .<Key, Ranking>stream(rankingInputTopic)
+                               .map((key, value) -> new KeyValue<>(key.getUsername(), value));
 
                rankings
                                .join(users, (ranking, user) -> UserRanking.of(
@@ -76,6 +78,7 @@ public class QueryStreamProcessor
                                .toTable(
                                                Materialized
                                                                .<String, UserRanking>as(rankingStoreSupplier)
+                                                               .withKeySerde(Serdes.String())
                                                                .withValueSerde(new JsonSerde().copyWithType(UserRanking.class)));
 
                Topology topology = builder.build();
index 19ada51..1315eae 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.wordcount.query;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.top10.TestUser;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -150,9 +151,9 @@ public class QueryApplicationIT
                KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory)
                {
                        Map<String, Object> properties = Map.of(
-                                       ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
+                                       ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
                                        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
-                                       JsonSerializer.TYPE_MAPPINGS, "ranking:" + TestRanking.class.getName());
+                                       JsonSerializer.TYPE_MAPPINGS, "user:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName());
                        return new KafkaTemplate(producerFactory, properties);
                }
 
index fda7408..203c813 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.wordcount.query;
 
 import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.top10.TestUser;
 import de.juplo.kafka.wordcount.users.TestUserData;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -29,7 +30,7 @@ public class QueryStreamProcessorTopologyTest
 
 
   TopologyTestDriver testDriver;
-  TestInputTopic<String, TestRanking> top10In;
+  TestInputTopic<TestUser, TestRanking> top10In;
   TestInputTopic<String, TestUserData> userIn;
 
 
@@ -46,13 +47,13 @@ public class QueryStreamProcessorTopologyTest
 
     top10In = testDriver.createInputTopic(
         TOP10_IN,
-        new StringSerializer(),
-        jsonSerializer(TestRanking.class));
+        jsonSerializer(TestUser.class, true),
+        jsonSerializer(TestRanking.class,false));
 
     userIn = testDriver.createInputTopic(
         USERS_IN,
         new StringSerializer(),
-        jsonSerializer(TestUserData.class).noTypeInfo());
+        jsonSerializer(TestUserData.class, false).noTypeInfo());
   }
 
 
@@ -76,14 +77,15 @@ public class QueryStreamProcessorTopologyTest
     testDriver.close();
   }
 
-  private <T> JsonSerializer<T> jsonSerializer(Class<T> type)
+  private <T> JsonSerializer<T> jsonSerializer(Class<T> type, boolean isKey)
   {
     JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
     jsonSerializer.configure(
         Map.of(
             JsonSerializer.TYPE_MAPPINGS,
+            "user:" + TestUser.class.getName() + "," +
             "ranking:" + TestRanking.class.getName()),
-        false);
+        isKey);
     return jsonSerializer;
   }
 }
index 1fe34d9..c190eed 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.wordcount.query;
 
 import de.juplo.kafka.wordcount.top10.TestEntry;
 import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.top10.TestUser;
 import de.juplo.kafka.wordcount.users.TestUserData;
 import org.apache.kafka.streams.KeyValue;
 
@@ -14,10 +15,10 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 class TestData
 {
-       static final String PETER = "peter";
-       static final String KLAUS = "klaus";
+       static final TestUser PETER = TestUser.of("peter");
+       static final TestUser KLAUS = TestUser.of("klaus");
 
-       static final Stream<KeyValue<String, TestRanking>> getTop10Messages()
+       static final Stream<KeyValue<TestUser, TestRanking>> getTop10Messages()
        {
                return Stream.of(TOP10_MESSAGES);
        }
@@ -29,8 +30,8 @@ class TestData
 
        static void assertExpectedState(Function<String, UserRanking> function)
        {
-               assertRankingEqualsRankingFromLastMessage(PETER, function.apply(PETER));
-               assertRankingEqualsRankingFromLastMessage(KLAUS, function.apply(KLAUS));
+               assertRankingEqualsRankingFromLastMessage(PETER.getUsername(), function.apply(PETER.getUsername()));
+               assertRankingEqualsRankingFromLastMessage(KLAUS.getUsername(), function.apply(KLAUS.getUsername()));
        }
 
        private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson)
@@ -41,7 +42,7 @@ class TestData
        private static UserRanking getLastMessageFor(String user)
        {
                return getTop10Messages()
-                               .filter(kv -> kv.key.equals(user))
+                               .filter(kv -> kv.key.getUsername().equals(user))
                                .map(kv -> kv.value)
                                .map(testRanking -> userRankingFor(user, testRanking))
                                .reduce(null, (left, right) -> right);
@@ -72,8 +73,7 @@ class TestData
                entry.setCount(testEntry.getCount());
                return entry;
        }
-
-       private static KeyValue<String, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
+       private static KeyValue<TestUser, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
        {
                        KeyValue.pair( // 0
                                        PETER,
@@ -136,10 +136,10 @@ class TestData
        private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
        {
                        KeyValue.pair(
-                                       PETER,
-                                       TestUserData.of(PETER, "Peter", "Pan", TestUserData.Sex.MALE)),
+                                       PETER.getUsername(),
+                                       TestUserData.of(PETER.getUsername(), "Peter", "Pan", TestUserData.Sex.MALE)),
                        KeyValue.pair(
-                                       KLAUS,
-                                       TestUserData.of(KLAUS, "Klaus", "Klüse", TestUserData.Sex.OTHER)),
+                                       KLAUS.getUsername(),
+                                       TestUserData.of(KLAUS.getUsername(), "Klaus", "Klüse", TestUserData.Sex.OTHER)),
        };
 }
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java
new file mode 100644 (file)
index 0000000..cc48496
--- /dev/null
@@ -0,0 +1,14 @@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+@Data
+public class TestUser
+{
+  String username;
+}