popular: 1.0.0 - Added ``peek()``-operations popularl-1.0.0
authorKai Moritz <kai@juplo.de>
Mon, 17 Jun 2024 18:43:04 +0000 (20:43 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 19 Jun 2024 21:02:00 +0000 (23:02 +0200)
src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java

index 9ba9ad2..788ae20 100644 (file)
@@ -62,7 +62,9 @@ public class PopularStreamProcessor
 
                builder
                                .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
+                               .peek((user, userWord) -> log.info("{}: {} -> {}", inputTopic, user, userWord))
                                .map((key, userWord) -> new KeyValue<>(Word.of(userWord.getWord()), Word.of(userWord.getWord())))
+                               .peek((key, value) -> log.info("mapped: {} -> {}", key, value))
                                .groupByKey()
                                .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
                                .count(
@@ -71,18 +73,21 @@ public class PopularStreamProcessor
                                                                .withKeySerde(new JsonSerde<>(Word.class).noTypeInfo())
                                                                .withValueSerde(Serdes.Long()))
                                .toStream()
+                               .peek((windowedWord, counter) -> log.info("windowed: {} -> {}", windowedWord, counter))
                                .map((windowedWord, counter) -> new KeyValue<>(
                                                WindowedWord.of(
                                                                ZonedDateTime.ofInstant(windowedWord.window().startTime(), zone),
                                                                ZonedDateTime.ofInstant(windowedWord.window().endTime(), zone),
                                                                windowedWord.key().getWord()),
                                                WordCounter.of(windowedWord.key().getWord(), counter)))
+                               .peek((windowedWord, wordCounter) -> log.info("results: {} -> {}", windowedWord, wordCounter))
                                .toTable(
                                                Materialized
                                                                .<WindowedWord, WordCounter>as(keyValueBytesStoreSupplier)
                                                                .withKeySerde(new JsonSerde<>(WindowedWord.class).noTypeInfo())
                                                                .withValueSerde(new JsonSerde<>(WordCounter.class).noTypeInfo()))
                                .toStream()
+                               .peek((windowedWord, wordCounter) -> log.info("output: {} -> {}", windowedWord, wordCounter))
                                .to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
 
                Topology topology = builder.build();