X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterStreamProcessorTopologyTest.java;fp=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterStreamProcessorTopologyTest.java;h=6e244e26c4d3c6ed8043f820a968c7a8a17b40ca;hb=3eb1ec997478982fffb31f09c21e756b529e474a;hp=4b67052e3cfcaba8aaff143a2df0d03aba1a5570;hpb=237133719b1d06d542302fb948d9cf3aff80a8a4;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java index 4b67052..6e244e2 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java @@ -9,6 +9,7 @@ import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestOutputTopic; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -24,8 +25,9 @@ import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation. @Slf4j public class CounterStreamProcessorTopologyTest { - public final static String IN = "TEST-IN"; - public final static String OUT = "TEST-OUT"; + public static final String IN = "TEST-IN"; + public static final String OUT = "TEST-OUT"; + public static final String STORE_NAME = "TOPOLOGY-TEST"; TopologyTestDriver testDriver; @@ -39,7 +41,7 @@ public class CounterStreamProcessorTopologyTest Topology topology = CounterStreamProcessor.buildTopology( IN, OUT, - Stores.inMemoryKeyValueStore("TOPOLOGY-TEST")); + Stores.inMemoryKeyValueStore(STORE_NAME)); testDriver = new TopologyTestDriver(topology, serializationConfig()); @@ -75,6 +77,9 @@ public class CounterStreamProcessorTopologyTest TestData.assertExpectedNumberOfMessagesForWord(receivedMessages); TestData.assertExpectedLastMessagesForWord(receivedMessages); + + KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + TestData.assertExpectedState(store); } @AfterEach