X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterStreamProcessorTopologyTest.java;fp=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterStreamProcessorTopologyTest.java;h=c2ada6f4349e0049930ebd5ae4869811deb0afa9;hb=b02b75a4e70c41795a58ca38cd62d455af8aea16;hp=0000000000000000000000000000000000000000;hpb=e1ec32bcc70e5f990ca2ddb5566f98cee7e4c2c2;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java new file mode 100644 index 0000000..c2ada6f --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java @@ -0,0 +1,54 @@ +package de.juplo.kafka.wordcount.counter; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.common.serialization.*; +import org.apache.kafka.streams.*; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Properties; + + +public class CounterStreamProcessorTopologyTest +{ + public final static String IN = "TEST-IN"; + public final static String OUT = "TEST-OUT"; + + @Test + public void test() + { + Topology topology = CounterStreamProcessor.buildTopology( + IN, + OUT, + Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"), + new ObjectMapper()); + + CounterApplicationConfiguriation config = + new CounterApplicationConfiguriation(); + Properties properties = + config.propertyMap(new CounterApplicationProperties()); + + TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties); + + TestInputTopic in = testDriver.createInputTopic( + IN, + new StringSerializer(), + new StringSerializer()); + + TestOutputTopic out = testDriver.createOutputTopic( + OUT, + new StringDeserializer(), + new StringDeserializer()); + + TestData.writeInputData((key, value) -> in.pipeInput(key, value)); + + List receivedMessages = out + .readRecordsToList() + .stream() + .map(record -> Message.of(record.key(), record.value())) + .toList(); + + TestData.assertExpectedResult(receivedMessages); + } +}