counter: 1.1.5 - Fixed a bug in the integration-test `CounterApplicationIT` counter-1.1.5
authorKai Moritz <kai@juplo.de>
Sat, 11 Feb 2023 14:26:42 +0000 (15:26 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 17 Feb 2023 16:31:30 +0000 (17:31 +0100)
- The default store-type creates state, that is stored on disk.
- Hence, only the first run of the test succseeded.
- The bug was fixed by providing an in-memory store-type.

pom.xml
src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java
src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java

diff --git a/pom.xml b/pom.xml
index de566c5..9231396 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
        <artifactId>counter</artifactId>
-       <version>1.1.4</version>
+       <version>1.1.5</version>
        <name>Wordcount-Counter</name>
        <description>Word-counting stream-processor of the multi-user wordcount-example</description>
        <properties>
index 5fc183c..20cb4d2 100644 (file)
@@ -5,6 +5,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -25,6 +27,7 @@ public class CounterApplication
        @Bean(initMethod = "start", destroyMethod = "stop")
        public CounterStreamProcessor streamProcessor(
                        CounterApplicationProperties properties,
+                       KeyValueBytesStoreSupplier storeSupplier,
                        ObjectMapper objectMapper,
                        ConfigurableApplicationContext context)
        {
@@ -45,6 +48,7 @@ public class CounterApplication
                                properties.getInputTopic(),
                                properties.getOutputTopic(),
                                propertyMap,
+                               storeSupplier,
                                objectMapper);
 
                streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
@@ -62,6 +66,12 @@ public class CounterApplication
                return streamProcessor;
        }
 
+       @Bean
+       public KeyValueBytesStoreSupplier storeSupplier()
+       {
+               return Stores.persistentKeyValueStore("counter");
+       }
+
        public static void main(String[] args)
        {
                SpringApplication.run(CounterApplication.class, args);
index a823e25..b19502d 100644 (file)
@@ -7,6 +7,8 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 
 import java.util.Properties;
 
@@ -21,6 +23,7 @@ public class CounterStreamProcessor
                        String inputTopic,
                        String outputTopic,
                        Properties properties,
+                       KeyValueBytesStoreSupplier storeSupplier,
                        ObjectMapper mapper)
        {
                StreamsBuilder builder = new StreamsBuilder();
@@ -40,7 +43,7 @@ public class CounterStreamProcessor
                                        }
                                })
                                .groupByKey()
-                               .count()
+                               .count(Materialized.as(storeSupplier))
                                .mapValues(value->Long.toString(value))
                                .toStream()
                                .to(outputTopic);
index 0d09a76..fd92000 100644 (file)
@@ -6,12 +6,15 @@ import lombok.RequiredArgsConstructor;
 import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.test.context.EmbeddedKafka;
@@ -147,5 +150,12 @@ public class CounterApplicationIT
                {
                        return new Consumer(mapper);
                }
+
+               @Primary
+               @Bean
+               KeyValueBytesStoreSupplier inMemoryStoreSupplier()
+               {
+                       return Stores.inMemoryKeyValueStore("TEST-STORE");
+               }
        }
 }