X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fquery%2FQueryStreamProcessor.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fquery%2FQueryStreamProcessor.java;h=e075eb756b2febf506543f4752585fce75c42770;hb=b52ff02b6952dfc708c7c7d94205b5b24afd68f9;hp=3a1665fa9f6245c867e7703efdd957874e3d0343;hpb=035b1f29fc38866e699452aeabda9675cef24c82;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index 3a1665f..e075eb7 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -65,7 +65,7 @@ public class QueryStreamProcessor .withValueSerde(new JsonSerde().copyWithType(User.class))); KStream rankings = builder .stream(rankingInputTopic) - .map((key, value) -> new KeyValue<>(key.getUsername(), value)); + .map((key, value) -> new KeyValue<>(key.getUser(), value)); rankings .join(users, (ranking, user) -> UserRanking.of(