From ae91a6dcf04a91290021bd6f7035bbfc8d979c30 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 8 Jul 2024 13:33:33 +0200 Subject: [PATCH] Remodeled example into a demonstration of an aggregation with a tumbling time window --- .../counter/AggregationTopologyTest.java | 70 +++++++++++++++---- 1 file changed, 57 insertions(+), 13 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java index 024f8e4..2db690c 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java @@ -24,7 +24,10 @@ import static org.assertj.core.api.Assertions.assertThat; public class AggregationTopologyTest { static final Duration WINDOW_SIZE = Duration.ofSeconds(10); - static final TimeWindows WINDOWS = TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE); + static final Duration ADVANCE = Duration.ofSeconds(3); + static final TimeWindows WINDOWS = TimeWindows + .ofSizeWithNoGrace(WINDOW_SIZE) + .advanceBy(ADVANCE); @Test @@ -65,40 +68,81 @@ public class AggregationTopologyTest sendAt("A", 63); - assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A")); + assertThatOutcomeIs( + KeyValue.pair(windowFor(54), "A"), + KeyValue.pair(windowFor(57), "A"), + KeyValue.pair(windowFor(60), "A"), + KeyValue.pair(windowFor(63), "A")); sendAt("B", 64); - assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B")); + assertThatOutcomeIs( + KeyValue.pair(windowFor(57), "A-B"), + KeyValue.pair(windowFor(60), "A-B"), + KeyValue.pair(windowFor(63), "A-B")); sendAt("C", 65); - assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C")); + assertThatOutcomeIs( + KeyValue.pair(windowFor(57), "A-B-C"), + KeyValue.pair(windowFor(60), "A-B-C"), + KeyValue.pair(windowFor(63), "A-B-C")); sendAt("D", 66); - assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D")); + assertThatOutcomeIs( + KeyValue.pair(windowFor(57), "A-B-C-D"), + KeyValue.pair(windowFor(60), "A-B-C-D"), + KeyValue.pair(windowFor(63), "A-B-C-D"), + KeyValue.pair(windowFor(66), "D")); sendAt("E", 69); - assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D-E")); + assertThatOutcomeIs( + KeyValue.pair(windowFor(60), "A-B-C-D-E"), + KeyValue.pair(windowFor(63), "A-B-C-D-E"), + KeyValue.pair(windowFor(66), "D-E"), + KeyValue.pair(windowFor(69), "E")); sendAt("F", 70); - assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F")); + assertThatOutcomeIs( + KeyValue.pair(windowFor(63), "A-B-C-D-E-F"), + KeyValue.pair(windowFor(66), "D-E-F"), + KeyValue.pair(windowFor(69), "E-F")); sendAt("G", 74); - assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F-G")); + assertThatOutcomeIs( + KeyValue.pair(windowFor(66), "D-E-F-G"), + KeyValue.pair(windowFor(69), "E-F-G"), + KeyValue.pair(windowFor(72), "G")); sendAt("H", 75); - assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F-G-H")); + assertThatOutcomeIs( + KeyValue.pair(windowFor(66), "D-E-F-G-H"), + KeyValue.pair(windowFor(69), "E-F-G-H"), + KeyValue.pair(windowFor(72), "G-H"), + KeyValue.pair(windowFor(75), "H")); sendAt("I", 100); - assertThatOutcomeIs(KeyValue.pair(windowFor(100), "I")); + assertThatOutcomeIs( + KeyValue.pair(windowFor(93), "I"), + KeyValue.pair(windowFor(96), "I"), + KeyValue.pair(windowFor(99), "I")); sendAt("J", 120); - assertThatOutcomeIs(KeyValue.pair(windowFor(120), "J")); + assertThatOutcomeIs( + KeyValue.pair(windowFor(111), "J"), + KeyValue.pair(windowFor(114), "J"), + KeyValue.pair(windowFor(117), "J"), + KeyValue.pair(windowFor(120), "J")); sendAt("K", 140); - assertThatOutcomeIs(KeyValue.pair(windowFor(140), "K")); + assertThatOutcomeIs( + KeyValue.pair(windowFor(132), "K"), + KeyValue.pair(windowFor(135), "K"), + KeyValue.pair(windowFor(138), "K")); sendAt("L", 160); - assertThatOutcomeIs(KeyValue.pair(windowFor(160), "L")); + assertThatOutcomeIs( + KeyValue.pair(windowFor(153), "L"), + KeyValue.pair(windowFor(156), "L"), + KeyValue.pair(windowFor(159), "L")); } -- 2.20.1