top10: 1.2.1 - (RED) Added an assertion regarding the expected state
authorKai Moritz <kai@juplo.de>
Thu, 30 May 2024 09:50:56 +0000 (11:50 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 8 Jun 2024 11:15:02 +0000 (13:15 +0200)
src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
src/test/java/de/juplo/kafka/wordcount/top10/TestData.java
src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java

index d3846d8..2ff078c 100644 (file)
@@ -5,6 +5,8 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 
 import java.util.Properties;
 
@@ -22,14 +24,16 @@ public class Top10StreamProcessor
        {
                Topology topology = Top10StreamProcessor.buildTopology(
                                inputTopic,
-                               outputTopic);
+                               outputTopic,
+                               null);
 
                streams = new KafkaStreams(topology, props);
        }
 
        static Topology buildTopology(
                        String inputTopic,
-                       String outputTopic)
+                       String outputTopic,
+                       KeyValueBytesStoreSupplier storeSupplier)
        {
                StreamsBuilder builder = new StreamsBuilder();
 
@@ -49,6 +53,11 @@ public class Top10StreamProcessor
                return topology;
        }
 
+       ReadOnlyKeyValueStore<User, Ranking> getStore(String name)
+       {
+               return null;
+       }
+
        public void start()
        {
                log.info("Starting Stream-Processor");
index 3bb6b54..f6d7ccd 100644 (file)
@@ -5,6 +5,7 @@ import de.juplo.kafka.wordcount.counter.TestWord;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
@@ -63,6 +64,12 @@ class TestData
                                                                .containsExactlyElementsOf(rankings));
        }
 
+       static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
+       {
+               assertThat(store.get(EXPECTED_MESSAGES[9].key)).isEqualTo(EXPECTED_MESSAGES[9].value);
+               assertThat(store.get(EXPECTED_MESSAGES[10].key)).isEqualTo(EXPECTED_MESSAGES[10].value);
+       }
+
        static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
        {
                        KeyValue.pair( // 0
index b35dd3d..726b1e7 100644 (file)
@@ -4,6 +4,8 @@ import de.juplo.kafka.wordcount.counter.TestWord;
 import de.juplo.kafka.wordcount.counter.TestCounter;
 import de.juplo.kafka.wordcount.query.TestRanking;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
@@ -11,6 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.KafkaHeaders;
@@ -54,9 +57,12 @@ public class Top10ApplicationIT
 {
        public static final String TOPIC_IN = "in";
        public static final String TOPIC_OUT = "out";
+       public static final String STORE_NAME = "TEST-STORE";
 
        @Autowired
        Consumer consumer;
+       @Autowired
+       Top10StreamProcessor streamProcessor;
 
 
        @BeforeAll
@@ -84,6 +90,15 @@ public class Top10ApplicationIT
                                });
        }
 
+       @DisplayName("Await the expected state in the state-store")
+       @Test
+       public void testAwaitExpectedState()
+       {
+               await("Expected state")
+                               .atMost(Duration.ofSeconds(5))
+                               .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore(STORE_NAME)));
+       }
+
        @DisplayName("Await the expected output messages")
        @Test
        public void testAwaitExpectedMessages()
@@ -121,5 +136,12 @@ public class Top10ApplicationIT
                {
                        return new Consumer();
                }
+
+               @Primary
+               @Bean
+               KeyValueBytesStoreSupplier inMemoryStoreSupplier()
+               {
+                       return Stores.inMemoryKeyValueStore(STORE_NAME);
+               }
        }
 }
index 01c1cf6..1becd65 100644 (file)
@@ -5,6 +5,8 @@ import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -27,8 +29,9 @@ import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.K
 @Slf4j
 public class Top10StreamProcessorTopologyTest
 {
-  public final static String IN = "TEST-IN";
-  public final static String OUT = "TEST-OUT";
+  public static final String IN = "TEST-IN";
+  public static final String OUT = "TEST-OUT";
+  public static final String STORE_NAME = "TOPOLOGY-TEST";
 
 
   TopologyTestDriver testDriver;
@@ -39,7 +42,10 @@ public class Top10StreamProcessorTopologyTest
   @BeforeEach
   public void setUp()
   {
-    Topology topology = Top10StreamProcessor.buildTopology(IN, OUT);
+    Topology topology = Top10StreamProcessor.buildTopology(
+        IN,
+        OUT,
+        Stores.inMemoryKeyValueStore(STORE_NAME));
 
     Top10ApplicationConfiguration applicationConfiguriation =
         new Top10ApplicationConfiguration();
@@ -91,6 +97,9 @@ public class Top10StreamProcessorTopologyTest
         });
 
     TestData.assertExpectedMessages(receivedMessages);
+
+    KeyValueStore<User, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
+    TestData.assertExpectedState(store);
   }
 
   @AfterEach