query:1.0.1 - Rankings are enriched with user-data
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / query / QueryStreamProcessor.java
index 319861d..f7dc750 100644 (file)
@@ -6,6 +6,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
@@ -42,7 +44,29 @@ public class QueryStreamProcessor
        {
                StreamsBuilder builder = new StreamsBuilder();
 
-               builder.table(properties.getRankingInputTopic(), Materialized.as(storeName));
+               KTable<String, String> users = builder.table(properties.getUsersInputTopic());
+               KStream<String, String> rankings = builder.stream(properties.getRankingInputTopic());
+
+               rankings
+                               .join(users, (rankingJson, userJson) ->
+                               {
+                                       try
+                                       {
+                                               Ranking ranking = mapper.readValue(rankingJson, Ranking.class);
+                                               User user = mapper.readValue(userJson, User.class);
+
+                                               return mapper.writeValueAsString(
+                                                               UserRanking.of(
+                                                                               user.getFirstName(),
+                                                                               user.getLastName(),
+                                                                               ranking.getEntries()));
+                                       }
+                                       catch (JsonProcessingException e)
+                                       {
+                                               throw new RuntimeException(e);
+                                       }
+                               })
+                               .toTable(Materialized.as(storeName));
 
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
@@ -85,7 +109,7 @@ public class QueryStreamProcessor
                return Optional.of(location);
        }
 
-       public Optional<Ranking> getRanking(String username)
+       public Optional<UserRanking> getUserRanking(String username)
        {
                return
                                Optional
@@ -94,7 +118,7 @@ public class QueryStreamProcessor
                                                {
                                                        try
                                                        {
-                                                               return mapper.readValue(json, Ranking.class);
+                                                               return mapper.readValue(json, UserRanking.class);
                                                        }
                                                        catch (JsonProcessingException e)
                                                        {