]> juplo.de Git - demos/kafka/wordcount/commitdiff
query: 2.0.0 - (RED) The keys of the top10-topic are deserialized as JSON
authorKai Moritz <kai@juplo.de>
Fri, 14 Jun 2024 18:39:41 +0000 (20:39 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 16 Jun 2024 19:33:50 +0000 (21:33 +0200)
* The seemingly straightforward change leds to a very strange and
  inconsisten error-situation.
* Only the integration-test fails, while the topology-test works as
  originally expected.
* The cause of the error is a missing serde-config for the key of the
  ``rankings``-``KStream``, which defies easy explanation.
* The best explanation is, that the ``map()``-operation - despite
  possibly changing the type of the key and/or value - does not by itself
  define a parameter for specifing a corresponding serialization-config.
* The reason for this is, that the operation does not define the complete
  operation by itself.
* In order to take effect, it has to be combined with a second operation,
  that actually creates the outgoing topic.
* Without that second DSL-operation, `map()` simply would yield no action.
* And that is, why the serialization has to be defined on that second
  operation and cannot be defined on `map()` itself.
* But the really strange thing about the error is, that it _only_ shows up
  in `QueryApplicationIT`.
* It does not show in `QueryStreamProcessorTopologyTest` _and_ it does
  _not_ show up, if the application is compiled and started in the
  docker-setup.
* One possible explanation for this wired behaviour might be a bug or
  misconception in the interpretation of the beforehand build topology,
  that leads to a non-deterministic behaviour.
* Another possible explanation might be subtle differences in the internal
  caching behaviour -- but that seems unlikely, because tests, that are
  based on the `TopologyTestDriver` do not cache and are very (on the oposit)
  very handy if one wants to reveal bugs concerning the serialization and
  and running the application with the caching settings from the IT does
  not show the error.

src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java
src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java
src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java
src/test/java/de/juplo/kafka/wordcount/query/TestData.java
src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java [new file with mode: 0644]

index 6c7844d823c88b549079639ea0d34d81d20fa703..440d5c4649b0fdb328a7c51e2df8ea8c6e461cf5 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka.wordcount.query;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
@@ -86,7 +85,7 @@ public class QueryApplicationConfiguration
        {
                Properties props = new Properties();
 
-               props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+               props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
                props.put(
                                JsonDeserializer.TYPE_MAPPINGS,
index dcb123485fd17e35aaa77e1a2b4d8a8aa584fd44..bf27e2d4a678e0d9cbd5f9d64768069ff872b07f 100644 (file)
@@ -60,13 +60,15 @@ public class QueryStreamProcessor
                KTable<String, User> users = builder
                                .stream(
                                                usersInputTopic,
-                                               Consumed.with(null, new JsonSerde().copyWithType(User.class)))
+                                               Consumed.with(Serdes.String(), new JsonSerde().copyWithType(User.class)))
                                .toTable(
                                                Materialized
                                                                .<String, User>as(userStoreSupplier)
                                                                .withKeySerde(Serdes.String())
                                                                .withValueSerde(new JsonSerde().copyWithType(User.class)));
-               KStream<String, Ranking> rankings = builder.stream(rankingInputTopic);
+               KStream<String, Ranking> rankings = builder
+                               .<Key, Ranking>stream(rankingInputTopic)
+                               .map((key, value) -> new KeyValue<>(key.getUsername(), value));
 
                rankings
                                .join(users, (ranking, user) -> UserRanking.of(
@@ -76,6 +78,7 @@ public class QueryStreamProcessor
                                .toTable(
                                                Materialized
                                                                .<String, UserRanking>as(rankingStoreSupplier)
+                                                               .withKeySerde(Serdes.String())
                                                                .withValueSerde(new JsonSerde().copyWithType(UserRanking.class)));
 
                Topology topology = builder.build();
index 19ada51fc53d75cc572efc9fb77a56d3b0580ffb..1315eae04e0f67da8ec7294e68346d44d29e8083 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.wordcount.query;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.top10.TestUser;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -150,9 +151,9 @@ public class QueryApplicationIT
                KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory)
                {
                        Map<String, Object> properties = Map.of(
-                                       ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
+                                       ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
                                        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
-                                       JsonSerializer.TYPE_MAPPINGS, "ranking:" + TestRanking.class.getName());
+                                       JsonSerializer.TYPE_MAPPINGS, "user:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName());
                        return new KafkaTemplate(producerFactory, properties);
                }
 
index fda74087e2b6927e0a479a0a2837dd6dbb4f8fba..203c813875d6131c747af31960878870c4b6f995 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.wordcount.query;
 
 import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.top10.TestUser;
 import de.juplo.kafka.wordcount.users.TestUserData;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -29,7 +30,7 @@ public class QueryStreamProcessorTopologyTest
 
 
   TopologyTestDriver testDriver;
-  TestInputTopic<String, TestRanking> top10In;
+  TestInputTopic<TestUser, TestRanking> top10In;
   TestInputTopic<String, TestUserData> userIn;
 
 
@@ -46,13 +47,13 @@ public class QueryStreamProcessorTopologyTest
 
     top10In = testDriver.createInputTopic(
         TOP10_IN,
-        new StringSerializer(),
-        jsonSerializer(TestRanking.class));
+        jsonSerializer(TestUser.class, true),
+        jsonSerializer(TestRanking.class,false));
 
     userIn = testDriver.createInputTopic(
         USERS_IN,
         new StringSerializer(),
-        jsonSerializer(TestUserData.class).noTypeInfo());
+        jsonSerializer(TestUserData.class, false).noTypeInfo());
   }
 
 
@@ -76,14 +77,15 @@ public class QueryStreamProcessorTopologyTest
     testDriver.close();
   }
 
-  private <T> JsonSerializer<T> jsonSerializer(Class<T> type)
+  private <T> JsonSerializer<T> jsonSerializer(Class<T> type, boolean isKey)
   {
     JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
     jsonSerializer.configure(
         Map.of(
             JsonSerializer.TYPE_MAPPINGS,
+            "user:" + TestUser.class.getName() + "," +
             "ranking:" + TestRanking.class.getName()),
-        false);
+        isKey);
     return jsonSerializer;
   }
 }
index 1fe34d9eecf50c609910dd02be5c5d0b919ef986..c190eed24af4ad24392e25ec2e64b388c6db6085 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.wordcount.query;
 
 import de.juplo.kafka.wordcount.top10.TestEntry;
 import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.top10.TestUser;
 import de.juplo.kafka.wordcount.users.TestUserData;
 import org.apache.kafka.streams.KeyValue;
 
@@ -14,10 +15,10 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 class TestData
 {
-       static final String PETER = "peter";
-       static final String KLAUS = "klaus";
+       static final TestUser PETER = TestUser.of("peter");
+       static final TestUser KLAUS = TestUser.of("klaus");
 
-       static final Stream<KeyValue<String, TestRanking>> getTop10Messages()
+       static final Stream<KeyValue<TestUser, TestRanking>> getTop10Messages()
        {
                return Stream.of(TOP10_MESSAGES);
        }
@@ -29,8 +30,8 @@ class TestData
 
        static void assertExpectedState(Function<String, UserRanking> function)
        {
-               assertRankingEqualsRankingFromLastMessage(PETER, function.apply(PETER));
-               assertRankingEqualsRankingFromLastMessage(KLAUS, function.apply(KLAUS));
+               assertRankingEqualsRankingFromLastMessage(PETER.getUsername(), function.apply(PETER.getUsername()));
+               assertRankingEqualsRankingFromLastMessage(KLAUS.getUsername(), function.apply(KLAUS.getUsername()));
        }
 
        private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson)
@@ -41,7 +42,7 @@ class TestData
        private static UserRanking getLastMessageFor(String user)
        {
                return getTop10Messages()
-                               .filter(kv -> kv.key.equals(user))
+                               .filter(kv -> kv.key.getUsername().equals(user))
                                .map(kv -> kv.value)
                                .map(testRanking -> userRankingFor(user, testRanking))
                                .reduce(null, (left, right) -> right);
@@ -72,8 +73,7 @@ class TestData
                entry.setCount(testEntry.getCount());
                return entry;
        }
-
-       private static KeyValue<String, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
+       private static KeyValue<TestUser, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
        {
                        KeyValue.pair( // 0
                                        PETER,
@@ -136,10 +136,10 @@ class TestData
        private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
        {
                        KeyValue.pair(
-                                       PETER,
-                                       TestUserData.of(PETER, "Peter", "Pan", TestUserData.Sex.MALE)),
+                                       PETER.getUsername(),
+                                       TestUserData.of(PETER.getUsername(), "Peter", "Pan", TestUserData.Sex.MALE)),
                        KeyValue.pair(
-                                       KLAUS,
-                                       TestUserData.of(KLAUS, "Klaus", "Klüse", TestUserData.Sex.OTHER)),
+                                       KLAUS.getUsername(),
+                                       TestUserData.of(KLAUS.getUsername(), "Klaus", "Klüse", TestUserData.Sex.OTHER)),
        };
 }
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java
new file mode 100644 (file)
index 0000000..cc48496
--- /dev/null
@@ -0,0 +1,14 @@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+@Data
+public class TestUser
+{
+  String username;
+}