From: Kai Moritz Date: Mon, 8 Jul 2024 19:09:24 +0000 (+0200) Subject: Example for an aggregation with a Session Window X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=ff58c72d0480d65b1b73a35fcb2f41149ca76082;p=demos%2Fkafka%2Fwordcount Example for an aggregation with a Session Window --- diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java index 8233a70..80e0f18 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java @@ -4,10 +4,10 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.test.TestRecord; import org.junit.jupiter.api.Test; @@ -24,8 +24,8 @@ import static org.assertj.core.api.Assertions.assertThat; @Slf4j public class AggregationTopologyTest { - static final Duration WINDOW_SIZE = Duration.ofSeconds(10); - static final SlidingWindows WINDOWS = SlidingWindows.ofTimeDifferenceWithNoGrace(WINDOW_SIZE); + static final Duration INACTIVITY_GAP = Duration.ofSeconds(2); + static final SessionWindows WINDOWS = SessionWindows.ofInactivityGapWithNoGrace(INACTIVITY_GAP); @Test @@ -63,76 +63,56 @@ public class AggregationTopologyTest sendAt("A", 63); assertThatOutcomeIs( - KeyValue.pair(windowFor(53000), "A")); + KeyValue.pair(windowFor(63, 63), "A")); sendAt("B", 64); assertThatOutcomeIs( - KeyValue.pair(windowFor(63001), "B"), - KeyValue.pair(windowFor(54000), "A-B")); + KeyValue.pair(windowFor(63, 63), null), + KeyValue.pair(windowFor(63, 64), "A-B")); sendAt("C", 65); assertThatOutcomeIs( - KeyValue.pair(windowFor(63001), "B-C"), - KeyValue.pair(windowFor(64001), "C"), - KeyValue.pair(windowFor(55000), "A-B-C")); + KeyValue.pair(windowFor(63, 64), null), + KeyValue.pair(windowFor(63, 65), "A-B-C")); sendAt("D", 66); assertThatOutcomeIs( - KeyValue.pair(windowFor(64001), "C-D"), - KeyValue.pair(windowFor(63001), "B-C-D"), - KeyValue.pair(windowFor(65001), "D"), - KeyValue.pair(windowFor(56000), "A-B-C-D")); + KeyValue.pair(windowFor(63, 65), null), + KeyValue.pair(windowFor(63, 66), "A-B-C-D")); sendAt("E", 69); assertThatOutcomeIs( - KeyValue.pair(windowFor(65001), "D-E"), - KeyValue.pair(windowFor(64001), "C-D-E"), - KeyValue.pair(windowFor(63001), "B-C-D-E"), - KeyValue.pair(windowFor(66001), "E"), - KeyValue.pair(windowFor(59000), "A-B-C-D-E")); + KeyValue.pair(windowFor(69, 69), "E")); sendAt("F", 70); assertThatOutcomeIs( - KeyValue.pair(windowFor(66001), "E-F"), - KeyValue.pair(windowFor(65001), "D-E-F"), - KeyValue.pair(windowFor(64001), "C-D-E-F"), - KeyValue.pair(windowFor(63001), "B-C-D-E-F"), - KeyValue.pair(windowFor(69001), "F"), - KeyValue.pair(windowFor(60000), "A-B-C-D-E-F")); + KeyValue.pair(windowFor(69, 69), null), + KeyValue.pair(windowFor(69, 70), "E-F")); sendAt("G", 74); assertThatOutcomeIs( - KeyValue.pair(windowFor(69001), "F-G"), - KeyValue.pair(windowFor(66001), "E-F-G"), - KeyValue.pair(windowFor(65001), "D-E-F-G"), - KeyValue.pair(windowFor(64001), "C-D-E-F-G"), - KeyValue.pair(windowFor(70001), "G"), - KeyValue.pair(windowFor(64000), "B-C-D-E-F-G")); + KeyValue.pair(windowFor(74, 74), "G")); sendAt("H", 75); assertThatOutcomeIs( - KeyValue.pair(windowFor(70001), "G-H"), - KeyValue.pair(windowFor(69001), "F-G-H"), - KeyValue.pair(windowFor(66001), "E-F-G-H"), - KeyValue.pair(windowFor(65001), "D-E-F-G-H"), - KeyValue.pair(windowFor(74001), "H"), - KeyValue.pair(windowFor(65000), "C-D-E-F-G-H")); + KeyValue.pair(windowFor(74, 74), null), + KeyValue.pair(windowFor(74, 75), "G-H")); sendAt("I", 100); assertThatOutcomeIs( - KeyValue.pair(windowFor(90000), "I")); + KeyValue.pair(windowFor(100, 100), "I")); sendAt("J", 120); assertThatOutcomeIs( - KeyValue.pair(windowFor(110000), "J")); + KeyValue.pair(windowFor(120, 120), "J")); sendAt("K", 140); assertThatOutcomeIs( - KeyValue.pair(windowFor(130000), "K")); + KeyValue.pair(windowFor(140, 140), "K")); sendAt("L", 160); assertThatOutcomeIs( - KeyValue.pair(windowFor(150000), "L")); + KeyValue.pair(windowFor(160, 160), "L")); } @@ -191,16 +171,16 @@ public class AggregationTopologyTest String start = matcher.group(2); String end = matcher.group(3); - Window window = new TimeWindow(Long.parseLong(start), Long.parseLong(end)); + Window window = new SessionWindow(Long.parseLong(start), Long.parseLong(end)); return new Windowed<>(key, window); } - Windowed windowFor(long milli) + Windowed windowFor(int startSecond, int endSecond) { - Instant startTime = Instant.ofEpochMilli(milli); - Instant endTime = startTime.plus(WINDOW_SIZE); - TimeWindow window = new TimeWindow(startTime.toEpochMilli(), endTime.toEpochMilli()); + Instant startTime = Instant.ofEpochSecond(startSecond); + Instant endTime = Instant.ofEpochSecond(endSecond); + SessionWindow window = new SessionWindow(startTime.toEpochMilli(), endTime.toEpochMilli()); return new Windowed<>(KEY, window); } }