counter: 1.3.1 - Refined `CounterStreamProcessorTopologyTest`
authorKai Moritz <kai@juplo.de>
Sun, 16 Jun 2024 17:38:39 +0000 (19:38 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 16 Jun 2024 19:06:37 +0000 (21:06 +0200)
* `CounterStreamProcessorTopologyTest` uses the type-headers to determine
  the correct type for the deserialization of the output-data.
* Beforehand, the used types were hard-coded in the test.

src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java

index 9c86c6c..06a0798 100644 (file)
@@ -19,6 +19,9 @@ import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
+import java.util.Map;
+import java.util.stream.Collectors;
+
 import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
 import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
 
@@ -45,15 +48,8 @@ public class CounterStreamProcessorTopologyTest
 
     testDriver = new TopologyTestDriver(topology, serializationConfig());
 
-    in = testDriver.createInputTopic(
-        IN,
-        new JsonSerializer().noTypeInfo(),
-        new JsonSerializer().noTypeInfo());
-
-    out = testDriver.createOutputTopic(
-        OUT,
-        new JsonDeserializer(TestOutputWord.class).ignoreTypeHeaders(),
-        new JsonDeserializer(TestOutputWordCounter.class).ignoreTypeHeaders());
+    in = testDriver.createInputTopic(IN, serializer(), serializer());
+    out = testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer());
   }
 
 
@@ -83,4 +79,45 @@ public class CounterStreamProcessorTopologyTest
   {
     testDriver.close();
   }
+
+
+  private static JsonSerializer serializer()
+  {
+    return new JsonSerializer().noTypeInfo();
+  }
+
+  private JsonDeserializer<TestOutputWord> keyDeserializer()
+  {
+    return deserializer(true);
+  }
+
+  private static JsonDeserializer<TestOutputWordCounter> valueDeserializer()
+  {
+    return deserializer(false);
+  }
+
+  private static <T> JsonDeserializer<T> deserializer(boolean isKey)
+  {
+    JsonDeserializer<T> deserializer = new JsonDeserializer<>();
+    deserializer.configure(
+        Map.of(JsonDeserializer.TYPE_MAPPINGS, typeMappingsConfig()),
+        isKey);
+    return deserializer;
+  }
+
+  private static String typeMappingsConfig()
+  {
+    return typeMappings()
+        .entrySet()
+        .stream()
+        .map(entry -> entry.getKey() + ":" + entry.getValue().getName())
+        .collect(Collectors.joining(","));
+  }
+
+  private static Map<String, Class> typeMappings()
+  {
+    return Map.of(
+        "word", TestOutputWord.class,
+        "counter", TestOutputWordCounter.class);
+  }
 }