From: Kai Moritz <kai@juplo.de>
Date: Wed, 12 Jun 2024 20:46:24 +0000 (+0200)
Subject: query: 2.0.0 - Configured caching & commit-interval in integration-test
X-Git-Tag: query-with-kafkaproducer~3
X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=f18423d411650c6f08c9b698b92c33c42bdd670f;p=demos%2Fkafka%2Fwordcount

query: 2.0.0 - Configured caching & commit-interval in integration-test

* Introduced configuration-parameters for caching and the commit-interval.
* Explicitly turned of caching in the integration-test.
* Explicitly set the commit-interval to a very short period (100ms) in the
  integration-test.
---

diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java
index 3bf8326..2ece744 100644
--- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java
@@ -70,6 +70,13 @@ public class QueryApplicationConfiguration
 		props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
 		props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port());
 		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
+
+		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer());
+		if (applicationProperties.getCommitInterval() != null)
+			props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, applicationProperties.getCommitInterval());
+		if (applicationProperties.getCacheMaxBytes() != null)
+			props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, applicationProperties.getCacheMaxBytes());
+
 		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
 		return props;
diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java
index df5f41e..4a9eeca 100644
--- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java
+++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java
@@ -17,4 +17,6 @@ public class QueryApplicationProperties
   private String applicationId = "query";
   private String rankingInputTopic = "top10";
   private String usersInputTopic = "users";
+  private Integer commitInterval;
+  private Integer cacheMaxBytes;
 }
diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java
index d800fbd..58a1206 100644
--- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java
+++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java
@@ -40,6 +40,8 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
 				"logging.level.org.apache.kafka.clients=INFO",
 				"logging.level.org.apache.kafka.streams=INFO",
 				"juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}",
+				"juplo.wordcount.query.commit-interval=100",
+				"juplo.wordcount.query.cache-max-bytes=0",
 				"juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS,
 				"juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 })
 @AutoConfigureMockMvc