top10: 1.1.2 - (RED) `ToplogyTestDriver.close` must always be called
authorKai Moritz <kai@juplo.de>
Mon, 27 May 2024 20:56:48 +0000 (22:56 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 30 May 2024 10:03:04 +0000 (12:03 +0200)
* If the `TopologyTestDriver` is _not_ closed, it leaves behind the
  created RocksDB.
* Hence, the test will fail, if it changes state and expects a clean
  slate in the beginning.
* Therefore, the call to `close()` should happen in `@AfterEach`!

src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java

index 8ecf9fa..86314e5 100644 (file)
@@ -5,6 +5,8 @@ import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerde;
@@ -28,8 +30,14 @@ public class Top10StreamProcessorTopologyTest
   public final static String IN = "TEST-IN";
   public final static String OUT = "TEST-OUT";
 
-  @Test
-  public void test()
+
+  TopologyTestDriver testDriver;
+  TestInputTopic<Key, Entry> in;
+  TestOutputTopic<String, Ranking> out;
+
+
+  @BeforeEach
+  public void setUp()
   {
     Topology topology = Top10StreamProcessor.buildTopology(IN, OUT);
 
@@ -44,18 +52,24 @@ public class Top10StreamProcessorTopologyTest
     JsonSerde<?> valueSerde = new JsonSerde<>();
     valueSerde.configure(propertyMap, false);
 
-    TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
+    testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
 
-    TestInputTopic<Key, Entry> in = testDriver.createInputTopic(
+    in = testDriver.createInputTopic(
         IN,
         (JsonSerializer<Key>)keySerde.serializer(),
         (JsonSerializer<Entry>)valueSerde.serializer());
 
-    TestOutputTopic<String, Ranking> out = testDriver.createOutputTopic(
+    out = testDriver.createOutputTopic(
         OUT,
         (JsonDeserializer<String>)keySerde.deserializer(),
         (JsonDeserializer<Ranking>)valueSerde.deserializer());
 
+  }
+
+
+  @Test
+  public void test()
+  {
     Stream
         .of(TestData.INPUT_MESSAGES)
         .forEach(kv -> in.pipeInput(
@@ -77,7 +91,11 @@ public class Top10StreamProcessorTopologyTest
         });
 
     TestData.assertExpectedMessages(receivedMessages);
+  }
 
+  @AfterEach
+  public void tearDown()
+  {
     testDriver.close();
   }
 }