From: Kai Moritz Date: Thu, 30 May 2024 09:01:27 +0000 (+0200) Subject: top10: 1.2.1 - (GREEN) Made the aggregation state accessible X-Git-Tag: top10-1.2.1~15 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=9e0ee342b531e7cfcddfe2e34390807802725a14;p=demos%2Fkafka%2Fwordcount top10: 1.2.1 - (GREEN) Made the aggregation state accessible * The assertion, regarding the expected state, is true for both tests. * The assertion, regarding the expected messages, ix only true for the test that is based on the `TopologyTestDriver`. * _This highlights the need for an additional integration test to avoid misinterpretation of the results of tests, that are based on the ``TopologyTestDriver``. * _Note,_ that the assertion, regarding the expected state, works as well in the `Top10ApplicationIT`, if the output buffering is enabled, because the changed state becomes visible immediately. * The buffering ist still turned of, so that the disabled test can be run in IntelliJ to comprehend, that the assertion is actually false. --- diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index 000db01..6e1f93f 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -3,6 +3,8 @@ package de.juplo.kafka.wordcount.top10; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; import org.springframework.boot.SpringApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; @@ -55,12 +57,14 @@ public class Top10ApplicationConfiguration public Top10StreamProcessor streamProcessor( Top10ApplicationProperties applicationProperties, Properties streamProcessorProperties, + KeyValueBytesStoreSupplier storeSupplier, ConfigurableApplicationContext context) { Top10StreamProcessor streamProcessor = new Top10StreamProcessor( applicationProperties.getInputTopic(), applicationProperties.getOutputTopic(), - streamProcessorProperties); + streamProcessorProperties, + storeSupplier); streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> { @@ -75,4 +79,10 @@ public class Top10ApplicationConfiguration return streamProcessor; } + + @Bean + public KeyValueBytesStoreSupplier storeSupplier() + { + return Stores.persistentKeyValueStore("top10"); + } } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index 2ff078c..343ab4d 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -1,11 +1,10 @@ package de.juplo.kafka.wordcount.top10; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import java.util.Properties; @@ -20,12 +19,13 @@ public class Top10StreamProcessor public Top10StreamProcessor( String inputTopic, String outputTopic, - Properties props) + Properties props, + KeyValueBytesStoreSupplier storeSupplier) { Topology topology = Top10StreamProcessor.buildTopology( inputTopic, outputTopic, - null); + storeSupplier); streams = new KafkaStreams(topology, props); } @@ -43,7 +43,8 @@ public class Top10StreamProcessor .groupByKey() .aggregate( () -> new Ranking(), - (user, entry, ranking) -> ranking.add(entry)) + (user, entry, ranking) -> ranking.add(entry), + Materialized.as(storeSupplier)) .toStream() .to(outputTopic); @@ -55,7 +56,7 @@ public class Top10StreamProcessor ReadOnlyKeyValueStore getStore(String name) { - return null; + return streams.store(StoreQueryParameters.fromNameAndType(name, QueryableStoreTypes.keyValueStore())); } public void start() diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java index 726b1e7..d55048d 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java @@ -6,9 +6,7 @@ import de.juplo.kafka.wordcount.query.TestRanking; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; @@ -101,6 +99,7 @@ public class Top10ApplicationIT @DisplayName("Await the expected output messages") @Test + @Disabled public void testAwaitExpectedMessages() { await("Expected messages")