From e88db15eafe64ac083d066be29ad94058930bb5d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 17 Jun 2024 20:43:04 +0200 Subject: [PATCH] popular: 1.0.0 - Added ``peek()``-operations --- .../kafka/wordcount/popular/PopularStreamProcessor.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java index 9ba9ad2..788ae20 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java @@ -62,7 +62,9 @@ public class PopularStreamProcessor builder .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde())) + .peek((user, userWord) -> log.info("{}: {} -> {}", inputTopic, user, userWord)) .map((key, userWord) -> new KeyValue<>(Word.of(userWord.getWord()), Word.of(userWord.getWord()))) + .peek((key, value) -> log.info("mapped: {} -> {}", key, value)) .groupByKey() .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE)) .count( @@ -71,18 +73,21 @@ public class PopularStreamProcessor .withKeySerde(new JsonSerde<>(Word.class).noTypeInfo()) .withValueSerde(Serdes.Long())) .toStream() + .peek((windowedWord, counter) -> log.info("windowed: {} -> {}", windowedWord, counter)) .map((windowedWord, counter) -> new KeyValue<>( WindowedWord.of( ZonedDateTime.ofInstant(windowedWord.window().startTime(), zone), ZonedDateTime.ofInstant(windowedWord.window().endTime(), zone), windowedWord.key().getWord()), WordCounter.of(windowedWord.key().getWord(), counter))) + .peek((windowedWord, wordCounter) -> log.info("results: {} -> {}", windowedWord, wordCounter)) .toTable( Materialized .as(keyValueBytesStoreSupplier) .withKeySerde(new JsonSerde<>(WindowedWord.class).noTypeInfo()) .withValueSerde(new JsonSerde<>(WordCounter.class).noTypeInfo())) .toStream() + .peek((windowedWord, wordCounter) -> log.info("output: {} -> {}", windowedWord, wordCounter)) .to(outputTopic, Produced.with(outKeySerde(), outValueSerde())); Topology topology = builder.build(); -- 2.20.1