From b02b75a4e70c41795a58ca38cd62d455af8aea16 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 11 Feb 2023 13:02:44 +0100 Subject: [PATCH] counter: 1.1.11 - Added a test, that is based on `TopologyTestDriver` - The test reuses `TestData` to asserts the exact same assumptions, as `CounterApplicationIT`. - The only difference is, that the message processing is carried out by the `ToplogyTestDriver` instead of a real Kafka cluster, that is started along the test-code in the same JVM, as in `CounterApplicationIT`. --- pom.xml | 2 +- .../counter/CounterStreamProcessor.java | 18 ++++++- .../CounterStreamProcessorTopologyTest.java | 54 +++++++++++++++++++ 3 files changed, 72 insertions(+), 2 deletions(-) create mode 100644 src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java diff --git a/pom.xml b/pom.xml index 48183b5..3b72889 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount counter - 1.1.10 + 1.1.11 Wordcount-Counter Word-counting stream-processor of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java index b19502d..4002df2 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java @@ -6,6 +6,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; @@ -25,6 +26,21 @@ public class CounterStreamProcessor Properties properties, KeyValueBytesStoreSupplier storeSupplier, ObjectMapper mapper) + { + Topology topology = CounterStreamProcessor.buildTopology( + inputTopic, + outputTopic, + storeSupplier, + mapper); + + streams = new KafkaStreams(topology, properties); + } + + static Topology buildTopology( + String inputTopic, + String outputTopic, + KeyValueBytesStoreSupplier storeSupplier, + ObjectMapper mapper) { StreamsBuilder builder = new StreamsBuilder(); @@ -48,7 +64,7 @@ public class CounterStreamProcessor .toStream() .to(outputTopic); - streams = new KafkaStreams(builder.build(), properties); + return builder.build(); } public void start() diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java new file mode 100644 index 0000000..c2ada6f --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java @@ -0,0 +1,54 @@ +package de.juplo.kafka.wordcount.counter; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.common.serialization.*; +import org.apache.kafka.streams.*; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Properties; + + +public class CounterStreamProcessorTopologyTest +{ + public final static String IN = "TEST-IN"; + public final static String OUT = "TEST-OUT"; + + @Test + public void test() + { + Topology topology = CounterStreamProcessor.buildTopology( + IN, + OUT, + Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"), + new ObjectMapper()); + + CounterApplicationConfiguriation config = + new CounterApplicationConfiguriation(); + Properties properties = + config.propertyMap(new CounterApplicationProperties()); + + TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties); + + TestInputTopic in = testDriver.createInputTopic( + IN, + new StringSerializer(), + new StringSerializer()); + + TestOutputTopic out = testDriver.createOutputTopic( + OUT, + new StringDeserializer(), + new StringDeserializer()); + + TestData.writeInputData((key, value) -> in.pipeInput(key, value)); + + List receivedMessages = out + .readRecordsToList() + .stream() + .map(record -> Message.of(record.key(), record.value())) + .toList(); + + TestData.assertExpectedResult(receivedMessages); + } +} -- 2.20.1