From 51280a23fb1d5bb04f1f071eb4720a3e8ec60e50 Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Sat, 11 Feb 2023 15:26:42 +0100
Subject: [PATCH] counter: Fixed a bug in the integration-test
 `CounterApplicationIT`

- The default store-type creates state, that is stored on disk.
- Hence, only the first run of the test succseeded.
- The bug was fixed by providing an in-memory store-type.
---
 .../kafka/wordcount/counter/CounterApplication.java    | 10 ++++++++++
 .../wordcount/counter/CounterStreamProcessor.java      |  5 ++++-
 .../kafka/wordcount/counter/CounterApplicationIT.java  | 10 ++++++++++
 3 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java
index 5fc183c..20cb4d2 100644
--- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java
+++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java
@@ -5,6 +5,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -25,6 +27,7 @@ public class CounterApplication
 	@Bean(initMethod = "start", destroyMethod = "stop")
 	public CounterStreamProcessor streamProcessor(
 			CounterApplicationProperties properties,
+			KeyValueBytesStoreSupplier storeSupplier,
 			ObjectMapper objectMapper,
 			ConfigurableApplicationContext context)
 	{
@@ -45,6 +48,7 @@ public class CounterApplication
 				properties.getInputTopic(),
 				properties.getOutputTopic(),
 				propertyMap,
+				storeSupplier,
 				objectMapper);
 
 		streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
@@ -62,6 +66,12 @@ public class CounterApplication
 		return streamProcessor;
 	}
 
+	@Bean
+	public KeyValueBytesStoreSupplier storeSupplier()
+	{
+		return Stores.persistentKeyValueStore("counter");
+	}
+
 	public static void main(String[] args)
 	{
 		SpringApplication.run(CounterApplication.class, args);
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
index a823e25..b19502d 100644
--- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
+++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
@@ -7,6 +7,8 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 
 import java.util.Properties;
 
@@ -21,6 +23,7 @@ public class CounterStreamProcessor
 			String inputTopic,
 			String outputTopic,
 			Properties properties,
+			KeyValueBytesStoreSupplier storeSupplier,
 			ObjectMapper mapper)
 	{
 		StreamsBuilder builder = new StreamsBuilder();
@@ -40,7 +43,7 @@ public class CounterStreamProcessor
 					}
 				})
 				.groupByKey()
-				.count()
+				.count(Materialized.as(storeSupplier))
 				.mapValues(value->Long.toString(value))
 				.toStream()
 				.to(outputTopic);
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
index f15404e..89fadf5 100644
--- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
+++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
@@ -6,12 +6,15 @@ import lombok.RequiredArgsConstructor;
 import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.test.context.EmbeddedKafka;
@@ -147,5 +150,12 @@ public class CounterApplicationIT
 		{
 			return new Consumer(mapper);
 		}
+
+		@Primary
+		@Bean
+		KeyValueBytesStoreSupplier inMemoryStoreSupplier()
+		{
+			return Stores.inMemoryKeyValueStore("TEST-STORE");
+		}
 	}
 }
-- 
2.20.1