counter: 1.2.2. - Simplified topology - better readability
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / counter / CounterStreamProcessor.java
index 324e424..4cc0c68 100644 (file)
@@ -61,16 +61,9 @@ public class CounterStreamProcessor
                                                                .noTypeInfo(),
                                                new JsonSerde<>(Word.class)
                                                                .noTypeInfo()))
-                               .count(Materialized
-                                               .<Word,Long>as(storeSupplier)
-                                               .withKeySerde(
-                                                               new JsonSerde<>(Word.class)
-                                                                               .forKeys()
-                                                                               .noTypeInfo())
-                                               .withValueSerde(
-                                                               Serdes.Long()))
+                               .count(Materialized.as(storeSupplier))
                                .toStream()
-                               .map((word, count) -> new KeyValue<>(word, WordCount.of(word.getUser(), word.getWord(), count)))
+                               .map((word, count) -> new KeyValue<>(word, WordCount.of(word, count)))
                                .to(
                                                outputTopic,
                                                Produced.with(