counter: 1.3.1 - Refined `CounterStreamProcessor` (DRY for type-mapping)
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / counter / CounterStreamProcessorTopologyTest.java
index 0ffd516..cfb6bd8 100644 (file)
@@ -19,7 +19,10 @@ import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
+import java.util.Map;
+
 import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
+import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
 
 
 @Slf4j
@@ -27,7 +30,6 @@ public class CounterStreamProcessorTopologyTest
 {
   public static final String IN = "TEST-IN";
   public static final String OUT = "TEST-OUT";
-  public static final String STORE_NAME = "TOPOLOGY-TEST";
 
 
   TopologyTestDriver testDriver;
@@ -45,19 +47,8 @@ public class CounterStreamProcessorTopologyTest
 
     testDriver = new TopologyTestDriver(topology, serializationConfig());
 
-    in = testDriver.createInputTopic(
-        IN,
-        new JsonSerializer().noTypeInfo(),
-        new JsonSerializer().noTypeInfo());
-
-    out = testDriver.createOutputTopic(
-        OUT,
-        new JsonDeserializer()
-            .copyWithType(TestOutputWord.class)
-            .ignoreTypeHeaders(),
-        new JsonDeserializer()
-            .copyWithType(TestOutputWordCounter.class)
-            .ignoreTypeHeaders());
+    in = testDriver.createInputTopic(IN, serializer(), serializer());
+    out = testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer());
   }
 
 
@@ -87,4 +78,34 @@ public class CounterStreamProcessorTopologyTest
   {
     testDriver.close();
   }
+
+
+  private static JsonSerializer serializer()
+  {
+    return new JsonSerializer().noTypeInfo();
+  }
+
+  private JsonDeserializer<TestOutputWord> keyDeserializer()
+  {
+    return deserializer(true);
+  }
+
+  private static JsonDeserializer<TestOutputWordCounter> valueDeserializer()
+  {
+    return deserializer(false);
+  }
+
+  private static <T> JsonDeserializer<T> deserializer(boolean isKey)
+  {
+    JsonDeserializer<T> deserializer = new JsonDeserializer<>();
+    deserializer.configure(
+        Map.of(JsonDeserializer.TYPE_MAPPINGS, typeMappingsConfig()),
+        isKey);
+    return deserializer;
+  }
+
+  private static String typeMappingsConfig()
+  {
+    return CounterStreamProcessor.typeMappingsConfig(TestOutputWord.class, TestOutputWordCounter.class);
+  }
 }