X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Ftop10%2FTop10StreamProcessor.java;h=907c7ff89ad546a82d639d1f6a7ebc4c5d856f05;hb=5030aed19804a0c48f1968208e176657bdd147de;hp=343ab4d85e970199981798a89efea7714c70023c;hpb=9e0ee342b531e7cfcddfe2e34390807802725a14;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index 343ab4d..907c7ff 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -13,6 +13,8 @@ import java.util.Properties; @Slf4j public class Top10StreamProcessor { + public static final String STORE_NAME= "top10"; + public final KafkaStreams streams; @@ -39,7 +41,7 @@ public class Top10StreamProcessor builder .stream(inputTopic) - .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry)) + .map((key, entry) -> new KeyValue<>(User.of(key.getChannel()), entry)) .groupByKey() .aggregate( () -> new Ranking(), @@ -54,9 +56,9 @@ public class Top10StreamProcessor return topology; } - ReadOnlyKeyValueStore getStore(String name) + ReadOnlyKeyValueStore getStore() { - return streams.store(StoreQueryParameters.fromNameAndType(name, QueryableStoreTypes.keyValueStore())); + return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore())); } public void start()