import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayName;
@Test
public void testExpectedState()
{
- KeyValueStore<WindowedWord, WordCounter> store = testDriver.getKeyValueStore(KEY_VALUE_STORE_NAME);
- TestData.assertExpectedState(store);
+ KeyValueStore<WindowedWord, ValueAndTimestamp<WordCounter>> store = testDriver.getTimestampedKeyValueStore(KEY_VALUE_STORE_NAME);
+ TestData.assertExpectedTimestampedState(store);
}
@AfterAll
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, store.get(windowedWordOf(WINDOWED_WORD_S)));
}
+ static void assertExpectedTimestampedState(ReadOnlyKeyValueStore<WindowedWord, ValueAndTimestamp<WordCounter>> store)
+ {
+ assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, store.get(windowedWordOf(WINDOWED_WORD_HALLO)));
+ assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, store.get(windowedWordOf(WINDOWED_WORD_WELT)));
+ assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, store.get(windowedWordOf(WINDOWED_WORD_BOÄH)));
+ assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, store.get(windowedWordOf(WINDOWED_WORD_MÜSCH)));
+ assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, store.get(windowedWordOf(WINDOWED_WORD_S)));
+ }
+
private static WindowedWord windowedWordOf(OutputWindowedWord outputWindowedWord)
{
WindowedWord windowedWord = new WindowedWord();
assertWordCountEqualsWordCountFromLastMessage(word, outputWordCounter);
}
+ private static void assertWordCountEqualsWordCountFromLastMessage(
+ OutputWindowedWord word,
+ ValueAndTimestamp<WordCounter> timestampedCounter)
+ {
+ WordCounter counter = timestampedCounter.value();
+ assertWordCountEqualsWordCountFromLastMessage(word, counter);
+ }
+
private static void assertWordCountEqualsWordCountFromLastMessage(
OutputWindowedWord word,
OutputWordCounter counter)