From 5217b2ed5c8aabe11c805fb48a755121b4183737 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 8 Jul 2024 16:33:34 +0200 Subject: [PATCH] Activated supression of intermediate results and adapted expected result --- .../counter/AggregationTopologyTest.java | 70 +++++++------------ 1 file changed, 26 insertions(+), 44 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java index 36715bd..65eeeab 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java @@ -2,12 +2,11 @@ package de.juplo.kafka.wordcount.counter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.Window; -import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.*; import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.test.TestRecord; import org.junit.jupiter.api.Test; @@ -41,7 +40,12 @@ public class AggregationTopologyTest input .groupByKey() .windowedBy(WINDOWS) - .reduce((aggregate, value) -> aggregate + "-" + value) + .reduce( + (aggregate, value) -> aggregate + "-" + value, + Materialized.>as("aggregated-store") + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.String())) + .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) .toStream((k,v) -> k.toString()) .to(OUTPUT); @@ -65,81 +69,59 @@ public class AggregationTopologyTest sendAt("A", 63); - assertThatOutcomeIs( - KeyValue.pair(windowFor(54), "A"), - KeyValue.pair(windowFor(57), "A"), - KeyValue.pair(windowFor(60), "A"), - KeyValue.pair(windowFor(63), "A")); + assertThatOutcomeIs(); sendAt("B", 64); - assertThatOutcomeIs( - KeyValue.pair(windowFor(57), "A-B"), - KeyValue.pair(windowFor(60), "A-B"), - KeyValue.pair(windowFor(63), "A-B")); + assertThatOutcomeIs(KeyValue.pair(windowFor(54), "A")); sendAt("C", 65); - assertThatOutcomeIs( - KeyValue.pair(windowFor(57), "A-B-C"), - KeyValue.pair(windowFor(60), "A-B-C"), - KeyValue.pair(windowFor(63), "A-B-C")); + assertThatOutcomeIs(); sendAt("D", 66); - assertThatOutcomeIs( - KeyValue.pair(windowFor(57), "A-B-C-D"), - KeyValue.pair(windowFor(60), "A-B-C-D"), - KeyValue.pair(windowFor(63), "A-B-C-D"), - KeyValue.pair(windowFor(66), "D")); + assertThatOutcomeIs(); sendAt("E", 69); - assertThatOutcomeIs( - KeyValue.pair(windowFor(60), "A-B-C-D-E"), - KeyValue.pair(windowFor(63), "A-B-C-D-E"), - KeyValue.pair(windowFor(66), "D-E"), - KeyValue.pair(windowFor(69), "E")); + assertThatOutcomeIs(KeyValue.pair(windowFor(57), "A-B-C-D")); sendAt("F", 70); - assertThatOutcomeIs( - KeyValue.pair(windowFor(63), "A-B-C-D-E-F"), - KeyValue.pair(windowFor(66), "D-E-F"), - KeyValue.pair(windowFor(69), "E-F")); + assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D-E")); sendAt("G", 74); - assertThatOutcomeIs( - KeyValue.pair(windowFor(66), "D-E-F-G"), - KeyValue.pair(windowFor(69), "E-F-G"), - KeyValue.pair(windowFor(72), "G")); + assertThatOutcomeIs(KeyValue.pair(windowFor(63), "A-B-C-D-E-F")); sendAt("H", 75); + assertThatOutcomeIs(); + + sendAt("I", 100); assertThatOutcomeIs( KeyValue.pair(windowFor(66), "D-E-F-G-H"), KeyValue.pair(windowFor(69), "E-F-G-H"), KeyValue.pair(windowFor(72), "G-H"), KeyValue.pair(windowFor(75), "H")); - sendAt("I", 100); + sendAt("J", 120); assertThatOutcomeIs( KeyValue.pair(windowFor(93), "I"), KeyValue.pair(windowFor(96), "I"), KeyValue.pair(windowFor(99), "I")); - sendAt("J", 120); + sendAt("K", 140); assertThatOutcomeIs( KeyValue.pair(windowFor(111), "J"), KeyValue.pair(windowFor(114), "J"), KeyValue.pair(windowFor(117), "J"), KeyValue.pair(windowFor(120), "J")); - sendAt("K", 140); + sendAt("L", 160); assertThatOutcomeIs( KeyValue.pair(windowFor(132), "K"), KeyValue.pair(windowFor(135), "K"), KeyValue.pair(windowFor(138), "K")); - sendAt("L", 160); - assertThatOutcomeIs( - KeyValue.pair(windowFor(153), "L"), - KeyValue.pair(windowFor(156), "L"), - KeyValue.pair(windowFor(159), "L")); + // Never received, if no newer message is send + // KeyValue.pair(windowFor(153), "L") + // KeyValue.pair(windowFor(156), "L") + //KeyValue.pair(windowFor(159), "L") } -- 2.20.1