From: Kai Moritz Date: Sat, 8 Jun 2024 09:33:53 +0000 (+0200) Subject: counter: 1.2.15 - Added assertion for the expected state X-Git-Tag: counter-1.2.15~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=3eb1ec997478982fffb31f09c21e756b529e474a;p=demos%2Fkafka%2Fwordcount counter: 1.2.15 - Added assertion for the expected state --- diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java index 738bc98..6872d5d 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java @@ -16,6 +16,7 @@ import org.springframework.kafka.support.serializer.JsonSerde; import java.util.Properties; import java.util.concurrent.CompletableFuture; +import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @@ -90,6 +91,6 @@ public class CounterApplicationConfiguriation @Bean public KeyValueBytesStoreSupplier storeSupplier() { - return Stores.persistentKeyValueStore("counter"); + return Stores.persistentKeyValueStore(STORE_NAME); } } diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java index b1343a7..712ab65 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java @@ -2,14 +2,13 @@ package de.juplo.kafka.wordcount.counter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import java.util.Properties; @@ -17,6 +16,9 @@ import java.util.Properties; @Slf4j public class CounterStreamProcessor { + public static final String STORE_NAME = "counter"; + + public final KafkaStreams streams; @@ -59,6 +61,11 @@ public class CounterStreamProcessor return topology; } + ReadOnlyKeyValueStore getStore() + { + return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore())); + } + public void start() { log.info("Starting Stream-Processor"); diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java index d08930c..a53ffc8 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java @@ -28,6 +28,7 @@ import java.time.Duration; import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN; import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT; +import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; import static org.awaitility.Awaitility.await; @@ -55,6 +56,8 @@ public class CounterApplicationIT @Autowired Consumer consumer; + @Autowired + CounterStreamProcessor streamProcessor; @BeforeAll @@ -109,6 +112,15 @@ public class CounterApplicationIT .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedLastMessagesForWordAssertion())); } + @DisplayName("Await the expected state in the state-store") + @Test + public void testAwaitExpectedState() + { + await("Expected state") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore())); + } + static class Consumer { @@ -143,7 +155,7 @@ public class CounterApplicationIT @Bean KeyValueBytesStoreSupplier inMemoryStoreSupplier() { - return Stores.inMemoryKeyValueStore("TEST-STORE"); + return Stores.inMemoryKeyValueStore(STORE_NAME); } } } diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java index 4b67052..6e244e2 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java @@ -9,6 +9,7 @@ import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestOutputTopic; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -24,8 +25,9 @@ import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation. @Slf4j public class CounterStreamProcessorTopologyTest { - public final static String IN = "TEST-IN"; - public final static String OUT = "TEST-OUT"; + public static final String IN = "TEST-IN"; + public static final String OUT = "TEST-OUT"; + public static final String STORE_NAME = "TOPOLOGY-TEST"; TopologyTestDriver testDriver; @@ -39,7 +41,7 @@ public class CounterStreamProcessorTopologyTest Topology topology = CounterStreamProcessor.buildTopology( IN, OUT, - Stores.inMemoryKeyValueStore("TOPOLOGY-TEST")); + Stores.inMemoryKeyValueStore(STORE_NAME)); testDriver = new TopologyTestDriver(topology, serializationConfig()); @@ -75,6 +77,9 @@ public class CounterStreamProcessorTopologyTest TestData.assertExpectedNumberOfMessagesForWord(receivedMessages); TestData.assertExpectedLastMessagesForWord(receivedMessages); + + KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + TestData.assertExpectedState(store); } @AfterEach diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java index b5c6c46..4e431e1 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java @@ -4,6 +4,7 @@ import de.juplo.kafka.wordcount.splitter.TestInputWord; import de.juplo.kafka.wordcount.top10.TestOutputWord; import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -109,6 +110,25 @@ class TestData return receivedMessages -> assertExpectedLastMessagesForWord(receivedMessages); } + static void assertExpectedState(ReadOnlyKeyValueStore store) + { + assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO))); + assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT))); + assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH))); + assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH))); + assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S))); + } + + private static Word wordOf(TestOutputWord testOutputWord) + { + Word word = new Word(); + + word.setUser(testOutputWord.getUser()); + word.setWord(testOutputWord.getWord()); + + return word; + } + static void assertExpectedLastMessagesForWord(MultiValueMap receivedMessages) { assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages)); @@ -118,6 +138,17 @@ class TestData assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages)); } + private static void assertWordCountEqualsWordCountFromLastMessage( + TestOutputWord word, + Long counter) + { + TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of( + word.getUser(), + word.getWord(), + counter); + assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter); + } + private static void assertWordCountEqualsWordCountFromLastMessage( TestOutputWord word, TestOutputWordCounter counter)