From: Kai Moritz Date: Thu, 30 May 2024 09:01:27 +0000 (+0200) Subject: TEST-STORE:GREEN X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;ds=sidebyside;h=refs%2Fheads%2Ftop10;p=demos%2Fkafka%2Fwordcount TEST-STORE:GREEN --- diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index d588de2..03497e4 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -3,6 +3,8 @@ package de.juplo.kafka.wordcount.top10; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; import org.springframework.boot.SpringApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; @@ -55,12 +57,14 @@ public class Top10ApplicationConfiguration public Top10StreamProcessor streamProcessor( Top10ApplicationProperties applicationProperties, Properties streamProcessorProperties, + KeyValueBytesStoreSupplier storeSupplier, ConfigurableApplicationContext context) { Top10StreamProcessor streamProcessor = new Top10StreamProcessor( applicationProperties.getInputTopic(), applicationProperties.getOutputTopic(), - streamProcessorProperties); + streamProcessorProperties, + storeSupplier); streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> { @@ -75,4 +79,10 @@ public class Top10ApplicationConfiguration return streamProcessor; } + + @Bean + public KeyValueBytesStoreSupplier storeSupplier() + { + return Stores.persistentKeyValueStore("top10"); + } } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index 2ff078c..343ab4d 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -1,11 +1,10 @@ package de.juplo.kafka.wordcount.top10; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import java.util.Properties; @@ -20,12 +19,13 @@ public class Top10StreamProcessor public Top10StreamProcessor( String inputTopic, String outputTopic, - Properties props) + Properties props, + KeyValueBytesStoreSupplier storeSupplier) { Topology topology = Top10StreamProcessor.buildTopology( inputTopic, outputTopic, - null); + storeSupplier); streams = new KafkaStreams(topology, props); } @@ -43,7 +43,8 @@ public class Top10StreamProcessor .groupByKey() .aggregate( () -> new Ranking(), - (user, entry, ranking) -> ranking.add(entry)) + (user, entry, ranking) -> ranking.add(entry), + Materialized.as(storeSupplier)) .toStream() .to(outputTopic); @@ -55,7 +56,7 @@ public class Top10StreamProcessor ReadOnlyKeyValueStore getStore(String name) { - return null; + return streams.store(StoreQueryParameters.fromNameAndType(name, QueryableStoreTypes.keyValueStore())); } public void start() diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java index 9f74e25..6707acc 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java @@ -6,9 +6,7 @@ import de.juplo.kafka.wordcount.query.RankingData; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; @@ -100,6 +98,7 @@ public class Top10ApplicationIT @DisplayName("Await the expected output messages") @Test + @Disabled public void testAwaitExpectedMessages() { await("Expexted messages")