counter: 1.1.11 - Added a test, that is based on `TopologyTestDriver` counter-1.1.11
authorKai Moritz <kai@juplo.de>
Sat, 11 Feb 2023 12:02:44 +0000 (13:02 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 17 Feb 2023 16:33:10 +0000 (17:33 +0100)
- The test reuses `TestData` to asserts the exact same assumptions, as
  `CounterApplicationIT`.
- The only difference is, that the message processing is carried out by
  the `ToplogyTestDriver` instead of a real Kafka cluster, that is started
  along the test-code in the same JVM, as in `CounterApplicationIT`.

pom.xml
src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java [new file with mode: 0644]

diff --git a/pom.xml b/pom.xml
index 48183b5..3b72889 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
        <artifactId>counter</artifactId>
-       <version>1.1.10</version>
+       <version>1.1.11</version>
        <name>Wordcount-Counter</name>
        <description>Word-counting stream-processor of the multi-user wordcount-example</description>
        <properties>
index b19502d..4002df2 100644 (file)
@@ -6,6 +6,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
@@ -25,6 +26,21 @@ public class CounterStreamProcessor
                        Properties properties,
                        KeyValueBytesStoreSupplier storeSupplier,
                        ObjectMapper mapper)
+       {
+               Topology topology = CounterStreamProcessor.buildTopology(
+                               inputTopic,
+                               outputTopic,
+                               storeSupplier,
+                               mapper);
+
+               streams = new KafkaStreams(topology, properties);
+       }
+
+       static Topology buildTopology(
+                       String inputTopic,
+                       String outputTopic,
+                       KeyValueBytesStoreSupplier storeSupplier,
+                       ObjectMapper mapper)
        {
                StreamsBuilder builder = new StreamsBuilder();
 
@@ -48,7 +64,7 @@ public class CounterStreamProcessor
                                .toStream()
                                .to(outputTopic);
 
-               streams = new KafkaStreams(builder.build(), properties);
+               return builder.build();
        }
 
        public void start()
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
new file mode 100644 (file)
index 0000000..c2ada6f
--- /dev/null
@@ -0,0 +1,54 @@
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.serialization.*;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Properties;
+
+
+public class CounterStreamProcessorTopologyTest
+{
+  public final static String IN = "TEST-IN";
+  public final static String OUT = "TEST-OUT";
+
+  @Test
+  public void test()
+  {
+    Topology topology = CounterStreamProcessor.buildTopology(
+        IN,
+        OUT,
+        Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"),
+        new ObjectMapper());
+
+    CounterApplicationConfiguriation config =
+        new CounterApplicationConfiguriation();
+    Properties properties =
+        config.propertyMap(new CounterApplicationProperties());
+
+    TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
+
+    TestInputTopic<String, String> in = testDriver.createInputTopic(
+        IN,
+        new StringSerializer(),
+        new StringSerializer());
+
+    TestOutputTopic<String, String> out = testDriver.createOutputTopic(
+        OUT,
+        new StringDeserializer(),
+        new StringDeserializer());
+
+    TestData.writeInputData((key, value) -> in.pipeInput(key, value));
+
+    List<Message> receivedMessages = out
+        .readRecordsToList()
+        .stream()
+        .map(record -> Message.of(record.key(), record.value()))
+        .toList();
+
+    TestData.assertExpectedResult(receivedMessages);
+  }
+}