X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Ftop10%2FTop10StreamProcessorTopologyTest.java;fp=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Ftop10%2FTop10StreamProcessorTopologyTest.java;h=1becd654cc74ab107795a09ddd9f14e5f8fada6a;hb=b69e1270a308f200b2640a01d37d4636a0a549e1;hp=01c1cf637351139669177914bbb77d3ae85d9fc7;hpb=df0c22234e9ace115b4e4abc4e3d881d5595668e;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java index 01c1cf6..1becd65 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -5,6 +5,8 @@ import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestOutputTopic; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -27,8 +29,9 @@ import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.K @Slf4j public class Top10StreamProcessorTopologyTest { - public final static String IN = "TEST-IN"; - public final static String OUT = "TEST-OUT"; + public static final String IN = "TEST-IN"; + public static final String OUT = "TEST-OUT"; + public static final String STORE_NAME = "TOPOLOGY-TEST"; TopologyTestDriver testDriver; @@ -39,7 +42,10 @@ public class Top10StreamProcessorTopologyTest @BeforeEach public void setUp() { - Topology topology = Top10StreamProcessor.buildTopology(IN, OUT); + Topology topology = Top10StreamProcessor.buildTopology( + IN, + OUT, + Stores.inMemoryKeyValueStore(STORE_NAME)); Top10ApplicationConfiguration applicationConfiguriation = new Top10ApplicationConfiguration(); @@ -91,6 +97,9 @@ public class Top10StreamProcessorTopologyTest }); TestData.assertExpectedMessages(receivedMessages); + + KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + TestData.assertExpectedState(store); } @AfterEach