counter: 1.2.15 - Added assertion for the expected state
authorKai Moritz <kai@juplo.de>
Sat, 8 Jun 2024 09:33:53 +0000 (11:33 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 8 Jun 2024 11:33:30 +0000 (13:33 +0200)
src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java
src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
src/test/java/de/juplo/kafka/wordcount/counter/TestData.java

index 738bc98..6872d5d 100644 (file)
@@ -16,6 +16,7 @@ import org.springframework.kafka.support.serializer.JsonSerde;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 
+import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
 import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 
 
@@ -90,6 +91,6 @@ public class CounterApplicationConfiguriation
        @Bean
        public KeyValueBytesStoreSupplier storeSupplier()
        {
-               return Stores.persistentKeyValueStore("counter");
+               return Stores.persistentKeyValueStore(STORE_NAME);
        }
 }
index b1343a7..712ab65 100644 (file)
@@ -2,14 +2,13 @@ package de.juplo.kafka.wordcount.counter;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.*;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 
 import java.util.Properties;
 
@@ -17,6 +16,9 @@ import java.util.Properties;
 @Slf4j
 public class CounterStreamProcessor
 {
+       public static final String STORE_NAME = "counter";
+
+
        public final KafkaStreams streams;
 
 
@@ -59,6 +61,11 @@ public class CounterStreamProcessor
                return topology;
        }
 
+       ReadOnlyKeyValueStore<Word, Long> getStore()
+       {
+               return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
+       }
+
        public void start()
        {
                log.info("Starting Stream-Processor");
index d08930c..a53ffc8 100644 (file)
@@ -28,6 +28,7 @@ import java.time.Duration;
 
 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
+import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
 import static org.awaitility.Awaitility.await;
 
 
@@ -55,6 +56,8 @@ public class CounterApplicationIT
 
        @Autowired
        Consumer consumer;
+       @Autowired
+       CounterStreamProcessor streamProcessor;
 
 
        @BeforeAll
@@ -109,6 +112,15 @@ public class CounterApplicationIT
                                .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedLastMessagesForWordAssertion()));
        }
 
+       @DisplayName("Await the expected state in the state-store")
+       @Test
+       public void testAwaitExpectedState()
+       {
+               await("Expected state")
+                               .atMost(Duration.ofSeconds(5))
+                               .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
+       }
+
 
        static class Consumer
        {
@@ -143,7 +155,7 @@ public class CounterApplicationIT
                @Bean
                KeyValueBytesStoreSupplier inMemoryStoreSupplier()
                {
-                       return Stores.inMemoryKeyValueStore("TEST-STORE");
+                       return Stores.inMemoryKeyValueStore(STORE_NAME);
                }
        }
 }
index 4b67052..6e244e2 100644 (file)
@@ -9,6 +9,7 @@ import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -24,8 +25,9 @@ import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.
 @Slf4j
 public class CounterStreamProcessorTopologyTest
 {
-  public final static String IN = "TEST-IN";
-  public final static String OUT = "TEST-OUT";
+  public static final String IN = "TEST-IN";
+  public static final String OUT = "TEST-OUT";
+  public static final String STORE_NAME = "TOPOLOGY-TEST";
 
 
   TopologyTestDriver testDriver;
@@ -39,7 +41,7 @@ public class CounterStreamProcessorTopologyTest
     Topology topology = CounterStreamProcessor.buildTopology(
         IN,
         OUT,
-        Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"));
+        Stores.inMemoryKeyValueStore(STORE_NAME));
 
     testDriver = new TopologyTestDriver(topology, serializationConfig());
 
@@ -75,6 +77,9 @@ public class CounterStreamProcessorTopologyTest
 
     TestData.assertExpectedNumberOfMessagesForWord(receivedMessages);
     TestData.assertExpectedLastMessagesForWord(receivedMessages);
+
+    KeyValueStore<Word, Long> store = testDriver.getKeyValueStore(STORE_NAME);
+    TestData.assertExpectedState(store);
   }
 
   @AfterEach
index b5c6c46..4e431e1 100644 (file)
@@ -4,6 +4,7 @@ import de.juplo.kafka.wordcount.splitter.TestInputWord;
 import de.juplo.kafka.wordcount.top10.TestOutputWord;
 import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
@@ -109,6 +110,25 @@ class TestData
                return receivedMessages -> assertExpectedLastMessagesForWord(receivedMessages);
        }
 
+       static void assertExpectedState(ReadOnlyKeyValueStore<Word, Long> store)
+       {
+               assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO)));
+               assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT)));
+               assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH)));
+               assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH)));
+               assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S)));
+       }
+
+       private static Word wordOf(TestOutputWord testOutputWord)
+       {
+               Word word = new Word();
+
+               word.setUser(testOutputWord.getUser());
+               word.setWord(testOutputWord.getWord());
+
+               return word;
+       }
+
        static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
        {
                assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
@@ -118,6 +138,17 @@ class TestData
                assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages));
        }
 
+       private static void assertWordCountEqualsWordCountFromLastMessage(
+                       TestOutputWord word,
+                       Long counter)
+       {
+               TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of(
+                               word.getUser(),
+                               word.getWord(),
+                               counter);
+               assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter);
+       }
+
        private static void assertWordCountEqualsWordCountFromLastMessage(
                        TestOutputWord word,
                        TestOutputWordCounter counter)