WIP
authorKai Moritz <kai@juplo.de>
Mon, 8 Jul 2024 13:02:45 +0000 (15:02 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 8 Jul 2024 13:41:32 +0000 (15:41 +0200)
src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java

index 2db690c..f92a65c 100644 (file)
@@ -24,10 +24,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class AggregationTopologyTest
 {
   static final Duration WINDOW_SIZE = Duration.ofSeconds(10);
-  static final Duration ADVANCE = Duration.ofSeconds(3);
-  static final TimeWindows WINDOWS = TimeWindows
-      .ofSizeWithNoGrace(WINDOW_SIZE)
-      .advanceBy(ADVANCE);
+  static final SlidingWindows WINDOWS = SlidingWindows.ofTimeDifferenceWithNoGrace(WINDOW_SIZE);
 
 
   @Test
@@ -69,80 +66,76 @@ public class AggregationTopologyTest
 
     sendAt("A", 63);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(54), "A"),
-        KeyValue.pair(windowFor(57), "A"),
-        KeyValue.pair(windowFor(60), "A"),
-        KeyValue.pair(windowFor(63), "A"));
+        KeyValue.pair(windowFor(53000), "A"));
 
     sendAt("B", 64);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(57), "A-B"),
-        KeyValue.pair(windowFor(60), "A-B"),
-        KeyValue.pair(windowFor(63), "A-B"));
+        KeyValue.pair(windowFor(63001), "B"),
+        KeyValue.pair(windowFor(54000), "A-B"));
 
     sendAt("C", 65);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(57), "A-B-C"),
-        KeyValue.pair(windowFor(60), "A-B-C"),
-        KeyValue.pair(windowFor(63), "A-B-C"));
+        KeyValue.pair(windowFor(63001), "B-C"),
+        KeyValue.pair(windowFor(64001), "C"),
+        KeyValue.pair(windowFor(55000), "A-B-C"));
 
     sendAt("D", 66);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(57), "A-B-C-D"),
-        KeyValue.pair(windowFor(60), "A-B-C-D"),
-        KeyValue.pair(windowFor(63), "A-B-C-D"),
-        KeyValue.pair(windowFor(66), "D"));
+        KeyValue.pair(windowFor(64001), "C-D"),
+        KeyValue.pair(windowFor(63001), "B-C-D"),
+        KeyValue.pair(windowFor(65001), "D"),
+        KeyValue.pair(windowFor(56000), "A-B-C-D"));
 
     sendAt("E", 69);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(60), "A-B-C-D-E"),
-        KeyValue.pair(windowFor(63), "A-B-C-D-E"),
-        KeyValue.pair(windowFor(66), "D-E"),
-        KeyValue.pair(windowFor(69), "E"));
+        KeyValue.pair(windowFor(65001), "D-E"),
+        KeyValue.pair(windowFor(64001), "C-D-E"),
+        KeyValue.pair(windowFor(63001), "B-C-D-E"),
+        KeyValue.pair(windowFor(66001), "E"),
+        KeyValue.pair(windowFor(59000), "A-B-C-D-E"));
 
     sendAt("F", 70);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(63), "A-B-C-D-E-F"),
-        KeyValue.pair(windowFor(66), "D-E-F"),
-        KeyValue.pair(windowFor(69), "E-F"));
+        KeyValue.pair(windowFor(66001), "E-F"),
+        KeyValue.pair(windowFor(65001), "D-E-F"),
+        KeyValue.pair(windowFor(64001), "C-D-E-F"),
+        KeyValue.pair(windowFor(63001), "B-C-D-E-F"),
+        KeyValue.pair(windowFor(69001), "F"),
+        KeyValue.pair(windowFor(60000), "A-B-C-D-E-F"));
 
     sendAt("G", 74);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(66), "D-E-F-G"),
-        KeyValue.pair(windowFor(69), "E-F-G"),
-        KeyValue.pair(windowFor(72), "G"));
+        KeyValue.pair(windowFor(69001), "F-G"),
+        KeyValue.pair(windowFor(66001), "E-F-G"),
+        KeyValue.pair(windowFor(65001), "D-E-F-G"),
+        KeyValue.pair(windowFor(64001), "C-D-E-F-G"),
+        KeyValue.pair(windowFor(70001), "G"),
+        KeyValue.pair(windowFor(64000), "B-C-D-E-F-G"));
 
     sendAt("H", 75);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(66), "D-E-F-G-H"),
-        KeyValue.pair(windowFor(69), "E-F-G-H"),
-        KeyValue.pair(windowFor(72), "G-H"),
-        KeyValue.pair(windowFor(75), "H"));
+        KeyValue.pair(windowFor(70001), "G-H"),
+        KeyValue.pair(windowFor(69001), "F-G-H"),
+        KeyValue.pair(windowFor(66001), "E-F-G-H"),
+        KeyValue.pair(windowFor(65001), "D-E-F-G-H"),
+        KeyValue.pair(windowFor(74001), "H"),
+        KeyValue.pair(windowFor(65000), "C-D-E-F-G-H"));
 
     sendAt("I", 100);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(93), "I"),
-        KeyValue.pair(windowFor(96), "I"),
-        KeyValue.pair(windowFor(99), "I"));
+        KeyValue.pair(windowFor(90000), "I"));
 
     sendAt("J", 120);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(111), "J"),
-        KeyValue.pair(windowFor(114), "J"),
-        KeyValue.pair(windowFor(117), "J"),
-        KeyValue.pair(windowFor(120), "J"));
+        KeyValue.pair(windowFor(110000), "J"));
 
     sendAt("K", 140);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(132), "K"),
-        KeyValue.pair(windowFor(135), "K"),
-        KeyValue.pair(windowFor(138), "K"));
+        KeyValue.pair(windowFor(130000), "K"));
 
     sendAt("L", 160);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(153), "L"),
-        KeyValue.pair(windowFor(156), "L"),
-        KeyValue.pair(windowFor(159), "L"));
+        KeyValue.pair(windowFor(150000), "L"));
   }
 
 
@@ -207,9 +200,9 @@ public class AggregationTopologyTest
     return new Windowed<>(key, window);
   }
 
-  Windowed<String> windowFor(int second)
+  Windowed<String> windowFor(long milli)
   {
-    Instant startTime = Instant.ofEpochSecond(second);
+    Instant startTime = Instant.ofEpochMilli(milli);
     Instant endTime = startTime.plus(WINDOW_SIZE);
     TimeWindow window = new TimeWindow(startTime.toEpochMilli(), endTime.toEpochMilli());
     return new Windowed<>(KEY, window);