popular: 1.3.2 - Fixed Warning in `PopularStreamProcessorTopologyTest` popular
authorKai Moritz <kai@juplo.de>
Tue, 2 Jul 2024 07:18:33 +0000 (09:18 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 2 Jul 2024 07:18:33 +0000 (09:18 +0200)
src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java
src/test/java/de/juplo/kafka/wordcount/popular/TestData.java

index aa99b8e..e939a07 100644 (file)
@@ -11,6 +11,7 @@ import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.DisplayName;
@@ -91,8 +92,8 @@ public class PopularStreamProcessorTopologyTest
   @Test
   public void testExpectedState()
   {
-    KeyValueStore<WindowedWord, WordCounter> store = testDriver.getKeyValueStore(KEY_VALUE_STORE_NAME);
-    TestData.assertExpectedState(store);
+    KeyValueStore<WindowedWord, ValueAndTimestamp<WordCounter>> store = testDriver.getTimestampedKeyValueStore(KEY_VALUE_STORE_NAME);
+    TestData.assertExpectedTimestampedState(store);
   }
 
   @AfterAll
index 304da16..b4b8e4f 100644 (file)
@@ -7,6 +7,7 @@ import de.juplo.kafka.wordcount.stats.OutputWordCounter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
@@ -188,6 +189,15 @@ class TestData
                assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, store.get(windowedWordOf(WINDOWED_WORD_S)));
        }
 
+       static void assertExpectedTimestampedState(ReadOnlyKeyValueStore<WindowedWord, ValueAndTimestamp<WordCounter>> store)
+       {
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, store.get(windowedWordOf(WINDOWED_WORD_HALLO)));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, store.get(windowedWordOf(WINDOWED_WORD_WELT)));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, store.get(windowedWordOf(WINDOWED_WORD_BOÄH)));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, store.get(windowedWordOf(WINDOWED_WORD_MÜSCH)));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, store.get(windowedWordOf(WINDOWED_WORD_S)));
+       }
+
        private static WindowedWord windowedWordOf(OutputWindowedWord outputWindowedWord)
        {
                WindowedWord windowedWord = new WindowedWord();
@@ -217,6 +227,14 @@ class TestData
                assertWordCountEqualsWordCountFromLastMessage(word, outputWordCounter);
        }
 
+       private static void assertWordCountEqualsWordCountFromLastMessage(
+                       OutputWindowedWord word,
+                       ValueAndTimestamp<WordCounter> timestampedCounter)
+       {
+               WordCounter counter = timestampedCounter.value();
+               assertWordCountEqualsWordCountFromLastMessage(word, counter);
+       }
+
        private static void assertWordCountEqualsWordCountFromLastMessage(
                        OutputWindowedWord word,
                        OutputWordCounter counter)