popular: 1.0.0 - Added ``peek()``-operations
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / popular / PopularStreamProcessor.java
index 9ba9ad2..788ae20 100644 (file)
@@ -62,7 +62,9 @@ public class PopularStreamProcessor
 
                builder
                                .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
+                               .peek((user, userWord) -> log.info("{}: {} -> {}", inputTopic, user, userWord))
                                .map((key, userWord) -> new KeyValue<>(Word.of(userWord.getWord()), Word.of(userWord.getWord())))
+                               .peek((key, value) -> log.info("mapped: {} -> {}", key, value))
                                .groupByKey()
                                .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
                                .count(
@@ -71,18 +73,21 @@ public class PopularStreamProcessor
                                                                .withKeySerde(new JsonSerde<>(Word.class).noTypeInfo())
                                                                .withValueSerde(Serdes.Long()))
                                .toStream()
+                               .peek((windowedWord, counter) -> log.info("windowed: {} -> {}", windowedWord, counter))
                                .map((windowedWord, counter) -> new KeyValue<>(
                                                WindowedWord.of(
                                                                ZonedDateTime.ofInstant(windowedWord.window().startTime(), zone),
                                                                ZonedDateTime.ofInstant(windowedWord.window().endTime(), zone),
                                                                windowedWord.key().getWord()),
                                                WordCounter.of(windowedWord.key().getWord(), counter)))
+                               .peek((windowedWord, wordCounter) -> log.info("results: {} -> {}", windowedWord, wordCounter))
                                .toTable(
                                                Materialized
                                                                .<WindowedWord, WordCounter>as(keyValueBytesStoreSupplier)
                                                                .withKeySerde(new JsonSerde<>(WindowedWord.class).noTypeInfo())
                                                                .withValueSerde(new JsonSerde<>(WordCounter.class).noTypeInfo()))
                                .toStream()
+                               .peek((windowedWord, wordCounter) -> log.info("output: {} -> {}", windowedWord, wordCounter))
                                .to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
 
                Topology topology = builder.build();