From 51280a23fb1d5bb04f1f071eb4720a3e8ec60e50 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 11 Feb 2023 15:26:42 +0100 Subject: [PATCH] counter: Fixed a bug in the integration-test `CounterApplicationIT` - The default store-type creates state, that is stored on disk. - Hence, only the first run of the test succseeded. - The bug was fixed by providing an in-memory store-type. --- .../kafka/wordcount/counter/CounterApplication.java | 10 ++++++++++ .../wordcount/counter/CounterStreamProcessor.java | 5 ++++- .../kafka/wordcount/counter/CounterApplicationIT.java | 10 ++++++++++ 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java index 5fc183c..20cb4d2 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java @@ -5,6 +5,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -25,6 +27,7 @@ public class CounterApplication @Bean(initMethod = "start", destroyMethod = "stop") public CounterStreamProcessor streamProcessor( CounterApplicationProperties properties, + KeyValueBytesStoreSupplier storeSupplier, ObjectMapper objectMapper, ConfigurableApplicationContext context) { @@ -45,6 +48,7 @@ public class CounterApplication properties.getInputTopic(), properties.getOutputTopic(), propertyMap, + storeSupplier, objectMapper); streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> @@ -62,6 +66,12 @@ public class CounterApplication return streamProcessor; } + @Bean + public KeyValueBytesStoreSupplier storeSupplier() + { + return Stores.persistentKeyValueStore("counter"); + } + public static void main(String[] args) { SpringApplication.run(CounterApplication.class, args); diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java index a823e25..b19502d 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java @@ -7,6 +7,8 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import java.util.Properties; @@ -21,6 +23,7 @@ public class CounterStreamProcessor String inputTopic, String outputTopic, Properties properties, + KeyValueBytesStoreSupplier storeSupplier, ObjectMapper mapper) { StreamsBuilder builder = new StreamsBuilder(); @@ -40,7 +43,7 @@ public class CounterStreamProcessor } }) .groupByKey() - .count() + .count(Materialized.as(storeSupplier)) .mapValues(value->Long.toString(value)) .toStream() .to(outputTopic); diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java index f15404e..89fadf5 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java @@ -6,12 +6,15 @@ import lombok.RequiredArgsConstructor; import lombok.Value; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.test.context.EmbeddedKafka; @@ -147,5 +150,12 @@ public class CounterApplicationIT { return new Consumer(mapper); } + + @Primary + @Bean + KeyValueBytesStoreSupplier inMemoryStoreSupplier() + { + return Stores.inMemoryKeyValueStore("TEST-STORE"); + } } } -- 2.20.1