From b69e1270a308f200b2640a01d37d4636a0a549e1 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 30 May 2024 11:50:56 +0200 Subject: [PATCH] top10: 1.2.1 - (RED) Added an assertion regarding the expected state --- .../wordcount/top10/Top10StreamProcessor.java | 13 +++++++++-- .../juplo/kafka/wordcount/top10/TestData.java | 7 ++++++ .../wordcount/top10/Top10ApplicationIT.java | 22 +++++++++++++++++++ .../Top10StreamProcessorTopologyTest.java | 15 ++++++++++--- 4 files changed, 52 insertions(+), 5 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index d3846d8..2ff078c 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -5,6 +5,8 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import java.util.Properties; @@ -22,14 +24,16 @@ public class Top10StreamProcessor { Topology topology = Top10StreamProcessor.buildTopology( inputTopic, - outputTopic); + outputTopic, + null); streams = new KafkaStreams(topology, props); } static Topology buildTopology( String inputTopic, - String outputTopic) + String outputTopic, + KeyValueBytesStoreSupplier storeSupplier) { StreamsBuilder builder = new StreamsBuilder(); @@ -49,6 +53,11 @@ public class Top10StreamProcessor return topology; } + ReadOnlyKeyValueStore getStore(String name) + { + return null; + } + public void start() { log.info("Starting Stream-Processor"); diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java index 3bb6b54..f6d7ccd 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java @@ -5,6 +5,7 @@ import de.juplo.kafka.wordcount.counter.TestWord; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -63,6 +64,12 @@ class TestData .containsExactlyElementsOf(rankings)); } + static void assertExpectedState(ReadOnlyKeyValueStore store) + { + assertThat(store.get(EXPECTED_MESSAGES[9].key)).isEqualTo(EXPECTED_MESSAGES[9].value); + assertThat(store.get(EXPECTED_MESSAGES[10].key)).isEqualTo(EXPECTED_MESSAGES[10].value); + } + static KeyValue[] EXPECTED_MESSAGES = new KeyValue[] { KeyValue.pair( // 0 diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java index b35dd3d..726b1e7 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java @@ -4,6 +4,8 @@ import de.juplo.kafka.wordcount.counter.TestWord; import de.juplo.kafka.wordcount.counter.TestCounter; import de.juplo.kafka.wordcount.query.TestRanking; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -11,6 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.KafkaHeaders; @@ -54,9 +57,12 @@ public class Top10ApplicationIT { public static final String TOPIC_IN = "in"; public static final String TOPIC_OUT = "out"; + public static final String STORE_NAME = "TEST-STORE"; @Autowired Consumer consumer; + @Autowired + Top10StreamProcessor streamProcessor; @BeforeAll @@ -84,6 +90,15 @@ public class Top10ApplicationIT }); } + @DisplayName("Await the expected state in the state-store") + @Test + public void testAwaitExpectedState() + { + await("Expected state") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore(STORE_NAME))); + } + @DisplayName("Await the expected output messages") @Test public void testAwaitExpectedMessages() @@ -121,5 +136,12 @@ public class Top10ApplicationIT { return new Consumer(); } + + @Primary + @Bean + KeyValueBytesStoreSupplier inMemoryStoreSupplier() + { + return Stores.inMemoryKeyValueStore(STORE_NAME); + } } } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java index 01c1cf6..1becd65 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -5,6 +5,8 @@ import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestOutputTopic; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -27,8 +29,9 @@ import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.K @Slf4j public class Top10StreamProcessorTopologyTest { - public final static String IN = "TEST-IN"; - public final static String OUT = "TEST-OUT"; + public static final String IN = "TEST-IN"; + public static final String OUT = "TEST-OUT"; + public static final String STORE_NAME = "TOPOLOGY-TEST"; TopologyTestDriver testDriver; @@ -39,7 +42,10 @@ public class Top10StreamProcessorTopologyTest @BeforeEach public void setUp() { - Topology topology = Top10StreamProcessor.buildTopology(IN, OUT); + Topology topology = Top10StreamProcessor.buildTopology( + IN, + OUT, + Stores.inMemoryKeyValueStore(STORE_NAME)); Top10ApplicationConfiguration applicationConfiguriation = new Top10ApplicationConfiguration(); @@ -91,6 +97,9 @@ public class Top10StreamProcessorTopologyTest }); TestData.assertExpectedMessages(receivedMessages); + + KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + TestData.assertExpectedState(store); } @AfterEach -- 2.20.1