From: Kai Moritz Date: Sat, 4 Sep 2021 10:23:57 +0000 (+0200) Subject: query:1.0.1 - Rankings are enriched with user-data X-Git-Tag: query-1.0.1 X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fwordcount;a=commitdiff_plain;h=101100b6dba432bb61952a1ddb2dac46d04f0750 query:1.0.1 - Rankings are enriched with user-data --- diff --git a/pom.xml b/pom.xml index d641d39..2dd080b 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount query - 1.0.0 + 1.0.1 Wordcount-Query Query stream-processor of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryController.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryController.java index 14006b6..0c7dc31 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryController.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryController.java @@ -18,7 +18,7 @@ public class QueryController private final QueryStreamProcessor processor; @GetMapping("{username}") - ResponseEntity queryFor(@PathVariable String username) + ResponseEntity queryFor(@PathVariable String username) { Optional redirect = processor.getRedirect(username); if (redirect.isPresent()) @@ -30,6 +30,6 @@ public class QueryController .build(); } - return ResponseEntity.of(processor.getRanking(username)); + return ResponseEntity.of(processor.getUserRanking(username)); } } diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index 319861d..f7dc750 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -6,6 +6,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.QueryableStoreTypes; @@ -42,7 +44,29 @@ public class QueryStreamProcessor { StreamsBuilder builder = new StreamsBuilder(); - builder.table(properties.getRankingInputTopic(), Materialized.as(storeName)); + KTable users = builder.table(properties.getUsersInputTopic()); + KStream rankings = builder.stream(properties.getRankingInputTopic()); + + rankings + .join(users, (rankingJson, userJson) -> + { + try + { + Ranking ranking = mapper.readValue(rankingJson, Ranking.class); + User user = mapper.readValue(userJson, User.class); + + return mapper.writeValueAsString( + UserRanking.of( + user.getFirstName(), + user.getLastName(), + ranking.getEntries())); + } + catch (JsonProcessingException e) + { + throw new RuntimeException(e); + } + }) + .toTable(Materialized.as(storeName)); Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); @@ -85,7 +109,7 @@ public class QueryStreamProcessor return Optional.of(location); } - public Optional getRanking(String username) + public Optional getUserRanking(String username) { return Optional @@ -94,7 +118,7 @@ public class QueryStreamProcessor { try { - return mapper.readValue(json, Ranking.class); + return mapper.readValue(json, UserRanking.class); } catch (JsonProcessingException e) { diff --git a/src/main/java/de/juplo/kafka/wordcount/query/User.java b/src/main/java/de/juplo/kafka/wordcount/query/User.java new file mode 100644 index 0000000..fdc0a33 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/query/User.java @@ -0,0 +1,22 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + + +@Getter +@Setter +@ToString +@EqualsAndHashCode(of = "username") +public class User +{ + public enum Sex { FEMALE, MALE, OTHER } + + private String username; + + private String firstName; + private String lastName; + private Sex sex; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java b/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java new file mode 100644 index 0000000..acffd5d --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java @@ -0,0 +1,16 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; + + +@Getter +@Setter +@AllArgsConstructor(staticName = "of") +public class UserRanking +{ + private String firstName; + private String lastName; + private Entry[] top10; +}