query: 2.0.0 - (RED) The keys of the top10-topic are deserialized as JSON
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / query / QueryStreamProcessorTopologyTest.java
index fda7408..203c813 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.wordcount.query;
 
 import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.top10.TestUser;
 import de.juplo.kafka.wordcount.users.TestUserData;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -29,7 +30,7 @@ public class QueryStreamProcessorTopologyTest
 
 
   TopologyTestDriver testDriver;
-  TestInputTopic<String, TestRanking> top10In;
+  TestInputTopic<TestUser, TestRanking> top10In;
   TestInputTopic<String, TestUserData> userIn;
 
 
@@ -46,13 +47,13 @@ public class QueryStreamProcessorTopologyTest
 
     top10In = testDriver.createInputTopic(
         TOP10_IN,
-        new StringSerializer(),
-        jsonSerializer(TestRanking.class));
+        jsonSerializer(TestUser.class, true),
+        jsonSerializer(TestRanking.class,false));
 
     userIn = testDriver.createInputTopic(
         USERS_IN,
         new StringSerializer(),
-        jsonSerializer(TestUserData.class).noTypeInfo());
+        jsonSerializer(TestUserData.class, false).noTypeInfo());
   }
 
 
@@ -76,14 +77,15 @@ public class QueryStreamProcessorTopologyTest
     testDriver.close();
   }
 
-  private <T> JsonSerializer<T> jsonSerializer(Class<T> type)
+  private <T> JsonSerializer<T> jsonSerializer(Class<T> type, boolean isKey)
   {
     JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
     jsonSerializer.configure(
         Map.of(
             JsonSerializer.TYPE_MAPPINGS,
+            "user:" + TestUser.class.getName() + "," +
             "ranking:" + TestRanking.class.getName()),
-        false);
+        isKey);
     return jsonSerializer;
   }
 }