+
+ @DisplayName("Assert the expected number of messages")
+ @Test
+ public void testExpectedNumberOfMessagesForWord()
+ {
+ TestData.assertExpectedNumberOfMessagesForWord(receivedMessages);
+ }
+
+ @DisplayName("Await the expected final output messages")
+ @Test
+ public void testExpectedLastMessagesForWord()
+ {
+ TestData.assertExpectedLastMessagesForWord(receivedMessages);
+ }
+
+ @DisplayName("Assert the expected state in the state-store")
+ @Test
+ public void testExpectedState()
+ {
+ KeyValueStore<Word, Long> store = testDriver.getKeyValueStore(STORE_NAME);
+ TestData.assertExpectedState(store);
+ }
+
+ @AfterAll
+ public static void tearDown()
+ {
+ testDriver.close();
+ }
+
+
+ private static JsonSerializer serializer()
+ {
+ return new JsonSerializer().noTypeInfo();
+ }
+
+ private static JsonDeserializer<TestOutputWord> keyDeserializer()
+ {
+ return deserializer(true);
+ }
+
+ private static JsonDeserializer<TestOutputWordCounter> valueDeserializer()
+ {
+ return deserializer(false);
+ }
+
+ private static <T> JsonDeserializer<T> deserializer(boolean isKey)
+ {
+ JsonDeserializer<T> deserializer = new JsonDeserializer<>();
+ deserializer.configure(
+ Map.of(JsonDeserializer.TYPE_MAPPINGS, typeMappingsConfig()),
+ isKey);
+ return deserializer;
+ }
+
+ private static String typeMappingsConfig()
+ {
+ return CounterStreamProcessor.typeMappingsConfig(TestOutputWord.class, TestOutputWordCounter.class);
+ }