WIP
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / top10 / Top10StreamProcessor.java
index f0a7d19..63357e8 100644 (file)
@@ -1,6 +1,5 @@
 package de.juplo.kafka.wordcount.top10;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.KafkaStreams;
@@ -25,37 +24,19 @@ public class Top10StreamProcessor
                StreamsBuilder builder = new StreamsBuilder();
 
                builder
-                               .<String, String>stream(inputTopic)
-                               .map((keyJson, countStr) ->
+                               .<Word, WordCount>stream(inputTopic)
+                               .map((word, wordCount) ->
                                {
-                                       try
-                                       {
-                                               Key key = mapper.readValue(keyJson, Key.class);
-                                               Long count = Long.parseLong(countStr);
-                                               Entry entry = Entry.of(key.getWord(), count);
-                                               String entryJson = mapper.writeValueAsString(entry);
-                                               return new KeyValue<>(key.getUsername(), entryJson);
-                                       }
-                                       catch (JsonProcessingException e)
-                                       {
-                                               throw new RuntimeException(e);
-                                       }
+                                       Entry entry = Entry.of(word.getWord(), wordCount.getCount());
+                                       return new KeyValue<>(word.getUser(), entry);
                                })
                                .groupByKey()
                                .aggregate(
-                                               () -> "{\"entries\"     : []}",
-                                               (username, entryJson, rankingJson) ->
+                                               () -> new Ranking(),
+                                               (user, entry, ranking) ->
                                                {
-                                                       try
-                                                       {
-                                                               Ranking ranking = mapper.readValue(rankingJson, Ranking.class);
-                                                               ranking.add(mapper.readValue(entryJson, Entry.class));
-                                                               return mapper.writeValueAsString(ranking);
-                                                       }
-                                                       catch (JsonProcessingException e)
-                                                       {
-                                                               throw new RuntimeException(e);
-                                                       }
+                                                       ranking.add(entry);
+                                                       return ranking;
                                                }
                                )
                                .toStream()