From: Kai Moritz Date: Mon, 8 Jul 2024 17:58:01 +0000 (+0200) Subject: Removed `peek()` X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=4fa8ff7e784017283d1fa2643dfc549febae8263;p=demos%2Fkafka%2Fwordcount Removed `peek()` --- diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java index 58a858a..85fb265 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java @@ -3,7 +3,10 @@ package de.juplo.kafka.wordcount.counter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.*; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.test.TestRecord; import org.junit.jupiter.api.Test; @@ -33,12 +36,10 @@ public class AggregationTopologyTest KStream input = builder.stream(INPUT); input - .peek((k,v) -> log.info("peek-0 -- {} = {}", k, v)) .groupByKey() .windowedBy(WINDOWS) .reduce((aggregate, value) -> aggregate + "-" + value) .toStream((k,v) -> k.toString()) - .peek((k,v) -> log.info("peek-1 -- {} = {}", k, v)) .to(OUTPUT); Topology topology = builder.build();