counter: 1.2.15 - `ToplogyTestDriver.close` must always be called
authorKai Moritz <kai@juplo.de>
Mon, 27 May 2024 19:25:55 +0000 (21:25 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 8 Jun 2024 11:33:30 +0000 (13:33 +0200)
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java

index 16cf5d2..1b3e1e4 100644 (file)
@@ -6,6 +6,8 @@ import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerde;
@@ -28,8 +30,14 @@ public class CounterStreamProcessorTopologyTest
   public final static String IN = "TEST-IN";
   public final static String OUT = "TEST-OUT";
 
-  @Test
-  public void test()
+
+  TopologyTestDriver testDriver;
+  TestInputTopic<String, Word> in;
+  TestOutputTopic<Word, WordCounter> out;
+
+
+  @BeforeEach
+  public void setUpTestDriver()
   {
     Topology topology = CounterStreamProcessor.buildTopology(
         IN,
@@ -47,18 +55,23 @@ public class CounterStreamProcessorTopologyTest
     JsonSerde<?> valueSerde = new JsonSerde<>();
     valueSerde.configure(propertyMap, false);
 
-    TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
+    testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
 
-    TestInputTopic<String, Word> in = testDriver.createInputTopic(
+    in = testDriver.createInputTopic(
         IN,
         (JsonSerializer<String>)keySerde.serializer(),
         (JsonSerializer<Word>)valueSerde.serializer());
 
-    TestOutputTopic<Word, WordCounter> out = testDriver.createOutputTopic(
+    out = testDriver.createOutputTopic(
         OUT,
         (JsonDeserializer<Word>)keySerde.deserializer(),
         (JsonDeserializer<WordCounter>)valueSerde.deserializer());
+  }
+
 
+  @Test
+  public void test()
+  {
     TestData.injectInputMessages((key, value) -> in.pipeInput(key, value));
 
     MultiValueMap<Word, WordCounter> receivedMessages = new LinkedMultiValueMap<>();
@@ -77,4 +90,10 @@ public class CounterStreamProcessorTopologyTest
 
     TestData.assertExpectedMessages(receivedMessages);
   }
+
+  @AfterEach
+  public void tearDown()
+  {
+    testDriver.close();
+  }
 }