From 749f4ee2ac7d2c71ca7a8d29c20ad6fcfb7ea58b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 2 Jul 2024 09:18:33 +0200 Subject: [PATCH] popular: 1.3.2 - Fixed Warning in `PopularStreamProcessorTopologyTest` --- .../PopularStreamProcessorTopologyTest.java | 5 +++-- .../kafka/wordcount/popular/TestData.java | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java index aa99b8e..e939a07 100644 --- a/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java @@ -11,6 +11,7 @@ import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.ValueAndTimestamp; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.DisplayName; @@ -91,8 +92,8 @@ public class PopularStreamProcessorTopologyTest @Test public void testExpectedState() { - KeyValueStore store = testDriver.getKeyValueStore(KEY_VALUE_STORE_NAME); - TestData.assertExpectedState(store); + KeyValueStore> store = testDriver.getTimestampedKeyValueStore(KEY_VALUE_STORE_NAME); + TestData.assertExpectedTimestampedState(store); } @AfterAll diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java index 304da16..b4b8e4f 100644 --- a/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java @@ -7,6 +7,7 @@ import de.juplo.kafka.wordcount.stats.OutputWordCounter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -188,6 +189,15 @@ class TestData assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, store.get(windowedWordOf(WINDOWED_WORD_S))); } + static void assertExpectedTimestampedState(ReadOnlyKeyValueStore> store) + { + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, store.get(windowedWordOf(WINDOWED_WORD_HALLO))); + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, store.get(windowedWordOf(WINDOWED_WORD_WELT))); + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, store.get(windowedWordOf(WINDOWED_WORD_BOÄH))); + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, store.get(windowedWordOf(WINDOWED_WORD_MÜSCH))); + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, store.get(windowedWordOf(WINDOWED_WORD_S))); + } + private static WindowedWord windowedWordOf(OutputWindowedWord outputWindowedWord) { WindowedWord windowedWord = new WindowedWord(); @@ -217,6 +227,14 @@ class TestData assertWordCountEqualsWordCountFromLastMessage(word, outputWordCounter); } + private static void assertWordCountEqualsWordCountFromLastMessage( + OutputWindowedWord word, + ValueAndTimestamp timestampedCounter) + { + WordCounter counter = timestampedCounter.value(); + assertWordCountEqualsWordCountFromLastMessage(word, counter); + } + private static void assertWordCountEqualsWordCountFromLastMessage( OutputWindowedWord word, OutputWordCounter counter) -- 2.20.1