From: Kai Moritz <kai@juplo.de>
Date: Sat, 11 Feb 2023 14:26:42 +0000 (+0100)
Subject: counter: 1.1.5 - Fixed a bug in the integration-test `CounterApplicationIT`
X-Git-Tag: counter-1.1.5
X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=f8f9b6397ba0096bffa463e09a2db93277a3f9cf;p=demos%2Fkafka%2Fwordcount

counter: 1.1.5 - Fixed a bug in the integration-test `CounterApplicationIT`

- The default store-type creates state, that is stored on disk.
- Hence, only the first run of the test succseeded.
- The bug was fixed by providing an in-memory store-type.
---

diff --git a/pom.xml b/pom.xml
index de566c5..9231396 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
 	</parent>
 	<groupId>de.juplo.kafka.wordcount</groupId>
 	<artifactId>counter</artifactId>
-	<version>1.1.4</version>
+	<version>1.1.5</version>
 	<name>Wordcount-Counter</name>
 	<description>Word-counting stream-processor of the multi-user wordcount-example</description>
 	<properties>
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java
index 5fc183c..20cb4d2 100644
--- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java
+++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java
@@ -5,6 +5,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -25,6 +27,7 @@ public class CounterApplication
 	@Bean(initMethod = "start", destroyMethod = "stop")
 	public CounterStreamProcessor streamProcessor(
 			CounterApplicationProperties properties,
+			KeyValueBytesStoreSupplier storeSupplier,
 			ObjectMapper objectMapper,
 			ConfigurableApplicationContext context)
 	{
@@ -45,6 +48,7 @@ public class CounterApplication
 				properties.getInputTopic(),
 				properties.getOutputTopic(),
 				propertyMap,
+				storeSupplier,
 				objectMapper);
 
 		streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
@@ -62,6 +66,12 @@ public class CounterApplication
 		return streamProcessor;
 	}
 
+	@Bean
+	public KeyValueBytesStoreSupplier storeSupplier()
+	{
+		return Stores.persistentKeyValueStore("counter");
+	}
+
 	public static void main(String[] args)
 	{
 		SpringApplication.run(CounterApplication.class, args);
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
index a823e25..b19502d 100644
--- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
+++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
@@ -7,6 +7,8 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 
 import java.util.Properties;
 
@@ -21,6 +23,7 @@ public class CounterStreamProcessor
 			String inputTopic,
 			String outputTopic,
 			Properties properties,
+			KeyValueBytesStoreSupplier storeSupplier,
 			ObjectMapper mapper)
 	{
 		StreamsBuilder builder = new StreamsBuilder();
@@ -40,7 +43,7 @@ public class CounterStreamProcessor
 					}
 				})
 				.groupByKey()
-				.count()
+				.count(Materialized.as(storeSupplier))
 				.mapValues(value->Long.toString(value))
 				.toStream()
 				.to(outputTopic);
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
index 0d09a76..fd92000 100644
--- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
+++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
@@ -6,12 +6,15 @@ import lombok.RequiredArgsConstructor;
 import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.test.context.EmbeddedKafka;
@@ -147,5 +150,12 @@ public class CounterApplicationIT
 		{
 			return new Consumer(mapper);
 		}
+
+		@Primary
+		@Bean
+		KeyValueBytesStoreSupplier inMemoryStoreSupplier()
+		{
+			return Stores.inMemoryKeyValueStore("TEST-STORE");
+		}
 	}
 }