From: Kai Moritz Date: Mon, 8 Jul 2024 13:02:45 +0000 (+0200) Subject: Remodeled example into a demonstration of an aggregation with a Sliding Window X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=76db4f1f0079532f287b732f27e55620098d86dc;p=demos%2Fkafka%2Fwordcount Remodeled example into a demonstration of an aggregation with a Sliding Window --- diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java index 36715bd..8233a70 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java @@ -25,10 +25,7 @@ import static org.assertj.core.api.Assertions.assertThat; public class AggregationTopologyTest { static final Duration WINDOW_SIZE = Duration.ofSeconds(10); - static final Duration ADVANCE = Duration.ofSeconds(3); - static final TimeWindows WINDOWS = TimeWindows - .ofSizeWithNoGrace(WINDOW_SIZE) - .advanceBy(ADVANCE); + static final SlidingWindows WINDOWS = SlidingWindows.ofTimeDifferenceWithNoGrace(WINDOW_SIZE); @Test @@ -66,80 +63,76 @@ public class AggregationTopologyTest sendAt("A", 63); assertThatOutcomeIs( - KeyValue.pair(windowFor(54), "A"), - KeyValue.pair(windowFor(57), "A"), - KeyValue.pair(windowFor(60), "A"), - KeyValue.pair(windowFor(63), "A")); + KeyValue.pair(windowFor(53000), "A")); sendAt("B", 64); assertThatOutcomeIs( - KeyValue.pair(windowFor(57), "A-B"), - KeyValue.pair(windowFor(60), "A-B"), - KeyValue.pair(windowFor(63), "A-B")); + KeyValue.pair(windowFor(63001), "B"), + KeyValue.pair(windowFor(54000), "A-B")); sendAt("C", 65); assertThatOutcomeIs( - KeyValue.pair(windowFor(57), "A-B-C"), - KeyValue.pair(windowFor(60), "A-B-C"), - KeyValue.pair(windowFor(63), "A-B-C")); + KeyValue.pair(windowFor(63001), "B-C"), + KeyValue.pair(windowFor(64001), "C"), + KeyValue.pair(windowFor(55000), "A-B-C")); sendAt("D", 66); assertThatOutcomeIs( - KeyValue.pair(windowFor(57), "A-B-C-D"), - KeyValue.pair(windowFor(60), "A-B-C-D"), - KeyValue.pair(windowFor(63), "A-B-C-D"), - KeyValue.pair(windowFor(66), "D")); + KeyValue.pair(windowFor(64001), "C-D"), + KeyValue.pair(windowFor(63001), "B-C-D"), + KeyValue.pair(windowFor(65001), "D"), + KeyValue.pair(windowFor(56000), "A-B-C-D")); sendAt("E", 69); assertThatOutcomeIs( - KeyValue.pair(windowFor(60), "A-B-C-D-E"), - KeyValue.pair(windowFor(63), "A-B-C-D-E"), - KeyValue.pair(windowFor(66), "D-E"), - KeyValue.pair(windowFor(69), "E")); + KeyValue.pair(windowFor(65001), "D-E"), + KeyValue.pair(windowFor(64001), "C-D-E"), + KeyValue.pair(windowFor(63001), "B-C-D-E"), + KeyValue.pair(windowFor(66001), "E"), + KeyValue.pair(windowFor(59000), "A-B-C-D-E")); sendAt("F", 70); assertThatOutcomeIs( - KeyValue.pair(windowFor(63), "A-B-C-D-E-F"), - KeyValue.pair(windowFor(66), "D-E-F"), - KeyValue.pair(windowFor(69), "E-F")); + KeyValue.pair(windowFor(66001), "E-F"), + KeyValue.pair(windowFor(65001), "D-E-F"), + KeyValue.pair(windowFor(64001), "C-D-E-F"), + KeyValue.pair(windowFor(63001), "B-C-D-E-F"), + KeyValue.pair(windowFor(69001), "F"), + KeyValue.pair(windowFor(60000), "A-B-C-D-E-F")); sendAt("G", 74); assertThatOutcomeIs( - KeyValue.pair(windowFor(66), "D-E-F-G"), - KeyValue.pair(windowFor(69), "E-F-G"), - KeyValue.pair(windowFor(72), "G")); + KeyValue.pair(windowFor(69001), "F-G"), + KeyValue.pair(windowFor(66001), "E-F-G"), + KeyValue.pair(windowFor(65001), "D-E-F-G"), + KeyValue.pair(windowFor(64001), "C-D-E-F-G"), + KeyValue.pair(windowFor(70001), "G"), + KeyValue.pair(windowFor(64000), "B-C-D-E-F-G")); sendAt("H", 75); assertThatOutcomeIs( - KeyValue.pair(windowFor(66), "D-E-F-G-H"), - KeyValue.pair(windowFor(69), "E-F-G-H"), - KeyValue.pair(windowFor(72), "G-H"), - KeyValue.pair(windowFor(75), "H")); + KeyValue.pair(windowFor(70001), "G-H"), + KeyValue.pair(windowFor(69001), "F-G-H"), + KeyValue.pair(windowFor(66001), "E-F-G-H"), + KeyValue.pair(windowFor(65001), "D-E-F-G-H"), + KeyValue.pair(windowFor(74001), "H"), + KeyValue.pair(windowFor(65000), "C-D-E-F-G-H")); sendAt("I", 100); assertThatOutcomeIs( - KeyValue.pair(windowFor(93), "I"), - KeyValue.pair(windowFor(96), "I"), - KeyValue.pair(windowFor(99), "I")); + KeyValue.pair(windowFor(90000), "I")); sendAt("J", 120); assertThatOutcomeIs( - KeyValue.pair(windowFor(111), "J"), - KeyValue.pair(windowFor(114), "J"), - KeyValue.pair(windowFor(117), "J"), - KeyValue.pair(windowFor(120), "J")); + KeyValue.pair(windowFor(110000), "J")); sendAt("K", 140); assertThatOutcomeIs( - KeyValue.pair(windowFor(132), "K"), - KeyValue.pair(windowFor(135), "K"), - KeyValue.pair(windowFor(138), "K")); + KeyValue.pair(windowFor(130000), "K")); sendAt("L", 160); assertThatOutcomeIs( - KeyValue.pair(windowFor(153), "L"), - KeyValue.pair(windowFor(156), "L"), - KeyValue.pair(windowFor(159), "L")); + KeyValue.pair(windowFor(150000), "L")); } @@ -203,9 +196,9 @@ public class AggregationTopologyTest return new Windowed<>(key, window); } - Windowed windowFor(int second) + Windowed windowFor(long milli) { - Instant startTime = Instant.ofEpochSecond(second); + Instant startTime = Instant.ofEpochMilli(milli); Instant endTime = startTime.plus(WINDOW_SIZE); TimeWindow window = new TimeWindow(startTime.toEpochMilli(), endTime.toEpochMilli()); return new Windowed<>(KEY, window);