Vorschlag:2-mehr-an-Ausgangslage-angepasst
authorKai Moritz <kai@juplo.de>
Mon, 8 Jul 2024 14:27:33 +0000 (16:27 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 8 Jul 2024 14:27:33 +0000 (16:27 +0200)
src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java

index 90584c2..1b2b03d 100644 (file)
@@ -39,7 +39,9 @@ public class AggregationTopologyTest
     // Define sliding window of size 5 minutes with a grace period of 1 minute
     SlidingWindows slidingWindow = SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1));
 
-    KGroupedStream<String, String> groupedStream = input.groupByKey();
+    KGroupedStream<String, String> groupedStream = input
+        .peek((k,v) -> log.info("peek-0 -- {} = {}", k, v))
+        .groupByKey();
 
     KTable<Windowed<String>, Long> aggregatedTable = groupedStream
         .windowedBy(slidingWindow)
@@ -47,16 +49,10 @@ public class AggregationTopologyTest
             .withKeySerde(Serdes.String())
             .withValueSerde(Serdes.Long()));
 
-    aggregatedTable.toStream().foreach((windowedKey, count) -> {
-      long windowEnd = windowedKey.window().end();
-
-      // Simulate window close action
-      if (System.currentTimeMillis() > windowEnd + Duration.ofMinutes(1).toMillis()) {
-        System.out.println("Window closed for key: " + windowedKey.key() + " with count: " + count);
-      } else {
-        System.out.println("Window still open for key: " + windowedKey.key() + " with count: " + count);
-      }
-    });
+    aggregatedTable
+        .toStream()
+        .peek((k,v) -> log.info("peek-1 -- {} = {}", k, v))
+        .to(OUTPUT);
 
     Topology topology = builder.build();
     log.info("Generated topology: {}", topology.describe());