Example for an aggregation with a Session Window
authorKai Moritz <kai@juplo.de>
Mon, 8 Jul 2024 19:09:24 +0000 (21:09 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 17 Jul 2024 10:58:06 +0000 (12:58 +0200)
src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java

index 8233a70..80e0f18 100644 (file)
@@ -4,10 +4,10 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.*;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.test.TestRecord;
 import org.junit.jupiter.api.Test;
 
@@ -24,8 +24,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 @Slf4j
 public class AggregationTopologyTest
 {
-  static final Duration WINDOW_SIZE = Duration.ofSeconds(10);
-  static final SlidingWindows WINDOWS = SlidingWindows.ofTimeDifferenceWithNoGrace(WINDOW_SIZE);
+  static final Duration INACTIVITY_GAP = Duration.ofSeconds(2);
+  static final SessionWindows WINDOWS = SessionWindows.ofInactivityGapWithNoGrace(INACTIVITY_GAP);
 
 
   @Test
@@ -63,76 +63,56 @@ public class AggregationTopologyTest
 
     sendAt("A", 63);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(53000), "A"));
+        KeyValue.pair(windowFor(63, 63), "A"));
 
     sendAt("B", 64);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(63001), "B"),
-        KeyValue.pair(windowFor(54000), "A-B"));
+        KeyValue.pair(windowFor(63, 63), null),
+        KeyValue.pair(windowFor(63, 64), "A-B"));
 
     sendAt("C", 65);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(63001), "B-C"),
-        KeyValue.pair(windowFor(64001), "C"),
-        KeyValue.pair(windowFor(55000), "A-B-C"));
+        KeyValue.pair(windowFor(63, 64), null),
+        KeyValue.pair(windowFor(63, 65), "A-B-C"));
 
     sendAt("D", 66);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(64001), "C-D"),
-        KeyValue.pair(windowFor(63001), "B-C-D"),
-        KeyValue.pair(windowFor(65001), "D"),
-        KeyValue.pair(windowFor(56000), "A-B-C-D"));
+        KeyValue.pair(windowFor(63, 65), null),
+        KeyValue.pair(windowFor(63, 66), "A-B-C-D"));
 
     sendAt("E", 69);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(65001), "D-E"),
-        KeyValue.pair(windowFor(64001), "C-D-E"),
-        KeyValue.pair(windowFor(63001), "B-C-D-E"),
-        KeyValue.pair(windowFor(66001), "E"),
-        KeyValue.pair(windowFor(59000), "A-B-C-D-E"));
+        KeyValue.pair(windowFor(69, 69), "E"));
 
     sendAt("F", 70);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(66001), "E-F"),
-        KeyValue.pair(windowFor(65001), "D-E-F"),
-        KeyValue.pair(windowFor(64001), "C-D-E-F"),
-        KeyValue.pair(windowFor(63001), "B-C-D-E-F"),
-        KeyValue.pair(windowFor(69001), "F"),
-        KeyValue.pair(windowFor(60000), "A-B-C-D-E-F"));
+        KeyValue.pair(windowFor(69, 69), null),
+        KeyValue.pair(windowFor(69, 70), "E-F"));
 
     sendAt("G", 74);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(69001), "F-G"),
-        KeyValue.pair(windowFor(66001), "E-F-G"),
-        KeyValue.pair(windowFor(65001), "D-E-F-G"),
-        KeyValue.pair(windowFor(64001), "C-D-E-F-G"),
-        KeyValue.pair(windowFor(70001), "G"),
-        KeyValue.pair(windowFor(64000), "B-C-D-E-F-G"));
+        KeyValue.pair(windowFor(74, 74), "G"));
 
     sendAt("H", 75);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(70001), "G-H"),
-        KeyValue.pair(windowFor(69001), "F-G-H"),
-        KeyValue.pair(windowFor(66001), "E-F-G-H"),
-        KeyValue.pair(windowFor(65001), "D-E-F-G-H"),
-        KeyValue.pair(windowFor(74001), "H"),
-        KeyValue.pair(windowFor(65000), "C-D-E-F-G-H"));
+        KeyValue.pair(windowFor(74, 74), null),
+        KeyValue.pair(windowFor(74, 75), "G-H"));
 
     sendAt("I", 100);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(90000), "I"));
+        KeyValue.pair(windowFor(100, 100), "I"));
 
     sendAt("J", 120);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(110000), "J"));
+        KeyValue.pair(windowFor(120, 120), "J"));
 
     sendAt("K", 140);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(130000), "K"));
+        KeyValue.pair(windowFor(140, 140), "K"));
 
     sendAt("L", 160);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(150000), "L"));
+        KeyValue.pair(windowFor(160, 160), "L"));
   }
 
 
@@ -191,16 +171,16 @@ public class AggregationTopologyTest
     String start = matcher.group(2);
     String end = matcher.group(3);
 
-    Window window = new TimeWindow(Long.parseLong(start), Long.parseLong(end));
+    Window window = new SessionWindow(Long.parseLong(start), Long.parseLong(end));
 
     return new Windowed<>(key, window);
   }
 
-  Windowed<String> windowFor(long milli)
+  Windowed<String> windowFor(int startSecond, int endSecond)
   {
-    Instant startTime = Instant.ofEpochMilli(milli);
-    Instant endTime = startTime.plus(WINDOW_SIZE);
-    TimeWindow window = new TimeWindow(startTime.toEpochMilli(), endTime.toEpochMilli());
+    Instant startTime = Instant.ofEpochSecond(startSecond);
+    Instant endTime = Instant.ofEpochSecond(endSecond);
+    SessionWindow window = new SessionWindow(startTime.toEpochMilli(), endTime.toEpochMilli());
     return new Windowed<>(KEY, window);
   }
 }