X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fquery%2FQueryStreamProcessor.java;h=0898e88f0f37ac46d1711f76205cc969ea7ce573;hb=96c727206f954f313976e9e5947a56c888b5f990;hp=dcb123485fd17e35aaa77e1a2b4d8a8aa584fd44;hpb=913c2c4ec0a3584f6be27c8341898d17c4260501;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index dcb1234..0898e88 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -60,13 +60,15 @@ public class QueryStreamProcessor KTable users = builder .stream( usersInputTopic, - Consumed.with(null, new JsonSerde().copyWithType(User.class))) + Consumed.with(Serdes.String(), new JsonSerde().copyWithType(User.class))) .toTable( Materialized .as(userStoreSupplier) .withKeySerde(Serdes.String()) .withValueSerde(new JsonSerde().copyWithType(User.class))); - KStream rankings = builder.stream(rankingInputTopic); + KStream rankings = builder + .stream(rankingInputTopic) + .map((key, value) -> new KeyValue<>(key.getUser(), value)); rankings .join(users, (ranking, user) -> UserRanking.of( @@ -76,6 +78,7 @@ public class QueryStreamProcessor .toTable( Materialized .as(rankingStoreSupplier) + .withKeySerde(Serdes.String()) .withValueSerde(new JsonSerde().copyWithType(UserRanking.class))); Topology topology = builder.build();