top10: 1.2.1 - The name of the state-store is an internal detail
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / Top10ApplicationIT.java
index 726b1e7..88d03ba 100644 (file)
@@ -6,9 +6,7 @@ import de.juplo.kafka.wordcount.query.TestRanking;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.*;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.TestConfiguration;
@@ -27,6 +25,7 @@ import org.springframework.util.MultiValueMap;
 import java.time.Duration;
 import java.util.stream.Stream;
 
+import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
 import static org.awaitility.Awaitility.await;
 
 
@@ -57,7 +56,6 @@ public class Top10ApplicationIT
 {
        public static final String TOPIC_IN = "in";
        public static final String TOPIC_OUT = "out";
-       public static final String STORE_NAME = "TEST-STORE";
 
        @Autowired
        Consumer consumer;
@@ -96,11 +94,12 @@ public class Top10ApplicationIT
        {
                await("Expected state")
                                .atMost(Duration.ofSeconds(5))
-                               .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore(STORE_NAME)));
+                               .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
        }
 
        @DisplayName("Await the expected output messages")
        @Test
+       @Disabled
        public void testAwaitExpectedMessages()
        {
                await("Expected messages")
@@ -108,6 +107,24 @@ public class Top10ApplicationIT
                                .untilAsserted(() -> TestData.assertExpectedMessages(consumer.getReceivedMessages()));
        }
 
+       @DisplayName("Await the expected number of messages")
+       @Test
+       public void testAwaitExpectedNumberOfMessagesForUsers()
+       {
+               await("Expected number of messages")
+                               .atMost(Duration.ofSeconds(5))
+                               .untilAsserted(() -> TestData.assertExpectedNumberOfMessagesForUsers(consumer.getReceivedMessages()));
+       }
+
+       @DisplayName("Await the expected final output messages")
+       @Test
+       public void testAwaitExpectedLastMessagesForUsers()
+       {
+               await("Expected final output messages")
+                               .atMost(Duration.ofSeconds(5))
+                               .untilAsserted(() -> TestData.assertExpectedLastMessagesForUsers(consumer.getReceivedMessages()));
+       }
+
 
        static class Consumer
        {