From f18423d411650c6f08c9b698b92c33c42bdd670f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 12 Jun 2024 22:46:24 +0200 Subject: [PATCH] query: 2.0.0 - Configured caching & commit-interval in integration-test * Introduced configuration-parameters for caching and the commit-interval. * Explicitly turned of caching in the integration-test. * Explicitly set the commit-interval to a very short period (100ms) in the integration-test. --- .../wordcount/query/QueryApplicationConfiguration.java | 7 +++++++ .../kafka/wordcount/query/QueryApplicationProperties.java | 2 ++ .../de/juplo/kafka/wordcount/query/QueryApplicationIT.java | 2 ++ 3 files changed, 11 insertions(+) diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java index 3bf8326..2ece744 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java @@ -70,6 +70,13 @@ public class QueryApplicationConfiguration props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port()); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer()); + if (applicationProperties.getCommitInterval() != null) + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, applicationProperties.getCommitInterval()); + if (applicationProperties.getCacheMaxBytes() != null) + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, applicationProperties.getCacheMaxBytes()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java index df5f41e..4a9eeca 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java @@ -17,4 +17,6 @@ public class QueryApplicationProperties private String applicationId = "query"; private String rankingInputTopic = "top10"; private String usersInputTopic = "users"; + private Integer commitInterval; + private Integer cacheMaxBytes; } diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java index d800fbd..58a1206 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -40,6 +40,8 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. "logging.level.org.apache.kafka.clients=INFO", "logging.level.org.apache.kafka.streams=INFO", "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.query.commit-interval=100", + "juplo.wordcount.query.cache-max-bytes=0", "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS, "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 }) @AutoConfigureMockMvc -- 2.20.1