top10: 1.2.1 - The name of the state-store is an internal detail
authorKai Moritz <kai@juplo.de>
Sat, 8 Jun 2024 11:20:52 +0000 (13:20 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 8 Jun 2024 11:20:52 +0000 (13:20 +0200)
src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java
src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java

index bb6fef7..bd5298d 100644 (file)
@@ -18,6 +18,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 
+import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
 import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 
 
@@ -94,6 +95,6 @@ public class Top10ApplicationConfiguration
        @Bean
        public KeyValueBytesStoreSupplier storeSupplier()
        {
-               return Stores.persistentKeyValueStore("top10");
+               return Stores.persistentKeyValueStore(STORE_NAME);
        }
 }
index 343ab4d..70ead87 100644 (file)
@@ -13,6 +13,8 @@ import java.util.Properties;
 @Slf4j
 public class Top10StreamProcessor
 {
+       public static final String STORE_NAME= "top10";
+
        public final KafkaStreams streams;
 
 
@@ -54,9 +56,9 @@ public class Top10StreamProcessor
                return topology;
        }
 
-       ReadOnlyKeyValueStore<User, Ranking> getStore(String name)
+       ReadOnlyKeyValueStore<User, Ranking> getStore()
        {
-               return streams.store(StoreQueryParameters.fromNameAndType(name, QueryableStoreTypes.keyValueStore()));
+               return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
        }
 
        public void start()
index 1097310..88d03ba 100644 (file)
@@ -25,6 +25,7 @@ import org.springframework.util.MultiValueMap;
 import java.time.Duration;
 import java.util.stream.Stream;
 
+import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
 import static org.awaitility.Awaitility.await;
 
 
@@ -55,7 +56,6 @@ public class Top10ApplicationIT
 {
        public static final String TOPIC_IN = "in";
        public static final String TOPIC_OUT = "out";
-       public static final String STORE_NAME = "TEST-STORE";
 
        @Autowired
        Consumer consumer;
@@ -94,7 +94,7 @@ public class Top10ApplicationIT
        {
                await("Expected state")
                                .atMost(Duration.ofSeconds(5))
-                               .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore(STORE_NAME)));
+                               .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
        }
 
        @DisplayName("Await the expected output messages")