query: 2.1.0 - Refined input JSON (adapted to general format for stats)
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / query / QueryStreamProcessor.java
index e075eb7..5543a91 100644 (file)
@@ -20,6 +20,7 @@ import java.util.Properties;
 @Slf4j
 public class QueryStreamProcessor
 {
+       public static final String STATS_TYPE = "COUNTER";
        public static final String USER_STORE_NAME = "users";
        public static final String RANKING_STORE_NAME = "rankings";
 
@@ -65,7 +66,8 @@ public class QueryStreamProcessor
                                                                .withValueSerde(new JsonSerde().copyWithType(User.class)));
                KStream<String, Ranking> rankings = builder
                                .<Key, Ranking>stream(rankingInputTopic)
-                               .map((key, value) -> new KeyValue<>(key.getUser(), value));
+                               .filter((key, value) -> STATS_TYPE.equals(key.getType()))
+                               .map((key, value) -> new KeyValue<>(key.getChannel(), value));
 
                rankings
                                .join(users, (ranking, user) -> UserRanking.of(