Remodeled example into a demonstration of an aggregation with a tumbling time window
authorKai Moritz <kai@juplo.de>
Mon, 8 Jul 2024 11:33:33 +0000 (13:33 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 17 Jul 2024 09:54:11 +0000 (11:54 +0200)
src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java

index 85fb265..36715bd 100644 (file)
@@ -25,7 +25,10 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class AggregationTopologyTest
 {
   static final Duration WINDOW_SIZE = Duration.ofSeconds(10);
-  static final TimeWindows WINDOWS = TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE);
+  static final Duration ADVANCE = Duration.ofSeconds(3);
+  static final TimeWindows WINDOWS = TimeWindows
+      .ofSizeWithNoGrace(WINDOW_SIZE)
+      .advanceBy(ADVANCE);
 
 
   @Test
@@ -62,40 +65,81 @@ public class AggregationTopologyTest
 
 
     sendAt("A", 63);
-    assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A"));
+    assertThatOutcomeIs(
+        KeyValue.pair(windowFor(54), "A"),
+        KeyValue.pair(windowFor(57), "A"),
+        KeyValue.pair(windowFor(60), "A"),
+        KeyValue.pair(windowFor(63), "A"));
 
     sendAt("B", 64);
-    assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B"));
+    assertThatOutcomeIs(
+        KeyValue.pair(windowFor(57), "A-B"),
+        KeyValue.pair(windowFor(60), "A-B"),
+        KeyValue.pair(windowFor(63), "A-B"));
 
     sendAt("C", 65);
-    assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C"));
+    assertThatOutcomeIs(
+        KeyValue.pair(windowFor(57), "A-B-C"),
+        KeyValue.pair(windowFor(60), "A-B-C"),
+        KeyValue.pair(windowFor(63), "A-B-C"));
 
     sendAt("D", 66);
-    assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D"));
+    assertThatOutcomeIs(
+        KeyValue.pair(windowFor(57), "A-B-C-D"),
+        KeyValue.pair(windowFor(60), "A-B-C-D"),
+        KeyValue.pair(windowFor(63), "A-B-C-D"),
+        KeyValue.pair(windowFor(66), "D"));
 
     sendAt("E", 69);
-    assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D-E"));
+    assertThatOutcomeIs(
+        KeyValue.pair(windowFor(60), "A-B-C-D-E"),
+        KeyValue.pair(windowFor(63), "A-B-C-D-E"),
+        KeyValue.pair(windowFor(66), "D-E"),
+        KeyValue.pair(windowFor(69), "E"));
 
     sendAt("F", 70);
-    assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F"));
+    assertThatOutcomeIs(
+        KeyValue.pair(windowFor(63), "A-B-C-D-E-F"),
+        KeyValue.pair(windowFor(66), "D-E-F"),
+        KeyValue.pair(windowFor(69), "E-F"));
 
     sendAt("G", 74);
-    assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F-G"));
+    assertThatOutcomeIs(
+        KeyValue.pair(windowFor(66), "D-E-F-G"),
+        KeyValue.pair(windowFor(69), "E-F-G"),
+        KeyValue.pair(windowFor(72), "G"));
 
     sendAt("H", 75);
-    assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F-G-H"));
+    assertThatOutcomeIs(
+        KeyValue.pair(windowFor(66), "D-E-F-G-H"),
+        KeyValue.pair(windowFor(69), "E-F-G-H"),
+        KeyValue.pair(windowFor(72), "G-H"),
+        KeyValue.pair(windowFor(75), "H"));
 
     sendAt("I", 100);
-    assertThatOutcomeIs(KeyValue.pair(windowFor(100), "I"));
+    assertThatOutcomeIs(
+        KeyValue.pair(windowFor(93), "I"),
+        KeyValue.pair(windowFor(96), "I"),
+        KeyValue.pair(windowFor(99), "I"));
 
     sendAt("J", 120);
-    assertThatOutcomeIs(KeyValue.pair(windowFor(120), "J"));
+    assertThatOutcomeIs(
+        KeyValue.pair(windowFor(111), "J"),
+        KeyValue.pair(windowFor(114), "J"),
+        KeyValue.pair(windowFor(117), "J"),
+        KeyValue.pair(windowFor(120), "J"));
 
     sendAt("K", 140);
-    assertThatOutcomeIs(KeyValue.pair(windowFor(140), "K"));
+    assertThatOutcomeIs(
+        KeyValue.pair(windowFor(132), "K"),
+        KeyValue.pair(windowFor(135), "K"),
+        KeyValue.pair(windowFor(138), "K"));
 
     sendAt("L", 160);
-    assertThatOutcomeIs(KeyValue.pair(windowFor(160), "L"));
+    assertThatOutcomeIs(
+        KeyValue.pair(windowFor(153), "L"),
+        KeyValue.pair(windowFor(156), "L"),
+        KeyValue.pair(windowFor(159), "L"));
   }