X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fquery%2FQueryStreamProcessor.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fquery%2FQueryStreamProcessor.java;h=f7dc75032da58301ebb675c7891c93cf37dbaf46;hb=101100b6dba432bb61952a1ddb2dac46d04f0750;hp=319861da3d3e6c0ad9cc491c4fd13d1d92d37c56;hpb=3bfe34e8bc0539932cf93361fe6f710738b37897;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index 319861d..f7dc750 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -6,6 +6,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.QueryableStoreTypes; @@ -42,7 +44,29 @@ public class QueryStreamProcessor { StreamsBuilder builder = new StreamsBuilder(); - builder.table(properties.getRankingInputTopic(), Materialized.as(storeName)); + KTable users = builder.table(properties.getUsersInputTopic()); + KStream rankings = builder.stream(properties.getRankingInputTopic()); + + rankings + .join(users, (rankingJson, userJson) -> + { + try + { + Ranking ranking = mapper.readValue(rankingJson, Ranking.class); + User user = mapper.readValue(userJson, User.class); + + return mapper.writeValueAsString( + UserRanking.of( + user.getFirstName(), + user.getLastName(), + ranking.getEntries())); + } + catch (JsonProcessingException e) + { + throw new RuntimeException(e); + } + }) + .toTable(Materialized.as(storeName)); Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); @@ -85,7 +109,7 @@ public class QueryStreamProcessor return Optional.of(location); } - public Optional getRanking(String username) + public Optional getUserRanking(String username) { return Optional @@ -94,7 +118,7 @@ public class QueryStreamProcessor { try { - return mapper.readValue(json, Ranking.class); + return mapper.readValue(json, UserRanking.class); } catch (JsonProcessingException e) {