From: Kai Moritz Date: Mon, 8 Jul 2024 14:27:33 +0000 (+0200) Subject: Vorschlag:2-mehr-an-Ausgangslage-angepasst X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=811c741627383300fc4ae7e9d48e4acabaef6ee8;p=demos%2Fkafka%2Fwordcount Vorschlag:2-mehr-an-Ausgangslage-angepasst --- diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java index 90584c2..1b2b03d 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java @@ -39,7 +39,9 @@ public class AggregationTopologyTest // Define sliding window of size 5 minutes with a grace period of 1 minute SlidingWindows slidingWindow = SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)); - KGroupedStream groupedStream = input.groupByKey(); + KGroupedStream groupedStream = input + .peek((k,v) -> log.info("peek-0 -- {} = {}", k, v)) + .groupByKey(); KTable, Long> aggregatedTable = groupedStream .windowedBy(slidingWindow) @@ -47,16 +49,10 @@ public class AggregationTopologyTest .withKeySerde(Serdes.String()) .withValueSerde(Serdes.Long())); - aggregatedTable.toStream().foreach((windowedKey, count) -> { - long windowEnd = windowedKey.window().end(); - - // Simulate window close action - if (System.currentTimeMillis() > windowEnd + Duration.ofMinutes(1).toMillis()) { - System.out.println("Window closed for key: " + windowedKey.key() + " with count: " + count); - } else { - System.out.println("Window still open for key: " + windowedKey.key() + " with count: " + count); - } - }); + aggregatedTable + .toStream() + .peek((k,v) -> log.info("peek-1 -- {} = {}", k, v)) + .to(OUTPUT); Topology topology = builder.build(); log.info("Generated topology: {}", topology.describe());