From 4fa8ff7e784017283d1fa2643dfc549febae8263 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 8 Jul 2024 19:58:01 +0200 Subject: [PATCH] Removed `peek()` --- .../kafka/wordcount/counter/AggregationTopologyTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java index 58a858a..85fb265 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java @@ -3,7 +3,10 @@ package de.juplo.kafka.wordcount.counter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.*; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.test.TestRecord; import org.junit.jupiter.api.Test; @@ -33,12 +36,10 @@ public class AggregationTopologyTest KStream input = builder.stream(INPUT); input - .peek((k,v) -> log.info("peek-0 -- {} = {}", k, v)) .groupByKey() .windowedBy(WINDOWS) .reduce((aggregate, value) -> aggregate + "-" + value) .toStream((k,v) -> k.toString()) - .peek((k,v) -> log.info("peek-1 -- {} = {}", k, v)) .to(OUTPUT); Topology topology = builder.build(); -- 2.20.1