projects
/
demos
/
kafka
/
wordcount
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
counter: 1.2.15 - Added assertion for the expected state
[demos/kafka/wordcount]
/
src
/
test
/
java
/
de
/
juplo
/
kafka
/
wordcount
/
counter
/
CounterStreamProcessorTopologyTest.java
diff --git
a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
index
4b67052
..
6e244e2
100644
(file)
--- a/
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
+++ b/
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
@@
-9,6
+9,7
@@
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@
-24,8
+25,9
@@
import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.
@Slf4j
public class CounterStreamProcessorTopologyTest
{
@Slf4j
public class CounterStreamProcessorTopologyTest
{
- public final static String IN = "TEST-IN";
- public final static String OUT = "TEST-OUT";
+ public static final String IN = "TEST-IN";
+ public static final String OUT = "TEST-OUT";
+ public static final String STORE_NAME = "TOPOLOGY-TEST";
TopologyTestDriver testDriver;
TopologyTestDriver testDriver;
@@
-39,7
+41,7
@@
public class CounterStreamProcessorTopologyTest
Topology topology = CounterStreamProcessor.buildTopology(
IN,
OUT,
Topology topology = CounterStreamProcessor.buildTopology(
IN,
OUT,
- Stores.inMemoryKeyValueStore(
"TOPOLOGY-TEST"
));
+ Stores.inMemoryKeyValueStore(
STORE_NAME
));
testDriver = new TopologyTestDriver(topology, serializationConfig());
testDriver = new TopologyTestDriver(topology, serializationConfig());
@@
-75,6
+77,9
@@
public class CounterStreamProcessorTopologyTest
TestData.assertExpectedNumberOfMessagesForWord(receivedMessages);
TestData.assertExpectedLastMessagesForWord(receivedMessages);
TestData.assertExpectedNumberOfMessagesForWord(receivedMessages);
TestData.assertExpectedLastMessagesForWord(receivedMessages);
+
+ KeyValueStore<Word, Long> store = testDriver.getKeyValueStore(STORE_NAME);
+ TestData.assertExpectedState(store);
}
@AfterEach
}
@AfterEach