query:1.0.1 - Rankings are enriched with user-data
authorKai Moritz <kai@juplo.de>
Sat, 4 Sep 2021 10:23:57 +0000 (12:23 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 11 Oct 2021 18:47:29 +0000 (20:47 +0200)
pom.xml
src/main/java/de/juplo/kafka/wordcount/query/QueryController.java
src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java
src/main/java/de/juplo/kafka/wordcount/query/User.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java [new file with mode: 0644]

diff --git a/pom.xml b/pom.xml
index d641d39..2dd080b 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
        <artifactId>query</artifactId>
-       <version>1.0.0</version>
+       <version>1.0.1</version>
        <name>Wordcount-Query</name>
        <description>Query stream-processor of the multi-user wordcount-example</description>
        <properties>
index 14006b6..0c7dc31 100644 (file)
@@ -18,7 +18,7 @@ public class QueryController
   private final QueryStreamProcessor processor;
 
   @GetMapping("{username}")
-  ResponseEntity<Ranking> queryFor(@PathVariable String username)
+  ResponseEntity<UserRanking> queryFor(@PathVariable String username)
   {
     Optional<URI> redirect = processor.getRedirect(username);
     if (redirect.isPresent())
@@ -30,6 +30,6 @@ public class QueryController
               .build();
     }
 
-    return ResponseEntity.of(processor.getRanking(username));
+    return ResponseEntity.of(processor.getUserRanking(username));
   }
 }
index 319861d..f7dc750 100644 (file)
@@ -6,6 +6,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
@@ -42,7 +44,29 @@ public class QueryStreamProcessor
        {
                StreamsBuilder builder = new StreamsBuilder();
 
-               builder.table(properties.getRankingInputTopic(), Materialized.as(storeName));
+               KTable<String, String> users = builder.table(properties.getUsersInputTopic());
+               KStream<String, String> rankings = builder.stream(properties.getRankingInputTopic());
+
+               rankings
+                               .join(users, (rankingJson, userJson) ->
+                               {
+                                       try
+                                       {
+                                               Ranking ranking = mapper.readValue(rankingJson, Ranking.class);
+                                               User user = mapper.readValue(userJson, User.class);
+
+                                               return mapper.writeValueAsString(
+                                                               UserRanking.of(
+                                                                               user.getFirstName(),
+                                                                               user.getLastName(),
+                                                                               ranking.getEntries()));
+                                       }
+                                       catch (JsonProcessingException e)
+                                       {
+                                               throw new RuntimeException(e);
+                                       }
+                               })
+                               .toTable(Materialized.as(storeName));
 
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
@@ -85,7 +109,7 @@ public class QueryStreamProcessor
                return Optional.of(location);
        }
 
-       public Optional<Ranking> getRanking(String username)
+       public Optional<UserRanking> getUserRanking(String username)
        {
                return
                                Optional
@@ -94,7 +118,7 @@ public class QueryStreamProcessor
                                                {
                                                        try
                                                        {
-                                                               return mapper.readValue(json, Ranking.class);
+                                                               return mapper.readValue(json, UserRanking.class);
                                                        }
                                                        catch (JsonProcessingException e)
                                                        {
diff --git a/src/main/java/de/juplo/kafka/wordcount/query/User.java b/src/main/java/de/juplo/kafka/wordcount/query/User.java
new file mode 100644 (file)
index 0000000..fdc0a33
--- /dev/null
@@ -0,0 +1,22 @@
+package de.juplo.kafka.wordcount.query;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+
+@Getter
+@Setter
+@ToString
+@EqualsAndHashCode(of = "username")
+public class User
+{
+  public enum Sex { FEMALE, MALE, OTHER }
+
+  private String username;
+
+  private String firstName;
+  private String lastName;
+  private Sex sex;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java b/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java
new file mode 100644 (file)
index 0000000..acffd5d
--- /dev/null
@@ -0,0 +1,16 @@
+package de.juplo.kafka.wordcount.query;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+
+@Getter
+@Setter
+@AllArgsConstructor(staticName = "of")
+public class UserRanking
+{
+  private String firstName;
+  private String lastName;
+  private Entry[] top10;
+}