WIP
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / top10 / Top10StreamProcessor.java
index 63357e8..e6deee0 100644 (file)
@@ -1,6 +1,5 @@
 package de.juplo.kafka.wordcount.top10;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
@@ -18,17 +17,16 @@ public class Top10StreamProcessor
        public Top10StreamProcessor(
                        String inputTopic,
                        String outputTopic,
-                       ObjectMapper mapper,
                        Properties props)
        {
                StreamsBuilder builder = new StreamsBuilder();
 
                builder
-                               .<Word, WordCount>stream(inputTopic)
-                               .map((word, wordCount) ->
+                               .<Key, Counter>stream(inputTopic)
+                               .map((key, counter) ->
                                {
-                                       Entry entry = Entry.of(word.getWord(), wordCount.getCount());
-                                       return new KeyValue<>(word.getUser(), entry);
+                                       Entry entry = Entry.of(key.getWord(), counter.getCounter());
+                                       return new KeyValue<>(key.getUser(), entry);
                                })
                                .groupByKey()
                                .aggregate(