From 29899898af28dcbb50326ae5837ada195829c592 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 16 Jun 2024 19:38:39 +0200 Subject: [PATCH] counter: 1.3.1 - Refined `CounterStreamProcessorTopologyTest` * `CounterStreamProcessorTopologyTest` uses the type-headers to determine the correct type for the deserialization of the output-data. * Beforehand, the used types were hard-coded in the test. --- .../CounterStreamProcessorTopologyTest.java | 55 ++++++++++++++++--- 1 file changed, 46 insertions(+), 9 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java index 9c86c6c..06a0798 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java @@ -19,6 +19,9 @@ import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; +import java.util.Map; +import java.util.stream.Collectors; + import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig; import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; @@ -45,15 +48,8 @@ public class CounterStreamProcessorTopologyTest testDriver = new TopologyTestDriver(topology, serializationConfig()); - in = testDriver.createInputTopic( - IN, - new JsonSerializer().noTypeInfo(), - new JsonSerializer().noTypeInfo()); - - out = testDriver.createOutputTopic( - OUT, - new JsonDeserializer(TestOutputWord.class).ignoreTypeHeaders(), - new JsonDeserializer(TestOutputWordCounter.class).ignoreTypeHeaders()); + in = testDriver.createInputTopic(IN, serializer(), serializer()); + out = testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer()); } @@ -83,4 +79,45 @@ public class CounterStreamProcessorTopologyTest { testDriver.close(); } + + + private static JsonSerializer serializer() + { + return new JsonSerializer().noTypeInfo(); + } + + private JsonDeserializer keyDeserializer() + { + return deserializer(true); + } + + private static JsonDeserializer valueDeserializer() + { + return deserializer(false); + } + + private static JsonDeserializer deserializer(boolean isKey) + { + JsonDeserializer deserializer = new JsonDeserializer<>(); + deserializer.configure( + Map.of(JsonDeserializer.TYPE_MAPPINGS, typeMappingsConfig()), + isKey); + return deserializer; + } + + private static String typeMappingsConfig() + { + return typeMappings() + .entrySet() + .stream() + .map(entry -> entry.getKey() + ":" + entry.getValue().getName()) + .collect(Collectors.joining(",")); + } + + private static Map typeMappings() + { + return Map.of( + "word", TestOutputWord.class, + "counter", TestOutputWordCounter.class); + } } -- 2.20.1