]> juplo.de Git - demos/kafka/wordcount/commitdiff
counter: 1.2.15 - Added assertion for the expected state
authorKai Moritz <kai@juplo.de>
Sat, 8 Jun 2024 09:33:53 +0000 (11:33 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 8 Jun 2024 11:33:30 +0000 (13:33 +0200)
src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java
src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
src/test/java/de/juplo/kafka/wordcount/counter/TestData.java

index 738bc989d4e554382bcf006a282ead9434caa368..6872d5d053e628a52f0cbd4035159fcd73768315 100644 (file)
@@ -16,6 +16,7 @@ import org.springframework.kafka.support.serializer.JsonSerde;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 
+import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
 import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 
 
@@ -90,6 +91,6 @@ public class CounterApplicationConfiguriation
        @Bean
        public KeyValueBytesStoreSupplier storeSupplier()
        {
-               return Stores.persistentKeyValueStore("counter");
+               return Stores.persistentKeyValueStore(STORE_NAME);
        }
 }
index b1343a775e183e91f543ed2ec8c9986d5b8def91..712ab6573be5b7d555a1d96efc895b14bd0e457e 100644 (file)
@@ -2,14 +2,13 @@ package de.juplo.kafka.wordcount.counter;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.*;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 
 import java.util.Properties;
 
@@ -17,6 +16,9 @@ import java.util.Properties;
 @Slf4j
 public class CounterStreamProcessor
 {
+       public static final String STORE_NAME = "counter";
+
+
        public final KafkaStreams streams;
 
 
@@ -59,6 +61,11 @@ public class CounterStreamProcessor
                return topology;
        }
 
+       ReadOnlyKeyValueStore<Word, Long> getStore()
+       {
+               return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
+       }
+
        public void start()
        {
                log.info("Starting Stream-Processor");
index d08930c8e41c8bb28996399671dd6c2a9e6b7146..a53ffc801079476365dff0d16d5d3d7248539c70 100644 (file)
@@ -28,6 +28,7 @@ import java.time.Duration;
 
 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
+import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
 import static org.awaitility.Awaitility.await;
 
 
@@ -55,6 +56,8 @@ public class CounterApplicationIT
 
        @Autowired
        Consumer consumer;
+       @Autowired
+       CounterStreamProcessor streamProcessor;
 
 
        @BeforeAll
@@ -109,6 +112,15 @@ public class CounterApplicationIT
                                .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedLastMessagesForWordAssertion()));
        }
 
+       @DisplayName("Await the expected state in the state-store")
+       @Test
+       public void testAwaitExpectedState()
+       {
+               await("Expected state")
+                               .atMost(Duration.ofSeconds(5))
+                               .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
+       }
+
 
        static class Consumer
        {
@@ -143,7 +155,7 @@ public class CounterApplicationIT
                @Bean
                KeyValueBytesStoreSupplier inMemoryStoreSupplier()
                {
-                       return Stores.inMemoryKeyValueStore("TEST-STORE");
+                       return Stores.inMemoryKeyValueStore(STORE_NAME);
                }
        }
 }
index 4b67052e3cfcaba8aaff143a2df0d03aba1a5570..6e244e26c4d3c6ed8043f820a968c7a8a17b40ca 100644 (file)
@@ -9,6 +9,7 @@ import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -24,8 +25,9 @@ import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.
 @Slf4j
 public class CounterStreamProcessorTopologyTest
 {
-  public final static String IN = "TEST-IN";
-  public final static String OUT = "TEST-OUT";
+  public static final String IN = "TEST-IN";
+  public static final String OUT = "TEST-OUT";
+  public static final String STORE_NAME = "TOPOLOGY-TEST";
 
 
   TopologyTestDriver testDriver;
@@ -39,7 +41,7 @@ public class CounterStreamProcessorTopologyTest
     Topology topology = CounterStreamProcessor.buildTopology(
         IN,
         OUT,
-        Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"));
+        Stores.inMemoryKeyValueStore(STORE_NAME));
 
     testDriver = new TopologyTestDriver(topology, serializationConfig());
 
@@ -75,6 +77,9 @@ public class CounterStreamProcessorTopologyTest
 
     TestData.assertExpectedNumberOfMessagesForWord(receivedMessages);
     TestData.assertExpectedLastMessagesForWord(receivedMessages);
+
+    KeyValueStore<Word, Long> store = testDriver.getKeyValueStore(STORE_NAME);
+    TestData.assertExpectedState(store);
   }
 
   @AfterEach
index b5c6c46fcdda396cba69e4e2c20aaf4c4427e434..4e431e15106c5071a09bd8ebb213b7d49c5719df 100644 (file)
@@ -4,6 +4,7 @@ import de.juplo.kafka.wordcount.splitter.TestInputWord;
 import de.juplo.kafka.wordcount.top10.TestOutputWord;
 import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
@@ -109,6 +110,25 @@ class TestData
                return receivedMessages -> assertExpectedLastMessagesForWord(receivedMessages);
        }
 
+       static void assertExpectedState(ReadOnlyKeyValueStore<Word, Long> store)
+       {
+               assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO)));
+               assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT)));
+               assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH)));
+               assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH)));
+               assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S)));
+       }
+
+       private static Word wordOf(TestOutputWord testOutputWord)
+       {
+               Word word = new Word();
+
+               word.setUser(testOutputWord.getUser());
+               word.setWord(testOutputWord.getWord());
+
+               return word;
+       }
+
        static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
        {
                assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
@@ -118,6 +138,17 @@ class TestData
                assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages));
        }
 
+       private static void assertWordCountEqualsWordCountFromLastMessage(
+                       TestOutputWord word,
+                       Long counter)
+       {
+               TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of(
+                               word.getUser(),
+                               word.getWord(),
+                               counter);
+               assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter);
+       }
+
        private static void assertWordCountEqualsWordCountFromLastMessage(
                        TestOutputWord word,
                        TestOutputWordCounter counter)