counter: 1.2.15 - `TestData` only holds and asserts the test-data
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / counter / CounterStreamProcessorTopologyTest.java
index 5b9f365..8e09d0c 100644 (file)
@@ -6,6 +6,8 @@ import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerde;
@@ -15,6 +17,7 @@ import org.springframework.util.MultiValueMap;
 
 import java.util.Map;
 import java.util.Properties;
+import java.util.stream.Stream;
 
 import static de.juplo.kafka.wordcount.counter.TestData.convertToMap;
 import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
@@ -28,8 +31,14 @@ public class CounterStreamProcessorTopologyTest
   public final static String IN = "TEST-IN";
   public final static String OUT = "TEST-OUT";
 
-  @Test
-  public void test()
+
+  TopologyTestDriver testDriver;
+  TestInputTopic<String, Word> in;
+  TestOutputTopic<Word, WordCounter> out;
+
+
+  @BeforeEach
+  public void setUpTestDriver()
   {
     Topology topology = CounterStreamProcessor.buildTopology(
         IN,
@@ -47,19 +56,26 @@ public class CounterStreamProcessorTopologyTest
     JsonSerde<?> valueSerde = new JsonSerde<>();
     valueSerde.configure(propertyMap, false);
 
-    TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
+    testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
 
-    TestInputTopic<String, Word> in = testDriver.createInputTopic(
+    in = testDriver.createInputTopic(
         IN,
         (JsonSerializer<String>)keySerde.serializer(),
         (JsonSerializer<Word>)valueSerde.serializer());
 
-    TestOutputTopic<Word, WordCounter> out = testDriver.createOutputTopic(
+    out = testDriver.createOutputTopic(
         OUT,
         (JsonDeserializer<Word>)keySerde.deserializer(),
         (JsonDeserializer<WordCounter>)valueSerde.deserializer());
+  }
 
-    TestData.writeInputData((key, value) -> in.pipeInput(key, value));
+
+  @Test
+  public void test()
+  {
+    Stream
+        .of(TestData.INPUT_MESSAGES)
+        .forEach(word -> in.pipeInput(word.getUser(), word));
 
     MultiValueMap<Word, WordCounter> receivedMessages = new LinkedMultiValueMap<>();
     out
@@ -75,6 +91,12 @@ public class CounterStreamProcessorTopologyTest
           receivedMessages.add(record.key(), record.value());
         });
 
-    TestData.assertExpectedResult(receivedMessages);
+    TestData.assertExpectedMessages(receivedMessages);
+  }
+
+  @AfterEach
+  public void tearDown()
+  {
+    testDriver.close();
   }
 }