top10: 1.2.1 - (RED) Added an assertion regarding the expected state
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / Top10ApplicationIT.java
index b35dd3d..726b1e7 100644 (file)
@@ -4,6 +4,8 @@ import de.juplo.kafka.wordcount.counter.TestWord;
 import de.juplo.kafka.wordcount.counter.TestCounter;
 import de.juplo.kafka.wordcount.query.TestRanking;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
@@ -11,6 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.KafkaHeaders;
@@ -54,9 +57,12 @@ public class Top10ApplicationIT
 {
        public static final String TOPIC_IN = "in";
        public static final String TOPIC_OUT = "out";
+       public static final String STORE_NAME = "TEST-STORE";
 
        @Autowired
        Consumer consumer;
+       @Autowired
+       Top10StreamProcessor streamProcessor;
 
 
        @BeforeAll
@@ -84,6 +90,15 @@ public class Top10ApplicationIT
                                });
        }
 
+       @DisplayName("Await the expected state in the state-store")
+       @Test
+       public void testAwaitExpectedState()
+       {
+               await("Expected state")
+                               .atMost(Duration.ofSeconds(5))
+                               .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore(STORE_NAME)));
+       }
+
        @DisplayName("Await the expected output messages")
        @Test
        public void testAwaitExpectedMessages()
@@ -121,5 +136,12 @@ public class Top10ApplicationIT
                {
                        return new Consumer();
                }
+
+               @Primary
+               @Bean
+               KeyValueBytesStoreSupplier inMemoryStoreSupplier()
+               {
+                       return Stores.inMemoryKeyValueStore(STORE_NAME);
+               }
        }
 }