X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Ftop10%2FTop10ApplicationIT.java;h=1097310ccde0fcc2d69e793c0bbdc8aba9e4a137;hb=fa19058953b388f893255d2bcc9351ac9eb3328b;hp=b35dd3d982db0b3dd9c5628e17f26fd5f01230d6;hpb=df0c22234e9ace115b4e4abc4e3d881d5595668e;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java index b35dd3d..1097310 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java @@ -4,13 +4,14 @@ import de.juplo.kafka.wordcount.counter.TestWord; import de.juplo.kafka.wordcount.counter.TestCounter; import de.juplo.kafka.wordcount.query.TestRanking; import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.KafkaHeaders; @@ -54,9 +55,12 @@ public class Top10ApplicationIT { public static final String TOPIC_IN = "in"; public static final String TOPIC_OUT = "out"; + public static final String STORE_NAME = "TEST-STORE"; @Autowired Consumer consumer; + @Autowired + Top10StreamProcessor streamProcessor; @BeforeAll @@ -84,8 +88,18 @@ public class Top10ApplicationIT }); } + @DisplayName("Await the expected state in the state-store") + @Test + public void testAwaitExpectedState() + { + await("Expected state") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore(STORE_NAME))); + } + @DisplayName("Await the expected output messages") @Test + @Disabled public void testAwaitExpectedMessages() { await("Expected messages") @@ -93,6 +107,24 @@ public class Top10ApplicationIT .untilAsserted(() -> TestData.assertExpectedMessages(consumer.getReceivedMessages())); } + @DisplayName("Await the expected number of messages") + @Test + public void testAwaitExpectedNumberOfMessagesForUsers() + { + await("Expected number of messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedNumberOfMessagesForUsers(consumer.getReceivedMessages())); + } + + @DisplayName("Await the expected final output messages") + @Test + public void testAwaitExpectedLastMessagesForUsers() + { + await("Expected final output messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedLastMessagesForUsers(consumer.getReceivedMessages())); + } + static class Consumer { @@ -121,5 +153,12 @@ public class Top10ApplicationIT { return new Consumer(); } + + @Primary + @Bean + KeyValueBytesStoreSupplier inMemoryStoreSupplier() + { + return Stores.inMemoryKeyValueStore(STORE_NAME); + } } }