query: 1.0.6 - Refined `QueryStreamProcessor`
authorKai Moritz <kai@juplo.de>
Sun, 9 Jun 2024 08:52:25 +0000 (10:52 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 9 Jun 2024 19:25:16 +0000 (21:25 +0200)
--
store name is a constant

src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java

index 2d3d4c8..15b28db 100644 (file)
@@ -25,9 +25,10 @@ import java.util.Properties;
 @Slf4j
 public class QueryStreamProcessor
 {
+       public static final String STORE_NAME = "rankings-by-username";
+
        public final KafkaStreams streams;
        public final HostInfo hostInfo;
-       public final String storeName = "rankingsByUsername";
        public final StoreQueryParameters<ReadOnlyKeyValueStore<String, String>> storeParameters;
        public final ObjectMapper mapper;
 
@@ -63,17 +64,17 @@ public class QueryStreamProcessor
                                                throw new RuntimeException(e);
                                        }
                                })
-                               .toTable(Materialized.as(storeName));
+                               .toTable(Materialized.as(STORE_NAME));
 
                streams = new KafkaStreams(builder.build(), props);
                hostInfo = applicationServer;
-               storeParameters = StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore());;
+               storeParameters = StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore());;
                this.mapper = mapper;
        }
 
        public Optional<URI> getRedirect(String username)
        {
-               KeyQueryMetadata metadata = streams.queryMetadataForKey(storeName, username, Serdes.String().serializer());
+               KeyQueryMetadata metadata = streams.queryMetadataForKey(STORE_NAME, username, Serdes.String().serializer());
                HostInfo activeHost = metadata.activeHost();
                log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port());