X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterApplicationIT.java;h=9995ce7f938fc4abedefc78b153e451afdc0cd35;hb=8d0426539d69616900f4d6ef19e52d50b497f57f;hp=a771904c900dc614f9d20fe5e7b2e603ef5f08f4;hpb=1127988a1b631aa9e0c0107c1a3ed9f99edd188b;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java index a771904..9995ce7 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java @@ -28,6 +28,7 @@ import java.time.Duration; import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN; import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT; +import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; import static org.awaitility.Awaitility.await; @@ -55,6 +56,8 @@ public class CounterApplicationIT @Autowired Consumer consumer; + @Autowired + CounterStreamProcessor streamProcessor; @BeforeAll @@ -88,7 +91,8 @@ public class CounterApplicationIT { await("Expected number of messages") .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedNumberOfMessagesForWordAssertion())); + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedNumberOfMessagesForWord(receivedMessages))); } @DisplayName("Await the expected output messages") @@ -97,7 +101,27 @@ public class CounterApplicationIT { await("Expected messages") .atMost(Duration.ofSeconds(10)) - .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedMessagesAssertion())); + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedMessages(receivedMessages))); + } + + @DisplayName("Await the expected final output messages") + @Test + public void testAwaitExpectedLastMessagesForUsers() + { + await("Expected final output messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedLastMessagesForWord(receivedMessages))); + } + + @DisplayName("Await the expected state in the state-store") + @Test + public void testAwaitExpectedState() + { + await("Expected state") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore())); } @@ -134,7 +158,7 @@ public class CounterApplicationIT @Bean KeyValueBytesStoreSupplier inMemoryStoreSupplier() { - return Stores.inMemoryKeyValueStore("TEST-STORE"); + return Stores.inMemoryKeyValueStore(STORE_NAME); } } }