counter: 1.4.2 - RocksDB does nor work in Alpine-Linux
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / counter / CounterStreamProcessor.java
index c983a25..455d895 100644 (file)
@@ -19,6 +19,7 @@ import java.util.stream.Collectors;
 @Slf4j
 public class CounterStreamProcessor
 {
+       public static final String TYPE = "COUNTER";
        public static final String STORE_NAME = "counter";
 
 
@@ -48,6 +49,7 @@ public class CounterStreamProcessor
 
                builder
                                .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
+                               .mapValues(word -> Word.of(word.getUser(), word.getWord()))
                                .map((key, word) -> new KeyValue<>(word, word))
                                .groupByKey()
                                .count(
@@ -88,9 +90,9 @@ public class CounterStreamProcessor
                return new JsonSerde<>(User.class);
        }
 
-       public static JsonSerde<Word> inValueSerde()
+       public static JsonSerde<UserWord> inValueSerde()
        {
-               return new JsonSerde<>(Word.class);
+               return new JsonSerde<>(UserWord.class);
        }
 
        public static JsonSerde<Word> outKeySerde()
@@ -114,17 +116,17 @@ public class CounterStreamProcessor
 
        private static String typeMappingsConfig()
        {
-               return typeMappings()
-                               .entrySet()
-                               .stream()
-                               .map(entry -> entry.getKey() + ":" + entry.getValue().getName())
-                               .collect(Collectors.joining(","));
+               return typeMappingsConfig(Word.class, WordCounter.class);
        }
 
-       private static Map<String, Class> typeMappings()
+       public static String typeMappingsConfig(Class keyClass, Class counterClass)
        {
                return Map.of(
-                               "word", Word.class,
-                               "counter", WordCounter.class);
+                                               "key", keyClass,
+                                               "counter", counterClass)
+                               .entrySet()
+                               .stream()
+                               .map(entry -> entry.getKey() + ":" + entry.getValue().getName())
+                               .collect(Collectors.joining(","));
        }
 }