From 3b7fae76b8abb62a8cae3a4a32c880b29bce0574 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 8 Jun 2024 13:20:52 +0200 Subject: [PATCH] top10: 1.2.1 - The name of the state-store is an internal detail --- .../wordcount/top10/Top10ApplicationConfiguration.java | 3 ++- .../juplo/kafka/wordcount/top10/Top10StreamProcessor.java | 6 ++++-- .../de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java | 4 ++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index bb6fef7..bd5298d 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -18,6 +18,7 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.CompletableFuture; +import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @@ -94,6 +95,6 @@ public class Top10ApplicationConfiguration @Bean public KeyValueBytesStoreSupplier storeSupplier() { - return Stores.persistentKeyValueStore("top10"); + return Stores.persistentKeyValueStore(STORE_NAME); } } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index 343ab4d..70ead87 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -13,6 +13,8 @@ import java.util.Properties; @Slf4j public class Top10StreamProcessor { + public static final String STORE_NAME= "top10"; + public final KafkaStreams streams; @@ -54,9 +56,9 @@ public class Top10StreamProcessor return topology; } - ReadOnlyKeyValueStore getStore(String name) + ReadOnlyKeyValueStore getStore() { - return streams.store(StoreQueryParameters.fromNameAndType(name, QueryableStoreTypes.keyValueStore())); + return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore())); } public void start() diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java index 1097310..88d03ba 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java @@ -25,6 +25,7 @@ import org.springframework.util.MultiValueMap; import java.time.Duration; import java.util.stream.Stream; +import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME; import static org.awaitility.Awaitility.await; @@ -55,7 +56,6 @@ public class Top10ApplicationIT { public static final String TOPIC_IN = "in"; public static final String TOPIC_OUT = "out"; - public static final String STORE_NAME = "TEST-STORE"; @Autowired Consumer consumer; @@ -94,7 +94,7 @@ public class Top10ApplicationIT { await("Expected state") .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore(STORE_NAME))); + .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore())); } @DisplayName("Await the expected output messages") -- 2.20.1