query: 2.0.0 - (RED) The keys of the top10-topic are deserialized as JSON
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / query / QueryStreamProcessorTopologyTest.java
index 8439be1..203c813 100644 (file)
@@ -1,7 +1,7 @@
 package de.juplo.kafka.wordcount.query;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.top10.TestUser;
 import de.juplo.kafka.wordcount.users.TestUserData;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -15,6 +15,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 
+import java.util.Map;
+
 import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig;
 
 
@@ -23,11 +25,12 @@ public class QueryStreamProcessorTopologyTest
 {
   public static final String TOP10_IN = "TOP10-IN";
   public static final String USERS_IN = "USERS-IN";
-  public static final String STORE_NAME = "TOPOLOGY-TEST";
+  public static final String RANKING_STORE_NAME = "TOPOLOGY-TEST-RANKINGS";
+  public static final String USERS_STORE_NAME = "TOPOLOGY-TEST-USERS";
 
 
   TopologyTestDriver testDriver;
-  TestInputTopic<String, TestRanking> top10In;
+  TestInputTopic<TestUser, TestRanking> top10In;
   TestInputTopic<String, TestUserData> userIn;
 
 
@@ -37,20 +40,20 @@ public class QueryStreamProcessorTopologyTest
     Topology topology = QueryStreamProcessor.buildTopology(
         USERS_IN,
         TOP10_IN,
-        Stores.inMemoryKeyValueStore(STORE_NAME),
-        new ObjectMapper());
+        Stores.inMemoryKeyValueStore(USERS_STORE_NAME),
+        Stores.inMemoryKeyValueStore(RANKING_STORE_NAME));
 
     testDriver = new TopologyTestDriver(topology, serializationConfig());
 
     top10In = testDriver.createInputTopic(
         TOP10_IN,
-        new StringSerializer(),
-        new JsonSerializer());
+        jsonSerializer(TestUser.class, true),
+        jsonSerializer(TestRanking.class,false));
 
     userIn = testDriver.createInputTopic(
         USERS_IN,
         new StringSerializer(),
-        new JsonSerializer());
+        jsonSerializer(TestUserData.class, false).noTypeInfo());
   }
 
 
@@ -64,8 +67,8 @@ public class QueryStreamProcessorTopologyTest
         .getTop10Messages()
         .forEach(kv -> top10In.pipeInput(kv.key, kv.value));
 
-    KeyValueStore<String, String> store = testDriver.getKeyValueStore(STORE_NAME);
-    TestData.assertExpectedState(store);
+    KeyValueStore<String, UserRanking> store = testDriver.getKeyValueStore(RANKING_STORE_NAME);
+    TestData.assertExpectedState(user -> store.get(user));
   }
 
   @AfterEach
@@ -73,4 +76,16 @@ public class QueryStreamProcessorTopologyTest
   {
     testDriver.close();
   }
+
+  private <T> JsonSerializer<T> jsonSerializer(Class<T> type, boolean isKey)
+  {
+    JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
+    jsonSerializer.configure(
+        Map.of(
+            JsonSerializer.TYPE_MAPPINGS,
+            "user:" + TestUser.class.getName() + "," +
+            "ranking:" + TestRanking.class.getName()),
+        isKey);
+    return jsonSerializer;
+  }
 }